1# vim: set syntax=python ts=4 :
2#
3# Copyright (c) 2018-2022 Intel Corporation
4# SPDX-License-Identifier: Apache-2.0
5
6import sys
7import os
8import logging
9import pathlib
10import shutil
11import subprocess
12import glob
13import re
14import tempfile
15
16logger = logging.getLogger('twister')
17logger.setLevel(logging.DEBUG)
18
19supported_coverage_formats = {
20    "gcovr": ["html", "xml", "csv", "txt", "coveralls", "sonarqube"],
21    "lcov":  ["html", "lcov"]
22}
23
24
25class CoverageTool:
26    """ Base class for every supported coverage tool
27    """
28
29    def __init__(self):
30        self.gcov_tool = None
31        self.base_dir = None
32        self.output_formats = None
33
34    @staticmethod
35    def factory(tool, jobs=None):
36        if tool == 'lcov':
37            t =  Lcov(jobs)
38        elif tool == 'gcovr':
39            t =  Gcovr()
40        else:
41            logger.error("Unsupported coverage tool specified: {}".format(tool))
42            return None
43
44        logger.debug(f"Select {tool} as the coverage tool...")
45        return t
46
47    @staticmethod
48    def retrieve_gcov_data(input_file):
49        logger.debug("Working on %s" % input_file)
50        extracted_coverage_info = {}
51        capture_data = False
52        capture_complete = False
53        with open(input_file, 'r') as fp:
54            for line in fp.readlines():
55                if re.search("GCOV_COVERAGE_DUMP_START", line):
56                    capture_data = True
57                    capture_complete = False
58                    continue
59                if re.search("GCOV_COVERAGE_DUMP_END", line):
60                    capture_complete = True
61                    # Keep searching for additional dumps
62                # Loop until the coverage data is found.
63                if not capture_data:
64                    continue
65                if line.startswith("*"):
66                    sp = line.split("<")
67                    if len(sp) > 1:
68                        # Remove the leading delimiter "*"
69                        file_name = sp[0][1:]
70                        # Remove the trailing new line char
71                        hex_dump = sp[1][:-1]
72                    else:
73                        continue
74                else:
75                    continue
76                if file_name in extracted_coverage_info:
77                    extracted_coverage_info[file_name].append(hex_dump)
78                else:
79                    extracted_coverage_info[file_name] = [hex_dump]
80        if not capture_data:
81            capture_complete = True
82        return {'complete': capture_complete, 'data': extracted_coverage_info}
83
84    def merge_hexdumps(self, hexdumps):
85        # Only one hexdump
86        if len(hexdumps) == 1:
87            return hexdumps[0]
88
89        with tempfile.TemporaryDirectory() as dir:
90            # Write each hexdump to a dedicated temporary folder
91            dirs = []
92            for idx, dump in enumerate(hexdumps):
93                subdir = dir + f'/{idx}'
94                os.mkdir(subdir)
95                dirs.append(subdir)
96                with open(f'{subdir}/tmp.gcda', 'wb') as fp:
97                    fp.write(bytes.fromhex(dump))
98
99            # Iteratively call gcov-tool (not gcov) to merge the files
100            merge_tool = self.gcov_tool + '-tool'
101            for d1, d2 in zip(dirs[:-1], dirs[1:]):
102                cmd = [merge_tool, 'merge', d1, d2, '--output', d2]
103                subprocess.call(cmd)
104
105            # Read back the final output file
106            with open(f'{dirs[-1]}/tmp.gcda', 'rb') as fp:
107                return fp.read(-1).hex()
108
109    def create_gcda_files(self, extracted_coverage_info):
110        gcda_created = True
111        logger.debug("Generating gcda files")
112        for filename, hexdumps in extracted_coverage_info.items():
113            # if kobject_hash is given for coverage gcovr fails
114            # hence skipping it problem only in gcovr v4.1
115            if "kobject_hash" in filename:
116                filename = (filename[:-4]) + "gcno"
117                try:
118                    os.remove(filename)
119                except Exception:
120                    pass
121                continue
122
123            try:
124                hexdump_val = self.merge_hexdumps(hexdumps)
125                with open(filename, 'wb') as fp:
126                    fp.write(bytes.fromhex(hexdump_val))
127            except ValueError:
128                logger.exception("Unable to convert hex data for file: {}".format(filename))
129                gcda_created = False
130            except FileNotFoundError:
131                logger.exception("Unable to create gcda file: {}".format(filename))
132                gcda_created = False
133        return gcda_created
134
135    def generate(self, outdir):
136        coverage_completed = True
137        for filename in glob.glob("%s/**/handler.log" % outdir, recursive=True):
138            gcov_data = self.__class__.retrieve_gcov_data(filename)
139            capture_complete = gcov_data['complete']
140            extracted_coverage_info = gcov_data['data']
141            if capture_complete:
142                gcda_created = self.create_gcda_files(extracted_coverage_info)
143                if gcda_created:
144                    logger.debug("Gcov data captured: {}".format(filename))
145                else:
146                    logger.error("Gcov data invalid for: {}".format(filename))
147                    coverage_completed = False
148            else:
149                logger.error("Gcov data capture incomplete: {}".format(filename))
150                coverage_completed = False
151
152        with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog:
153            ret = self._generate(outdir, coveragelog)
154            if ret == 0:
155                report_log = {
156                    "html": "HTML report generated: {}".format(os.path.join(outdir, "coverage", "index.html")),
157                    "lcov": "LCOV report generated: {}".format(os.path.join(outdir, "coverage.info")),
158                    "xml": "XML report generated: {}".format(os.path.join(outdir, "coverage", "coverage.xml")),
159                    "csv": "CSV report generated: {}".format(os.path.join(outdir, "coverage", "coverage.csv")),
160                    "txt": "TXT report generated: {}".format(os.path.join(outdir, "coverage", "coverage.txt")),
161                    "coveralls": "Coveralls report generated: {}".format(os.path.join(outdir, "coverage", "coverage.coveralls.json")),
162                    "sonarqube": "Sonarqube report generated: {}".format(os.path.join(outdir, "coverage", "coverage.sonarqube.xml"))
163                }
164                for r in self.output_formats.split(','):
165                    logger.info(report_log[r])
166            else:
167                coverage_completed = False
168        logger.debug("All coverage data processed: {}".format(coverage_completed))
169        return coverage_completed
170
171
172class Lcov(CoverageTool):
173
174    def __init__(self, jobs=None):
175        super().__init__()
176        self.ignores = []
177        self.ignore_branch_patterns = []
178        self.output_formats = "lcov,html"
179        self.version = self.get_version()
180        self.jobs = jobs
181
182    def get_version(self):
183        try:
184            result = subprocess.run(['lcov', '--version'],
185                                    stdout=subprocess.PIPE,
186                                    stderr=subprocess.PIPE,
187                                    text=True, check=True)
188            version_output = result.stdout.strip().replace('lcov: LCOV version ', '')
189            return version_output
190        except subprocess.CalledProcessError as e:
191            logger.error(f"Unable to determine lcov version: {e}")
192            sys.exit(1)
193        except FileNotFoundError as e:
194            logger.error(f"Unable to find lcov tool: {e}")
195            sys.exit(1)
196
197    def add_ignore_file(self, pattern):
198        self.ignores.append('*' + pattern + '*')
199
200    def add_ignore_directory(self, pattern):
201        self.ignores.append('*/' + pattern + '/*')
202
203    def add_ignore_branch_pattern(self, pattern):
204        self.ignore_branch_patterns.append(pattern)
205
206    @property
207    def is_lcov_v2(self):
208        return self.version.startswith("2")
209
210    def run_command(self, cmd, coveragelog):
211        if self.is_lcov_v2:
212            # The --ignore-errors source option is added for genhtml as well as
213            # lcov to avoid it exiting due to
214            # samples/application_development/external_lib/
215            cmd += [
216                "--ignore-errors", "inconsistent,inconsistent",
217                "--ignore-errors", "negative,negative",
218                "--ignore-errors", "unused,unused",
219                "--ignore-errors", "empty,empty",
220                "--ignore-errors", "mismatch,mismatch",
221            ]
222
223        cmd_str = " ".join(cmd)
224        logger.debug(f"Running {cmd_str}...")
225        return subprocess.call(cmd, stdout=coveragelog)
226
227    def run_lcov(self, args, coveragelog):
228        if self.is_lcov_v2:
229            branch_coverage = "branch_coverage=1"
230            if self.jobs is None:
231                # Default: --parallel=0 will autodetect appropriate parallelism
232                parallel = ["--parallel", "0"]
233            elif self.jobs == 1:
234                # Serial execution requested, don't parallelize at all
235                parallel = []
236            else:
237                parallel = ["--parallel", str(self.jobs)]
238        else:
239            branch_coverage = "lcov_branch_coverage=1"
240            parallel = []
241
242        cmd = [
243            "lcov", "--gcov-tool", self.gcov_tool,
244            "--rc", branch_coverage,
245        ] + parallel + args
246        return self.run_command(cmd, coveragelog)
247
248    def _generate(self, outdir, coveragelog):
249        coveragefile = os.path.join(outdir, "coverage.info")
250        ztestfile = os.path.join(outdir, "ztest.info")
251
252        cmd = ["--capture", "--directory", outdir, "--output-file", coveragefile]
253        self.run_lcov(cmd, coveragelog)
254
255        # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
256        cmd = ["--extract", coveragefile,
257               os.path.join(self.base_dir, "tests", "ztest", "*"),
258               "--output-file", ztestfile]
259        self.run_lcov(cmd, coveragelog)
260
261        if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
262            cmd = ["--remove", ztestfile,
263                   os.path.join(self.base_dir, "tests/ztest/test/*"),
264                   "--output-file", ztestfile]
265            self.run_lcov(cmd, coveragelog)
266
267            files = [coveragefile, ztestfile]
268        else:
269            files = [coveragefile]
270
271        for i in self.ignores:
272            cmd = ["--remove", coveragefile, i, "--output-file", coveragefile]
273            self.run_lcov(cmd, coveragelog)
274
275        if 'html' not in self.output_formats.split(','):
276            return 0
277
278        cmd = ["genhtml", "--legend", "--branch-coverage",
279               "--prefix", self.base_dir,
280               "-output-directory", os.path.join(outdir, "coverage")] + files
281        return self.run_command(cmd, coveragelog)
282
283
284class Gcovr(CoverageTool):
285
286    def __init__(self):
287        super().__init__()
288        self.ignores = []
289        self.ignore_branch_patterns = []
290        self.output_formats = "html"
291        self.version = self.get_version()
292
293    def get_version(self):
294        try:
295            result = subprocess.run(['gcovr', '--version'],
296                                    stdout=subprocess.PIPE,
297                                    stderr=subprocess.PIPE,
298                                    text=True, check=True)
299            version_lines = result.stdout.strip().split('\n')
300            if version_lines:
301                version_output = version_lines[0].replace('gcovr ', '')
302                return version_output
303        except subprocess.CalledProcessError as e:
304            logger.error(f"Unable to determine gcovr version: {e}")
305            sys.exit(1)
306        except FileNotFoundError as e:
307            logger.error(f"Unable to find gcovr tool: {e}")
308            sys.exit(1)
309
310    def add_ignore_file(self, pattern):
311        self.ignores.append('.*' + pattern + '.*')
312
313    def add_ignore_directory(self, pattern):
314        self.ignores.append(".*/" + pattern + '/.*')
315
316    def add_ignore_branch_pattern(self, pattern):
317        self.ignore_branch_patterns.append(pattern)
318
319    @staticmethod
320    def _interleave_list(prefix, list):
321        tuple_list = [(prefix, item) for item in list]
322        return [item for sublist in tuple_list for item in sublist]
323
324    @staticmethod
325    def _flatten_list(list):
326        return [a for b in list for a in b]
327
328    def _generate(self, outdir, coveragelog):
329        coveragefile = os.path.join(outdir, "coverage.json")
330        ztestfile = os.path.join(outdir, "ztest.json")
331
332        excludes = Gcovr._interleave_list("-e", self.ignores)
333        if len(self.ignore_branch_patterns) > 0:
334            # Last pattern overrides previous values, so merge all patterns together
335            merged_regex = "|".join([f"({p})" for p in self.ignore_branch_patterns])
336            excludes += ["--exclude-branches-by-pattern", merged_regex]
337
338        # Different ifdef-ed implementations of the same function should not be
339        # in conflict treated by GCOVR as separate objects for coverage statistics.
340        mode_options = ["--merge-mode-functions=separate"]
341
342        # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
343        cmd = ["gcovr", "-r", self.base_dir,
344               "--gcov-ignore-parse-errors=negative_hits.warn_once_per_file",
345               "--gcov-executable", self.gcov_tool,
346               "-e", "tests/*"]
347        cmd += excludes + mode_options + ["--json", "-o", coveragefile, outdir]
348        cmd_str = " ".join(cmd)
349        logger.debug(f"Running {cmd_str}...")
350        subprocess.call(cmd, stdout=coveragelog)
351
352        subprocess.call(["gcovr", "-r", self.base_dir, "--gcov-executable",
353                         self.gcov_tool, "-f", "tests/ztest", "-e",
354                         "tests/ztest/test/*", "--json", "-o", ztestfile,
355                         outdir] + mode_options, stdout=coveragelog)
356
357        if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
358            files = [coveragefile, ztestfile]
359        else:
360            files = [coveragefile]
361
362        subdir = os.path.join(outdir, "coverage")
363        os.makedirs(subdir, exist_ok=True)
364
365        tracefiles = self._interleave_list("--add-tracefile", files)
366
367        # Convert command line argument (comma-separated list) to gcovr flags
368        report_options = {
369            "html": ["--html", os.path.join(subdir, "index.html"), "--html-details"],
370            "xml": ["--xml", os.path.join(subdir, "coverage.xml"), "--xml-pretty"],
371            "csv": ["--csv", os.path.join(subdir, "coverage.csv")],
372            "txt": ["--txt", os.path.join(subdir, "coverage.txt")],
373            "coveralls": ["--coveralls", os.path.join(subdir, "coverage.coveralls.json"), "--coveralls-pretty"],
374            "sonarqube": ["--sonarqube", os.path.join(subdir, "coverage.sonarqube.xml")]
375        }
376        gcovr_options = self._flatten_list([report_options[r] for r in self.output_formats.split(',')])
377
378        return subprocess.call(["gcovr", "-r", self.base_dir] + mode_options + gcovr_options + tracefiles,
379                               stdout=coveragelog)
380
381
382
383def run_coverage(testplan, options):
384    use_system_gcov = False
385    gcov_tool = None
386
387    for plat in options.coverage_platform:
388        _plat = testplan.get_platform(plat)
389        if _plat and (_plat.type in {"native", "unit"}):
390            use_system_gcov = True
391    if not options.gcov_tool:
392        zephyr_sdk_gcov_tool = os.path.join(
393            os.environ.get("ZEPHYR_SDK_INSTALL_DIR", default=""),
394            "x86_64-zephyr-elf/bin/x86_64-zephyr-elf-gcov")
395        if os.environ.get("ZEPHYR_TOOLCHAIN_VARIANT") == "llvm":
396            llvm_path = os.environ.get("LLVM_TOOLCHAIN_PATH")
397            if llvm_path is not None:
398                llvm_path = os.path.join(llvm_path, "bin")
399            llvm_cov = shutil.which("llvm-cov", path=llvm_path)
400            llvm_cov_ext = pathlib.Path(llvm_cov).suffix
401            gcov_lnk = os.path.join(options.outdir, f"gcov{llvm_cov_ext}")
402            try:
403                os.symlink(llvm_cov, gcov_lnk)
404            except OSError:
405                shutil.copy(llvm_cov, gcov_lnk)
406            gcov_tool = gcov_lnk
407        elif use_system_gcov:
408            gcov_tool = "gcov"
409        elif os.path.exists(zephyr_sdk_gcov_tool):
410            gcov_tool = zephyr_sdk_gcov_tool
411        else:
412            logger.error(f"Can't find a suitable gcov tool. Use --gcov-tool or set ZEPHYR_SDK_INSTALL_DIR.")
413            sys.exit(1)
414    else:
415        gcov_tool = str(options.gcov_tool)
416
417    logger.info("Generating coverage files...")
418    logger.info(f"Using gcov tool: {gcov_tool}")
419    coverage_tool = CoverageTool.factory(options.coverage_tool, jobs=options.jobs)
420    coverage_tool.gcov_tool = gcov_tool
421    coverage_tool.base_dir = os.path.abspath(options.coverage_basedir)
422    # Apply output format default
423    if options.coverage_formats is not None:
424        coverage_tool.output_formats = options.coverage_formats
425    coverage_tool.add_ignore_file('generated')
426    coverage_tool.add_ignore_directory('tests')
427    coverage_tool.add_ignore_directory('samples')
428    # Ignore branch coverage on LOG_* and LOG_HEXDUMP_* macros
429    # Branch misses are due to the implementation of Z_LOG2 and cannot be avoided
430    coverage_tool.add_ignore_branch_pattern(r"^\s*LOG_(?:HEXDUMP_)?(?:DBG|INF|WRN|ERR)\(.*")
431    # Ignore branch coverage on __ASSERT* macros
432    # Covering the failing case is not desirable as it will immediately terminate the test.
433    coverage_tool.add_ignore_branch_pattern(r"^\s*__ASSERT(?:_EVAL|_NO_MSG|_POST_ACTION)?\(.*")
434    coverage_completed = coverage_tool.generate(options.outdir)
435    return coverage_completed
436