1#
2# Copyright (c) 2010-2025 Antmicro
3#
4# This file is licensed under the MIT License.
5# Full license text is available in 'licenses/MIT.txt'.
6#
7
8import os
9import itertools
10import functools
11from collections import defaultdict
12from dataclasses import dataclass, astuple, field
13from typing import TYPE_CHECKING, BinaryIO, Dict, Generator, Iterable, List, Set, IO, Optional, Tuple
14from elftools.common.utils import bytes2str
15from elftools.elf.elffile import ELFFile
16
17if TYPE_CHECKING:
18    from execution_tracer_reader import TraceData
19    from elftools.elf.elffile import DWARFInfo
20
@dataclass
class AddressRange:
    """A span of addresses delimited by `low` and `high`.

    The rest of this file treats `high` as exclusive (`low <= address < high`).
    """

    low: int
    high: int

    def __iter__(self):
        # Yield the fields in declaration order, which allows unpacking: `low, high = address_range`
        yield from astuple(self)
28
29
class CodeLine:
    """A single line of a source file, together with the address ranges and execution counters attached to it."""

    def __init__(self, content: str, number: int, filename: str, is_exec: bool):
        self.filename = filename
        self.number = number
        self.content = content
        self.is_exec = is_exec
        self.addresses: List[AddressRange] = []
        # One counter per address, created on first access.
        # The lambda postpones the look-up of `ExecutionCount` until a counter is actually needed.
        self.address_counter = defaultdict(lambda: ExecutionCount())

    def add_address(self, low: int, high: int):
        """Attach the range `low`..`high` to this line, extending the most recent range if the new one starts where it ends."""
        previous = self.addresses[-1] if self.addresses else None
        # Ranges arrive in incrementing order, so comparing with the last one is enough
        if previous is not None and previous.high == low:
            previous.high = high
            return
        self.addresses.append(AddressRange(low, high))

    def count_execution(self, address) -> 'ExecutionCount':
        """Increment the counter kept for `address` and return that counter."""
        counter = self.address_counter[address]
        counter.count_up()
        return counter

    def most_executions(self) -> int:
        """Return the highest count among this line's counters, or 0 if there are none."""
        return max((counter.count for counter in self.address_counter.values()), default=0)

    def to_lcov_format(self) -> str:
        """Return the lcov `DA:` entry for this line."""
        hits = self.most_executions()
        return f"DA:{self.number},{hits}"
58
59
class Record:
    """The code lines collected for one source file, renderable as a single lcov record."""

    def __init__(self, name: str):
        self.lines: List[CodeLine] = []
        self.name = name

    def add_code_line(self, cl: CodeLine):
        """Append `cl` to this record."""
        self.lines.append(cl)

    def get_exec_lines(self) -> Generator[CodeLine, None, None]:
        """Yield, in insertion order, only the lines marked as executable."""
        for candidate in self.lines:
            if candidate.is_exec:
                yield candidate

    def to_lcov_format(self) -> Generator[str, None, None]:
        """Yield the lines of the lcov record: header, one entry per executable line, terminator."""
        yield "TN:"
        yield f'SF:{self.name}'
        for executable_line in self.get_exec_lines():
            yield executable_line.to_lcov_format()
        yield 'end_of_record'
79
80
class ExecutionCount:
    """A mutable hit counter that starts at zero.

    Being an object (rather than a plain int), it can be shared by reference between several containers.
    """

    def __init__(self):
        self.count = 0

    def count_up(self):
        """Register one more execution."""
        self.count = self.count + 1
87
88
@dataclass(frozen=True)
class PathSubstitution:
    """A textual replacement (`before` -> `after`) applied to source file paths."""

    before: str
    after: str

    def apply(self, path: str) -> str:
        """Return `path` with every occurrence of `before` replaced by `after`."""
        return path.replace(self.before, self.after)

    @classmethod
    def from_arg(cls, s: str) -> 'PathSubstitution':
        """Build a substitution from an `old_path:new_path` string.

        Raises ValueError unless `s` contains exactly one ':' separator.
        """
        parts = s.split(':')
        if len(parts) != 2:
            raise ValueError('Path substitution should be in old_path:new_path format')
        return cls(*parts)
103
104
@dataclass
class Coverage:
    """Counts how many times each source line was executed.

    Program counters read from an execution trace are matched against the address ranges
    that the DWARF data of an ELF file assigns to source lines. The result can be rendered in lcov format.
    """

    # Opened ELF file; its DWARF data is obtained with `get_dwarf_info`
    elf_file_handler: BinaryIO
    # Paths of the source files to report on; when empty, they are discovered from the DWARF data
    code_filenames: List[str]
    # Substitutions applied, in order, to the source paths found in the DWARF data
    substitute_paths: 'List[PathSubstitution]'
    # Print the trace addresses that didn't match any source line
    print_unmatched_address: bool = False
    # Print additional diagnostics
    debug: bool = False
    # Together with `debug`, print even more detailed diagnostics
    noisy: bool = False
    # Don't pre-generate the address map; resolve each trace address on its first occurrence instead
    lazy_line_cache: bool = False

    # Opened source files, filled in by `__post_init__`
    _code_files: List[IO] = field(init=False)

    def __post_init__(self):
        """Open the provided source files, or discover and open them based on the DWARF data."""
        if self.code_filenames:
            self._code_files = [open(file) for file in self.code_filenames]
            return

        print("No sources provided, will attempt to discover automatically")
        self._code_files = self._find_code_files(get_dwarf_info(self.elf_file_handler), self.substitute_paths)
        # The files were opened under already substituted paths. Substituting again could rewrite a path twice
        # (whenever `after` contains `before`) and make the names diverge from `code_file.name`,
        # which is what the code lines are keyed by later on.
        self.code_filenames = [code_file.name for code_file in self._code_files]

    @staticmethod
    def _find_code_files(dwarf_info, substitute_paths: 'Iterable[PathSubstitution]', verbose=True) -> List[IO]:
        """Open every source file referenced by the DWARF line programs.

        Path substitutions are applied before opening. Files that don't exist are reported and skipped.
        """
        if verbose:
            print('Attempting to resolve source files by scanning DWARF data...')

        # The substitutions are applied once per path, so make sure a one-shot iterable survives that
        substitute_paths = list(substitute_paths)
        # De-duplicate after substituting, so two raw paths that end up as the same file are opened only once
        unique_files: Set[str] = {
            Coverage._apply_path_substitutions(os.path.join(directory, file_name), substitute_paths)
            for file_name, _, _, _, directory in get_addresses(dwarf_info)
        }

        code_files: List[IO] = []
        # Sorted, so that the order of files (and of the generated records) doesn't change between runs
        for code_filename in sorted(unique_files):
            if verbose:
                print('Found source:', code_filename)
            try:
                code_files.append(open(code_filename))
            except FileNotFoundError:
                print('Source file not found:', code_filename)
        return code_files

    @staticmethod
    def _apply_path_substitutions(code_filename: str, substitute_paths: 'Iterable[PathSubstitution]') -> str:
        """Return `code_filename` after applying all the substitutions, one after another."""
        for substitution in substitute_paths:
            code_filename = substitution.apply(code_filename)
        return code_filename

    def _approx_file_match(self, file_name: str) -> Optional[str]:
        """Map `file_name` onto one of `code_filenames`.

        An exact match is preferred; otherwise the first entry with the same base name is returned.
        Returns None if there is no match at all.
        """
        if file_name in self.code_filenames:
            return file_name
        wanted = os.path.basename(file_name)
        for candidate in self.code_filenames:
            if os.path.basename(candidate) == wanted:
                return candidate
        return None

    def _build_addr_map(self, code_lines_with_address: 'List[CodeLine]', pc_length: int) -> 'Dict[bytes, ExecutionCount]':
        """Pre-generate the mapping between PCs and the execution counters of code lines.

        The keys are addresses encoded as `pc_length` little-endian bytes, so they can be compared directly with the trace.
        The values are references to the counters kept by the code lines. This way, when parsing the trace later,
        all that's left are quick look-ups into this dictionary to increment the execution counters.
        If an address belongs to more than one line, the first line wins and the conflict is reported.
        """
        address_count_cache: Dict[bytes, ExecutionCount] = {}

        for line in code_lines_with_address:
            for address_range in line.addresses:
                # This is a naive approach. If memory usage is of greater concern, find a better way to store ranges
                for address in range(address_range.low, address_range.high):
                    # The cache is keyed by bytes, so it's the encoded address that has to be looked up
                    key = address.to_bytes(pc_length, byteorder='little', signed=False)
                    if key in address_count_cache:
                        print(f'Address {address} is already mapped to another line. Ignoring this time')
                        continue
                    address_count_cache[key] = line.address_counter[address]
        return address_count_cache

    def _get_code_lines_by_file(self, dwarf_info: 'DWARFInfo') -> 'Tuple[int, int, Dict[str, List[CodeLine]]]':
        """Get the code lines, grouped by the file where they belong.

        The result is a tuple: lowest address in the binary, highest address in the binary
        (both limited to the matched source files) and a dictionary of code lines.

        Raises ValueError if the DWARF data refers to a line beyond the end of a source file,
        and RuntimeError if no address matches any of the source files.
        """
        code_lines: Dict[str, List[CodeLine]] = defaultdict(list)
        for code_file in self._code_files:
            for number, content in enumerate(code_file, start=1):
                # Right now we mark all code lines as non-executable (that is, ignore when calculating coverage)
                # later, when parsing DWARF info, some will be marked as executable
                code_lines[code_file.name].append(CodeLine(content, number, code_file.name, False))

        # The lowest and highest interesting (corresponding to our sources' files) addresses, respectively
        files_low_address = None
        files_high_address = 0
        for file_name, line_number, address_low, address_high, file_path in get_addresses(dwarf_info, debug=self.debug, noisy=self.noisy):
            substituted_name = self._apply_path_substitutions(os.path.join(file_path, file_name), self.substitute_paths)
            # If the files are provided by hand, patch their names
            file_full_name = self._approx_file_match(substituted_name)
            if file_full_name is None:
                continue

            file_lines = code_lines[file_full_name]
            if line_number > len(file_lines):
                raise ValueError(
                    f"Unexpected line number ({line_number}) in ELF file, file with code {file_full_name} contains only {len(file_lines)}"
                )
            if self.debug:
                print(f'file: {file_full_name}, line {line_number}, addr_low 0x{address_low:x}, addr_high: 0x{address_high:x}')
            # Line 0 means the code can't be attributed to any source line.
            # It must not be turned into index -1, as that would credit the last line of the file.
            if line_number > 0:
                code_line = file_lines[line_number - 1]
                code_line.add_address(address_low, address_high)
                code_line.is_exec = True

            if files_low_address is None:
                files_low_address = address_low
                files_high_address = address_high
            else:
                files_low_address = min(files_low_address, address_low)
                files_high_address = max(files_high_address, address_high)

        if files_low_address is None:
            raise RuntimeError("No matching address for provided files found")

        return files_low_address, files_high_address, code_lines

    def report_coverage(self, trace_data: 'TraceData') -> 'Iterable[CodeLine]':
        """Walk over the trace and count the executions of each code line.

        `trace_data` has to provide `has_pc` and `pc_length`, and yield 4-element tuples
        with the PC (as bytes) at the first position.
        Returns all code lines of all source files, with their counters updated.

        Raises ValueError if the trace doesn't contain PCs.
        """
        if not trace_data.has_pc:
            raise ValueError("The trace data doesn't contain PCs.")

        dwarf_info = get_dwarf_info(self.elf_file_handler)
        files_low_address, files_high_address, code_lines = self._get_code_lines_by_file(dwarf_info)

        # Note, that this is just a cache to `code_lines`
        # but after eliminating lines that don't correspond to any address
        code_lines_with_address: List[CodeLine] = [
            line
            for file_lines in code_lines.values()
            for line in file_lines
            if line.addresses
        ]

        # This is also a cache to ExecutionCount
        address_count_cache: Dict[bytes, ExecutionCount] = {}
        unmatched_address: Set[int] = set()

        if not self.lazy_line_cache:
            print('Populating address cache...')
            address_count_cache = self._build_addr_map(code_lines_with_address, trace_data.pc_length)

        # This step takes some time for large traces and codebases, let's advise the user to wait
        print('Processing the trace, please wait...')
        for address_bytes, _, _, _ in trace_data:
            counter = address_count_cache.get(address_bytes)
            if counter is not None:
                counter.count_up()
                continue

            address = int.from_bytes(address_bytes, byteorder="little", signed=False)
            if address in unmatched_address:
                # Is marked as unmatched, short cut!
                # For unmatched address, a walk over all lines is very slow, since we need to check the entire map each time
                # So make sure that we mark the address as "unmatched" on the first try, and don't care about it later on
                continue
            # Optimization: cut-off addresses from trace that for sure don't matter to us
            if not (files_low_address <= address < files_high_address):
                unmatched_address.add(address)
                continue
            if self.debug and self.noisy:
                print(f'parsing new addr in trace: {address:x}')
            # Find a line, for which one of the addresses matches with the address bytes present in the trace
            # If we pre-generated the mappings earlier (in `_build_addr_map`), this will only serve as a back-up to find not matched addresses
            # Walking each time is slow, so it's generally better to pre-generate mappings, if we aren't running out of memory
            for line in code_lines_with_address:
                # One line is likely to exist at several addresses, so check all its address ranges
                if any(address_range.low <= address < address_range.high for address_range in line.addresses):
                    address_count_cache[address_bytes] = line.count_execution(address_bytes)
                    break
            else:
                # No line claimed this address
                unmatched_address.add(address)

        if self.print_unmatched_address:
            print(f'Found {len(unmatched_address)} unmatched unique addresses')
            print('Addresses in trace not matching any sources:', ', '.join(f'0x{addr:X}' for addr in sorted(unmatched_address)))

        return itertools.chain.from_iterable(code_lines.values())

    def convert_to_lcov(self, code_lines: 'Iterable[CodeLine]') -> Generator[str, None, None]:
        """Yield the lines of an lcov report: one record per opened source file, filled with `code_lines`."""
        records: Dict[str, Record] = {}
        for code_file in self._code_files:
            records[code_file.name] = Record(code_file.name)

        for line in code_lines:
            records[line.filename].add_code_line(line)

        for record in records.values():
            yield from record.to_lcov_format()
289
def get_dwarf_info(elf_file_handler: BinaryIO) -> 'DWARFInfo':
    """Parse `elf_file_handler` as an ELF file and return its DWARF information.

    Raises ValueError if the file doesn't contain DWARF data.
    """
    elf_file = ELFFile(elf_file_handler)
    if elf_file.has_dwarf_info():
        return elf_file.get_dwarf_info()

    # In-memory streams (e.g. io.BytesIO) have no `name`; reporting the problem must not fail with AttributeError
    source_name = getattr(elf_file_handler, 'name', '<unnamed stream>')
    raise ValueError(
        f"The file ({source_name}) doesn't contain DWARF data."
    )
297
def get_addresses(dwarf_info, *, debug=False, noisy=False):
    """Yield the line-to-address mappings of every compilation unit in `dwarf_info`.

    The items are produced by `get_addresses_for_CU`, one compilation unit after another.
    """
    for compilation_unit in dwarf_info.iter_CUs():
        for mapping in get_addresses_for_CU(dwarf_info, compilation_unit, debug=debug, noisy=noisy):
            yield mapping
303
def get_addresses_for_CU(dwarf_info, CU, *, debug=False, noisy=False):
    """Yield the line-to-address mappings found in the line program of `CU`.

    Each item is a tuple: file name, line number, the address where the line's code starts,
    the address of the next line program state (where this line's code ends) and the directory of the file.
    Nothing is yielded for a compilation unit without a line program.
    """
    line_program = dwarf_info.line_program_for_CU(CU)
    if line_program is None:
        # The CU doesn't point to any line program, so there is nothing to map
        return

    # Before DWARF 5, file and directory indices are counted from 1
    delta = 1 if line_program.header.version < 5 else 0
    # Looked up only when needed, see below
    compilation_directory = None
    previous_state = None
    for entry in line_program.get_entries():
        state = entry.state
        # We're interested in those entries where a new state is assigned
        if state is None:
            continue
        # We need to check if an address changes in the state machine
        # otherwise, the mapping isn't really valid
        if previous_state and previous_state.address < state.address:
            file_entry = line_program["file_entry"][previous_state.file - delta]
            filename = bytes2str(file_entry.name)
            # Adapted from https://github.com/eliben/pyelftools/blob/3181eedfffc8eaea874152fb67f77d0be4ca969e/examples/dwarf_lineprogram_filenames.py#L71
            dir_index = file_entry["dir_index"] - delta
            if dir_index >= 0:
                directory_path = bytes2str(line_program["include_directory"][dir_index])
            else:
                # Before DWARF 5, directory index 0 stands for the compilation directory, which isn't
                # listed among the include directories. A negative index must not be used to read
                # the list, as it would silently select the last include directory.
                if compilation_directory is None:
                    comp_dir = CU.get_top_DIE().attributes.get('DW_AT_comp_dir')
                    compilation_directory = bytes2str(comp_dir.value) if comp_dir is not None else ''
                directory_path = compilation_directory
            if debug and noisy:
                print('Parsing:', directory_path, filename)
            yield filename, previous_state.line, previous_state.address, state.address, directory_path
        # For the state with `end_sequence`, `address` means the address
        # of the first byte after the target machine instruction
        # sequence and other information is meaningless. We clear
        # the previous state so that it's not used in the next iteration. Address
        # info is used in the above comparison to see if we need to use
        # the line information for the previous state.
        previous_state = None if state.end_sequence else state
337