1# vim: set syntax=python ts=4 :
2#
3# Copyright (c) 2018-2022 Intel Corporation
4# SPDX-License-Identifier: Apache-2.0
5
6import contextlib
7import glob
8import logging
9import os
10import pathlib
11import re
12import shutil
13import subprocess
14import sys
15import tempfile
16
17logger = logging.getLogger('twister')
18logger.setLevel(logging.DEBUG)
19
20supported_coverage_formats = {
21    "gcovr": ["html", "xml", "csv", "txt", "coveralls", "sonarqube"],
22    "lcov":  ["html", "lcov"]
23}
24
25
26class CoverageTool:
27    """ Base class for every supported coverage tool
28    """
29
30    def __init__(self):
31        self.gcov_tool = None
32        self.base_dir = None
33        self.output_formats = None
34
35    @staticmethod
36    def factory(tool, jobs=None):
37        if tool == 'lcov':
38            t =  Lcov(jobs)
39        elif tool == 'gcovr':
40            t =  Gcovr()
41        else:
42            logger.error(f"Unsupported coverage tool specified: {tool}")
43            return None
44
45        logger.debug(f"Select {tool} as the coverage tool...")
46        return t
47
48    @staticmethod
49    def retrieve_gcov_data(input_file):
50        logger.debug(f"Working on {input_file}")
51        extracted_coverage_info = {}
52        capture_data = False
53        capture_complete = False
54        with open(input_file) as fp:
55            for line in fp.readlines():
56                if re.search("GCOV_COVERAGE_DUMP_START", line):
57                    capture_data = True
58                    capture_complete = False
59                    continue
60                if re.search("GCOV_COVERAGE_DUMP_END", line):
61                    capture_complete = True
62                    # Keep searching for additional dumps
63                # Loop until the coverage data is found.
64                if not capture_data:
65                    continue
66                if line.startswith("*"):
67                    sp = line.split("<")
68                    if len(sp) > 1:
69                        # Remove the leading delimiter "*"
70                        file_name = sp[0][1:]
71                        # Remove the trailing new line char
72                        hex_dump = sp[1][:-1]
73                    else:
74                        continue
75                else:
76                    continue
77                if file_name in extracted_coverage_info:
78                    extracted_coverage_info[file_name].append(hex_dump)
79                else:
80                    extracted_coverage_info[file_name] = [hex_dump]
81        if not capture_data:
82            capture_complete = True
83        return {'complete': capture_complete, 'data': extracted_coverage_info}
84
85    def merge_hexdumps(self, hexdumps):
86        # Only one hexdump
87        if len(hexdumps) == 1:
88            return hexdumps[0]
89
90        with tempfile.TemporaryDirectory() as dir:
91            # Write each hexdump to a dedicated temporary folder
92            dirs = []
93            for idx, dump in enumerate(hexdumps):
94                subdir = dir + f'/{idx}'
95                os.mkdir(subdir)
96                dirs.append(subdir)
97                with open(f'{subdir}/tmp.gcda', 'wb') as fp:
98                    fp.write(bytes.fromhex(dump))
99
100            # Iteratively call gcov-tool (not gcov) to merge the files
101            merge_tool = self.gcov_tool + '-tool'
102            for d1, d2 in zip(dirs[:-1], dirs[1:], strict=False):
103                cmd = [merge_tool, 'merge', d1, d2, '--output', d2]
104                subprocess.call(cmd)
105
106            # Read back the final output file
107            with open(f'{dirs[-1]}/tmp.gcda', 'rb') as fp:
108                return fp.read(-1).hex()
109
110    def create_gcda_files(self, extracted_coverage_info):
111        gcda_created = True
112        logger.debug("Generating gcda files")
113        for filename, hexdumps in extracted_coverage_info.items():
114            # if kobject_hash is given for coverage gcovr fails
115            # hence skipping it problem only in gcovr v4.1
116            if "kobject_hash" in filename:
117                filename = (filename[:-4]) + "gcno"
118                with contextlib.suppress(Exception):
119                    os.remove(filename)
120                continue
121
122            try:
123                hexdump_val = self.merge_hexdumps(hexdumps)
124                hex_bytes = bytes.fromhex(hexdump_val)
125                with open(filename, 'wb') as fp:
126                    fp.write(hex_bytes)
127            except ValueError:
128                logger.exception(f"Unable to convert hex data for file: {filename}")
129                gcda_created = False
130            except FileNotFoundError:
131                logger.exception(f"Unable to create gcda file: {filename}")
132                gcda_created = False
133        return gcda_created
134
135    def generate(self, outdir):
136        coverage_completed = True
137        for filename in glob.glob(f"{outdir}/**/handler.log", recursive=True):
138            gcov_data = self.__class__.retrieve_gcov_data(filename)
139            capture_complete = gcov_data['complete']
140            extracted_coverage_info = gcov_data['data']
141            if capture_complete:
142                gcda_created = self.create_gcda_files(extracted_coverage_info)
143                if gcda_created:
144                    logger.debug(f"Gcov data captured: {filename}")
145                else:
146                    logger.error(f"Gcov data invalid for: {filename}")
147                    coverage_completed = False
148            else:
149                logger.error(f"Gcov data capture incomplete: {filename}")
150                coverage_completed = False
151
152        with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog:
153            ret = self._generate(outdir, coveragelog)
154            if ret == 0:
155                report_log = {
156                    "html": "HTML report generated: {}".format(
157                        os.path.join(outdir, "coverage", "index.html")
158                    ),
159                    "lcov": "LCOV report generated: {}".format(
160                        os.path.join(outdir, "coverage.info")
161                    ),
162                    "xml": "XML report generated: {}".format(
163                        os.path.join(outdir, "coverage", "coverage.xml")
164                    ),
165                    "csv": "CSV report generated: {}".format(
166                        os.path.join(outdir, "coverage", "coverage.csv")
167                    ),
168                    "txt": "TXT report generated: {}".format(
169                        os.path.join(outdir, "coverage", "coverage.txt")
170                    ),
171                    "coveralls": "Coveralls report generated: {}".format(
172                        os.path.join(outdir, "coverage", "coverage.coveralls.json")
173                    ),
174                    "sonarqube": "Sonarqube report generated: {}".format(
175                        os.path.join(outdir, "coverage", "coverage.sonarqube.xml")
176                    )
177                }
178                for r in self.output_formats.split(','):
179                    logger.info(report_log[r])
180            else:
181                coverage_completed = False
182        logger.debug(f"All coverage data processed: {coverage_completed}")
183        return coverage_completed
184
185
186class Lcov(CoverageTool):
187
188    def __init__(self, jobs=None):
189        super().__init__()
190        self.ignores = []
191        self.ignore_branch_patterns = []
192        self.output_formats = "lcov,html"
193        self.version = self.get_version()
194        self.jobs = jobs
195
196    def get_version(self):
197        try:
198            result = subprocess.run(
199                ['lcov', '--version'],
200                capture_output=True,
201                text=True,
202                check=True
203            )
204            version_output = result.stdout.strip().replace('lcov: LCOV version ', '')
205            return version_output
206        except subprocess.CalledProcessError as e:
207            logger.error(f"Unable to determine lcov version: {e}")
208            sys.exit(1)
209        except FileNotFoundError as e:
210            logger.error(f"Unable to find lcov tool: {e}")
211            sys.exit(1)
212
213    def add_ignore_file(self, pattern):
214        self.ignores.append('*' + pattern + '*')
215
216    def add_ignore_directory(self, pattern):
217        self.ignores.append('*/' + pattern + '/*')
218
219    def add_ignore_branch_pattern(self, pattern):
220        self.ignore_branch_patterns.append(pattern)
221
222    @property
223    def is_lcov_v2(self):
224        return self.version.startswith("2")
225
226    def run_command(self, cmd, coveragelog):
227        if self.is_lcov_v2:
228            # The --ignore-errors source option is added for genhtml as well as
229            # lcov to avoid it exiting due to
230            # samples/application_development/external_lib/
231            cmd += [
232                "--ignore-errors", "inconsistent,inconsistent",
233                "--ignore-errors", "negative,negative",
234                "--ignore-errors", "unused,unused",
235                "--ignore-errors", "empty,empty",
236                "--ignore-errors", "mismatch,mismatch",
237            ]
238
239        cmd_str = " ".join(cmd)
240        logger.debug(f"Running {cmd_str}...")
241        return subprocess.call(cmd, stdout=coveragelog)
242
243    def run_lcov(self, args, coveragelog):
244        if self.is_lcov_v2:
245            branch_coverage = "branch_coverage=1"
246            if self.jobs is None:
247                # Default: --parallel=0 will autodetect appropriate parallelism
248                parallel = ["--parallel", "0"]
249            elif self.jobs == 1:
250                # Serial execution requested, don't parallelize at all
251                parallel = []
252            else:
253                parallel = ["--parallel", str(self.jobs)]
254        else:
255            branch_coverage = "lcov_branch_coverage=1"
256            parallel = []
257
258        cmd = [
259            "lcov", "--gcov-tool", self.gcov_tool,
260            "--rc", branch_coverage,
261        ] + parallel + args
262        return self.run_command(cmd, coveragelog)
263
264    def _generate(self, outdir, coveragelog):
265        coveragefile = os.path.join(outdir, "coverage.info")
266        ztestfile = os.path.join(outdir, "ztest.info")
267
268        cmd = ["--capture", "--directory", outdir, "--output-file", coveragefile]
269        self.run_lcov(cmd, coveragelog)
270
271        # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
272        cmd = ["--extract", coveragefile,
273               os.path.join(self.base_dir, "tests", "ztest", "*"),
274               "--output-file", ztestfile]
275        self.run_lcov(cmd, coveragelog)
276
277        if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
278            cmd = ["--remove", ztestfile,
279                   os.path.join(self.base_dir, "tests/ztest/test/*"),
280                   "--output-file", ztestfile]
281            self.run_lcov(cmd, coveragelog)
282
283            files = [coveragefile, ztestfile]
284        else:
285            files = [coveragefile]
286
287        for i in self.ignores:
288            cmd = ["--remove", coveragefile, i, "--output-file", coveragefile]
289            self.run_lcov(cmd, coveragelog)
290
291        if 'html' not in self.output_formats.split(','):
292            return 0
293
294        cmd = ["genhtml", "--legend", "--branch-coverage",
295               "--prefix", self.base_dir,
296               "-output-directory", os.path.join(outdir, "coverage")] + files
297        return self.run_command(cmd, coveragelog)
298
299
300class Gcovr(CoverageTool):
301
302    def __init__(self):
303        super().__init__()
304        self.ignores = []
305        self.ignore_branch_patterns = []
306        self.output_formats = "html"
307        self.version = self.get_version()
308
309    def get_version(self):
310        try:
311            result = subprocess.run(
312                ['gcovr', '--version'],
313                capture_output=True,
314                text=True,
315                check=True
316            )
317            version_lines = result.stdout.strip().split('\n')
318            if version_lines:
319                version_output = version_lines[0].replace('gcovr ', '')
320                return version_output
321        except subprocess.CalledProcessError as e:
322            logger.error(f"Unable to determine gcovr version: {e}")
323            sys.exit(1)
324        except FileNotFoundError as e:
325            logger.error(f"Unable to find gcovr tool: {e}")
326            sys.exit(1)
327
328    def add_ignore_file(self, pattern):
329        self.ignores.append('.*' + pattern + '.*')
330
331    def add_ignore_directory(self, pattern):
332        self.ignores.append(".*/" + pattern + '/.*')
333
334    def add_ignore_branch_pattern(self, pattern):
335        self.ignore_branch_patterns.append(pattern)
336
337    @staticmethod
338    def _interleave_list(prefix, list):
339        tuple_list = [(prefix, item) for item in list]
340        return [item for sublist in tuple_list for item in sublist]
341
342    @staticmethod
343    def _flatten_list(list):
344        return [a for b in list for a in b]
345
346    def _generate(self, outdir, coveragelog):
347        coveragefile = os.path.join(outdir, "coverage.json")
348        ztestfile = os.path.join(outdir, "ztest.json")
349
350        excludes = Gcovr._interleave_list("-e", self.ignores)
351        if len(self.ignore_branch_patterns) > 0:
352            # Last pattern overrides previous values, so merge all patterns together
353            merged_regex = "|".join([f"({p})" for p in self.ignore_branch_patterns])
354            excludes += ["--exclude-branches-by-pattern", merged_regex]
355
356        # Different ifdef-ed implementations of the same function should not be
357        # in conflict treated by GCOVR as separate objects for coverage statistics.
358        mode_options = ["--merge-mode-functions=separate"]
359
360        # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
361        cmd = ["gcovr", "-r", self.base_dir,
362               "--gcov-ignore-parse-errors=negative_hits.warn_once_per_file",
363               "--gcov-executable", self.gcov_tool,
364               "-e", "tests/*"]
365        cmd += excludes + mode_options + ["--json", "-o", coveragefile, outdir]
366        cmd_str = " ".join(cmd)
367        logger.debug(f"Running {cmd_str}...")
368        subprocess.call(cmd, stdout=coveragelog)
369
370        subprocess.call(["gcovr", "-r", self.base_dir, "--gcov-executable",
371                         self.gcov_tool, "-f", "tests/ztest", "-e",
372                         "tests/ztest/test/*", "--json", "-o", ztestfile,
373                         outdir] + mode_options, stdout=coveragelog)
374
375        if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
376            files = [coveragefile, ztestfile]
377        else:
378            files = [coveragefile]
379
380        subdir = os.path.join(outdir, "coverage")
381        os.makedirs(subdir, exist_ok=True)
382
383        tracefiles = self._interleave_list("--add-tracefile", files)
384
385        # Convert command line argument (comma-separated list) to gcovr flags
386        report_options = {
387            "html": ["--html", os.path.join(subdir, "index.html"), "--html-details"],
388            "xml": ["--xml", os.path.join(subdir, "coverage.xml"), "--xml-pretty"],
389            "csv": ["--csv", os.path.join(subdir, "coverage.csv")],
390            "txt": ["--txt", os.path.join(subdir, "coverage.txt")],
391            "coveralls": ["--coveralls", os.path.join(subdir, "coverage.coveralls.json"),
392                          "--coveralls-pretty"],
393            "sonarqube": ["--sonarqube", os.path.join(subdir, "coverage.sonarqube.xml")]
394        }
395        gcovr_options = self._flatten_list(
396            [report_options[r] for r in self.output_formats.split(',')]
397        )
398
399        return subprocess.call(
400            ["gcovr", "-r", self.base_dir] \
401                + mode_options + gcovr_options + tracefiles, stdout=coveragelog
402        )
403
404
405
406def run_coverage(testplan, options):
407    use_system_gcov = False
408    gcov_tool = None
409
410    for plat in options.coverage_platform:
411        _plat = testplan.get_platform(plat)
412        if _plat and (_plat.type in {"native", "unit"}):
413            use_system_gcov = True
414    if not options.gcov_tool:
415        zephyr_sdk_gcov_tool = os.path.join(
416            os.environ.get("ZEPHYR_SDK_INSTALL_DIR", default=""),
417            "x86_64-zephyr-elf/bin/x86_64-zephyr-elf-gcov")
418        if os.environ.get("ZEPHYR_TOOLCHAIN_VARIANT") == "llvm":
419            llvm_path = os.environ.get("LLVM_TOOLCHAIN_PATH")
420            if llvm_path is not None:
421                llvm_path = os.path.join(llvm_path, "bin")
422            llvm_cov = shutil.which("llvm-cov", path=llvm_path)
423            llvm_cov_ext = pathlib.Path(llvm_cov).suffix
424            gcov_lnk = os.path.join(options.outdir, f"gcov{llvm_cov_ext}")
425            try:
426                os.symlink(llvm_cov, gcov_lnk)
427            except OSError:
428                shutil.copy(llvm_cov, gcov_lnk)
429            gcov_tool = gcov_lnk
430        elif use_system_gcov:
431            gcov_tool = "gcov"
432        elif os.path.exists(zephyr_sdk_gcov_tool):
433            gcov_tool = zephyr_sdk_gcov_tool
434        else:
435            logger.error(
436                "Can't find a suitable gcov tool. Use --gcov-tool or set ZEPHYR_SDK_INSTALL_DIR."
437            )
438            sys.exit(1)
439    else:
440        gcov_tool = str(options.gcov_tool)
441
442    logger.info("Generating coverage files...")
443    logger.info(f"Using gcov tool: {gcov_tool}")
444    coverage_tool = CoverageTool.factory(options.coverage_tool, jobs=options.jobs)
445    coverage_tool.gcov_tool = gcov_tool
446    coverage_tool.base_dir = os.path.abspath(options.coverage_basedir)
447    # Apply output format default
448    if options.coverage_formats is not None:
449        coverage_tool.output_formats = options.coverage_formats
450    coverage_tool.add_ignore_file('generated')
451    coverage_tool.add_ignore_directory('tests')
452    coverage_tool.add_ignore_directory('samples')
453    # Ignore branch coverage on LOG_* and LOG_HEXDUMP_* macros
454    # Branch misses are due to the implementation of Z_LOG2 and cannot be avoided
455    coverage_tool.add_ignore_branch_pattern(r"^\s*LOG_(?:HEXDUMP_)?(?:DBG|INF|WRN|ERR)\(.*")
456    # Ignore branch coverage on __ASSERT* macros
457    # Covering the failing case is not desirable as it will immediately terminate the test.
458    coverage_tool.add_ignore_branch_pattern(r"^\s*__ASSERT(?:_EVAL|_NO_MSG|_POST_ACTION)?\(.*")
459    coverage_completed = coverage_tool.generate(options.outdir)
460    return coverage_completed
461