1# vim: set syntax=python ts=4 :
2#
3# Copyright (c) 2018-2022 Intel Corporation
4# SPDX-License-Identifier: Apache-2.0
5
6import sys
7import os
8import logging
9import pathlib
10import shutil
11import subprocess
12import glob
13import re
14
15logger = logging.getLogger('twister')
16logger.setLevel(logging.DEBUG)
17
18supported_coverage_formats = {
19    "gcovr": ["html", "xml", "csv", "txt", "coveralls", "sonarqube"],
20    "lcov":  ["html", "lcov"]
21}
22
23
24class CoverageTool:
25    """ Base class for every supported coverage tool
26    """
27
28    def __init__(self):
29        self.gcov_tool = None
30        self.base_dir = None
31        self.output_formats = None
32
33    @staticmethod
34    def factory(tool, jobs=None):
35        if tool == 'lcov':
36            t =  Lcov(jobs)
37        elif tool == 'gcovr':
38            t =  Gcovr()
39        else:
40            logger.error("Unsupported coverage tool specified: {}".format(tool))
41            return None
42
43        logger.debug(f"Select {tool} as the coverage tool...")
44        return t
45
46    @staticmethod
47    def retrieve_gcov_data(input_file):
48        logger.debug("Working on %s" % input_file)
49        extracted_coverage_info = {}
50        capture_data = False
51        capture_complete = False
52        with open(input_file, 'r') as fp:
53            for line in fp.readlines():
54                if re.search("GCOV_COVERAGE_DUMP_START", line):
55                    capture_data = True
56                    continue
57                if re.search("GCOV_COVERAGE_DUMP_END", line):
58                    capture_complete = True
59                    break
60                # Loop until the coverage data is found.
61                if not capture_data:
62                    continue
63                if line.startswith("*"):
64                    sp = line.split("<")
65                    if len(sp) > 1:
66                        # Remove the leading delimiter "*"
67                        file_name = sp[0][1:]
68                        # Remove the trailing new line char
69                        hex_dump = sp[1][:-1]
70                    else:
71                        continue
72                else:
73                    continue
74                extracted_coverage_info.update({file_name: hex_dump})
75        if not capture_data:
76            capture_complete = True
77        return {'complete': capture_complete, 'data': extracted_coverage_info}
78
79    @staticmethod
80    def create_gcda_files(extracted_coverage_info):
81        gcda_created = True
82        logger.debug("Generating gcda files")
83        for filename, hexdump_val in extracted_coverage_info.items():
84            # if kobject_hash is given for coverage gcovr fails
85            # hence skipping it problem only in gcovr v4.1
86            if "kobject_hash" in filename:
87                filename = (filename[:-4]) + "gcno"
88                try:
89                    os.remove(filename)
90                except Exception:
91                    pass
92                continue
93
94            try:
95                with open(filename, 'wb') as fp:
96                    fp.write(bytes.fromhex(hexdump_val))
97            except ValueError:
98                logger.exception("Unable to convert hex data for file: {}".format(filename))
99                gcda_created = False
100            except FileNotFoundError:
101                logger.exception("Unable to create gcda file: {}".format(filename))
102                gcda_created = False
103        return gcda_created
104
105    def generate(self, outdir):
106        coverage_completed = True
107        for filename in glob.glob("%s/**/handler.log" % outdir, recursive=True):
108            gcov_data = self.__class__.retrieve_gcov_data(filename)
109            capture_complete = gcov_data['complete']
110            extracted_coverage_info = gcov_data['data']
111            if capture_complete:
112                gcda_created = self.__class__.create_gcda_files(extracted_coverage_info)
113                if gcda_created:
114                    logger.debug("Gcov data captured: {}".format(filename))
115                else:
116                    logger.error("Gcov data invalid for: {}".format(filename))
117                    coverage_completed = False
118            else:
119                logger.error("Gcov data capture incomplete: {}".format(filename))
120                coverage_completed = False
121
122        with open(os.path.join(outdir, "coverage.log"), "a") as coveragelog:
123            ret = self._generate(outdir, coveragelog)
124            if ret == 0:
125                report_log = {
126                    "html": "HTML report generated: {}".format(os.path.join(outdir, "coverage", "index.html")),
127                    "lcov": "LCOV report generated: {}".format(os.path.join(outdir, "coverage.info")),
128                    "xml": "XML report generated: {}".format(os.path.join(outdir, "coverage", "coverage.xml")),
129                    "csv": "CSV report generated: {}".format(os.path.join(outdir, "coverage", "coverage.csv")),
130                    "txt": "TXT report generated: {}".format(os.path.join(outdir, "coverage", "coverage.txt")),
131                    "coveralls": "Coveralls report generated: {}".format(os.path.join(outdir, "coverage", "coverage.coveralls.json")),
132                    "sonarqube": "Sonarqube report generated: {}".format(os.path.join(outdir, "coverage", "coverage.sonarqube.xml"))
133                }
134                for r in self.output_formats.split(','):
135                    logger.info(report_log[r])
136            else:
137                coverage_completed = False
138        logger.debug("All coverage data processed: {}".format(coverage_completed))
139        return coverage_completed
140
141
142class Lcov(CoverageTool):
143
144    def __init__(self, jobs=None):
145        super().__init__()
146        self.ignores = []
147        self.output_formats = "lcov,html"
148        self.version = self.get_version()
149        self.jobs = jobs
150
151    def get_version(self):
152        try:
153            result = subprocess.run(['lcov', '--version'],
154                                    stdout=subprocess.PIPE,
155                                    stderr=subprocess.PIPE,
156                                    text=True, check=True)
157            version_output = result.stdout.strip().replace('lcov: LCOV version ', '')
158            return version_output
159        except subprocess.CalledProcessError as e:
160            logger.error(f"Unable to determine lcov version: {e}")
161            sys.exit(1)
162        except FileNotFoundError as e:
163            logger.error(f"Unable to to find lcov tool: {e}")
164            sys.exit(1)
165
166    def add_ignore_file(self, pattern):
167        self.ignores.append('*' + pattern + '*')
168
169    def add_ignore_directory(self, pattern):
170        self.ignores.append('*/' + pattern + '/*')
171
172    @property
173    def is_lcov_v2(self):
174        return self.version.startswith("2")
175
176    def run_command(self, cmd, coveragelog):
177        if self.is_lcov_v2:
178            # The --ignore-errors source option is added for genhtml as well as
179            # lcov to avoid it exiting due to
180            # samples/application_development/external_lib/
181            cmd += [
182                "--ignore-errors", "inconsistent,inconsistent",
183                "--ignore-errors", "negative,negative",
184                "--ignore-errors", "unused,unused",
185                "--ignore-errors", "empty,empty",
186                "--ignore-errors", "mismatch,mismatch",
187            ]
188
189        cmd_str = " ".join(cmd)
190        logger.debug(f"Running {cmd_str}...")
191        return subprocess.call(cmd, stdout=coveragelog)
192
193    def run_lcov(self, args, coveragelog):
194        if self.is_lcov_v2:
195            branch_coverage = "branch_coverage=1"
196            if self.jobs is None:
197                # Default: --parallel=0 will autodetect appropriate parallelism
198                parallel = ["--parallel", "0"]
199            elif self.jobs == 1:
200                # Serial execution requested, don't parallelize at all
201                parallel = []
202            else:
203                parallel = ["--parallel", str(self.jobs)]
204        else:
205            branch_coverage = "lcov_branch_coverage=1"
206            parallel = []
207
208        cmd = [
209            "lcov", "--gcov-tool", self.gcov_tool,
210            "--rc", branch_coverage,
211        ] + parallel + args
212        return self.run_command(cmd, coveragelog)
213
214    def _generate(self, outdir, coveragelog):
215        coveragefile = os.path.join(outdir, "coverage.info")
216        ztestfile = os.path.join(outdir, "ztest.info")
217
218        cmd = ["--capture", "--directory", outdir, "--output-file", coveragefile]
219        self.run_lcov(cmd, coveragelog)
220
221        # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
222        cmd = ["--extract", coveragefile,
223               os.path.join(self.base_dir, "tests", "ztest", "*"),
224               "--output-file", ztestfile]
225        self.run_lcov(cmd, coveragelog)
226
227        if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
228            cmd = ["--remove", ztestfile,
229                   os.path.join(self.base_dir, "tests/ztest/test/*"),
230                   "--output-file", ztestfile]
231            self.run_lcov(cmd, coveragelog)
232
233            files = [coveragefile, ztestfile]
234        else:
235            files = [coveragefile]
236
237        for i in self.ignores:
238            cmd = ["--remove", coveragefile, i, "--output-file", coveragefile]
239            self.run_lcov(cmd, coveragelog)
240
241        if 'html' not in self.output_formats.split(','):
242            return 0
243
244        cmd = ["genhtml", "--legend", "--branch-coverage",
245               "--prefix", self.base_dir,
246               "-output-directory", os.path.join(outdir, "coverage")] + files
247        return self.run_command(cmd, coveragelog)
248
249
250class Gcovr(CoverageTool):
251
252    def __init__(self):
253        super().__init__()
254        self.ignores = []
255        self.output_formats = "html"
256        self.version = self.get_version()
257
258    def get_version(self):
259        try:
260            result = subprocess.run(['gcovr', '--version'],
261                                    stdout=subprocess.PIPE,
262                                    stderr=subprocess.PIPE,
263                                    text=True, check=True)
264            version_lines = result.stdout.strip().split('\n')
265            if version_lines:
266                version_output = version_lines[0].replace('gcovr ', '')
267                return version_output
268        except subprocess.CalledProcessError as e:
269            logger.error(f"Unable to determine gcovr version: {e}")
270            sys.exit(1)
271        except FileNotFoundError as e:
272            logger.error(f"Unable to to find gcovr tool: {e}")
273            sys.exit(1)
274
275    def add_ignore_file(self, pattern):
276        self.ignores.append('.*' + pattern + '.*')
277
278    def add_ignore_directory(self, pattern):
279        self.ignores.append(".*/" + pattern + '/.*')
280
281    @staticmethod
282    def _interleave_list(prefix, list):
283        tuple_list = [(prefix, item) for item in list]
284        return [item for sublist in tuple_list for item in sublist]
285
286    @staticmethod
287    def _flatten_list(list):
288        return [a for b in list for a in b]
289
290    def _generate(self, outdir, coveragelog):
291        coveragefile = os.path.join(outdir, "coverage.json")
292        ztestfile = os.path.join(outdir, "ztest.json")
293
294        excludes = Gcovr._interleave_list("-e", self.ignores)
295
296        # Different ifdef-ed implementations of the same function should not be
297        # in conflict treated by GCOVR as separate objects for coverage statistics.
298        mode_options = ["--merge-mode-functions=separate"]
299
300        # We want to remove tests/* and tests/ztest/test/* but save tests/ztest
301        cmd = ["gcovr", "-r", self.base_dir,
302               "--gcov-ignore-parse-errors=negative_hits.warn_once_per_file",
303               "--gcov-executable", self.gcov_tool,
304               "-e", "tests/*"]
305        cmd += excludes + mode_options + ["--json", "-o", coveragefile, outdir]
306        cmd_str = " ".join(cmd)
307        logger.debug(f"Running {cmd_str}...")
308        subprocess.call(cmd, stdout=coveragelog)
309
310        subprocess.call(["gcovr", "-r", self.base_dir, "--gcov-executable",
311                         self.gcov_tool, "-f", "tests/ztest", "-e",
312                         "tests/ztest/test/*", "--json", "-o", ztestfile,
313                         outdir] + mode_options, stdout=coveragelog)
314
315        if os.path.exists(ztestfile) and os.path.getsize(ztestfile) > 0:
316            files = [coveragefile, ztestfile]
317        else:
318            files = [coveragefile]
319
320        subdir = os.path.join(outdir, "coverage")
321        os.makedirs(subdir, exist_ok=True)
322
323        tracefiles = self._interleave_list("--add-tracefile", files)
324
325        # Convert command line argument (comma-separated list) to gcovr flags
326        report_options = {
327            "html": ["--html", os.path.join(subdir, "index.html"), "--html-details"],
328            "xml": ["--xml", os.path.join(subdir, "coverage.xml"), "--xml-pretty"],
329            "csv": ["--csv", os.path.join(subdir, "coverage.csv")],
330            "txt": ["--txt", os.path.join(subdir, "coverage.txt")],
331            "coveralls": ["--coveralls", os.path.join(subdir, "coverage.coveralls.json"), "--coveralls-pretty"],
332            "sonarqube": ["--sonarqube", os.path.join(subdir, "coverage.sonarqube.xml")]
333        }
334        gcovr_options = self._flatten_list([report_options[r] for r in self.output_formats.split(',')])
335
336        return subprocess.call(["gcovr", "-r", self.base_dir] + mode_options + gcovr_options + tracefiles,
337                               stdout=coveragelog)
338
339
340
341def run_coverage(testplan, options):
342    use_system_gcov = False
343    gcov_tool = None
344
345    for plat in options.coverage_platform:
346        _plat = testplan.get_platform(plat)
347        if _plat and (_plat.type in {"native", "unit"}):
348            use_system_gcov = True
349    if not options.gcov_tool:
350        zephyr_sdk_gcov_tool = os.path.join(
351            os.environ.get("ZEPHYR_SDK_INSTALL_DIR", default=""),
352            "x86_64-zephyr-elf/bin/x86_64-zephyr-elf-gcov")
353        if os.environ.get("ZEPHYR_TOOLCHAIN_VARIANT") == "llvm":
354            llvm_path = os.environ.get("LLVM_TOOLCHAIN_PATH")
355            if llvm_path is not None:
356                llvm_path = os.path.join(llvm_path, "bin")
357            llvm_cov = shutil.which("llvm-cov", path=llvm_path)
358            llvm_cov_ext = pathlib.Path(llvm_cov).suffix
359            gcov_lnk = os.path.join(options.outdir, f"gcov{llvm_cov_ext}")
360            try:
361                os.symlink(llvm_cov, gcov_lnk)
362            except OSError:
363                shutil.copy(llvm_cov, gcov_lnk)
364            gcov_tool = gcov_lnk
365        elif use_system_gcov:
366            gcov_tool = "gcov"
367        elif os.path.exists(zephyr_sdk_gcov_tool):
368            gcov_tool = zephyr_sdk_gcov_tool
369        else:
370            logger.error(f"Can't find a suitable gcov tool. Use --gcov-tool or set ZEPHYR_SDK_INSTALL_DIR.")
371            sys.exit(1)
372    else:
373        gcov_tool = str(options.gcov_tool)
374
375    logger.info("Generating coverage files...")
376    logger.info(f"Using gcov tool: {gcov_tool}")
377    coverage_tool = CoverageTool.factory(options.coverage_tool, jobs=options.jobs)
378    coverage_tool.gcov_tool = gcov_tool
379    coverage_tool.base_dir = os.path.abspath(options.coverage_basedir)
380    # Apply output format default
381    if options.coverage_formats is not None:
382        coverage_tool.output_formats = options.coverage_formats
383    coverage_tool.add_ignore_file('generated')
384    coverage_tool.add_ignore_directory('tests')
385    coverage_tool.add_ignore_directory('samples')
386    coverage_completed = coverage_tool.generate(options.outdir)
387    return coverage_completed
388