1# 2# Copyright (c) 2010-2025 Antmicro 3# 4# This file is licensed under the MIT License. 5# Full license text is available in 'licenses/MIT.txt'. 6# 7 8import os 9import itertools 10import functools 11from collections import defaultdict 12from dataclasses import dataclass, astuple, field 13from typing import TYPE_CHECKING, BinaryIO, Dict, Generator, Iterable, List, Set, IO, Optional, Tuple 14from elftools.common.utils import bytes2str 15from elftools.elf.elffile import ELFFile 16 17if TYPE_CHECKING: 18 from execution_tracer_reader import TraceData 19 from elftools.elf.elffile import DWARFInfo 20 21@dataclass 22class AddressRange: 23 low: int 24 high: int 25 26 def __iter__(self): 27 return iter(astuple(self)) 28 29 30class CodeLine: 31 def __init__(self, content: str, number: int, filename: str, is_exec: bool): 32 self.content = content 33 self.number = number 34 self.filename = filename 35 self.addresses: List[AddressRange] = [] 36 self.is_exec = is_exec 37 self.address_counter = defaultdict(lambda: ExecutionCount()) 38 39 def add_address(self, low: int, high: int): 40 # Try simply merge ranges if they are continuous. 41 # since they are incrementing, this check is enough 42 if self.addresses and self.addresses[-1].high == low: 43 self.addresses[-1].high = high 44 else: 45 self.addresses.append(AddressRange(low, high)) 46 47 def count_execution(self, address) -> 'ExecutionCount': 48 self.address_counter[address].count_up() 49 return self.address_counter[address] 50 51 def most_executions(self) -> int: 52 if len(self.address_counter) == 0: 53 return 0 54 return max(count.count for count in self.address_counter.values()) 55 56 def to_lcov_format(self) -> str: 57 return f"DA:{self.number},{self.most_executions()}" 58 59 60class Record: 61 def __init__(self, name: str): 62 self.name = name 63 self.lines: List[CodeLine] = [] 64 65 def add_code_line(self, cl: CodeLine): 66 self.lines.append(cl) 67 68 def get_exec_lines(self) -> Generator[CodeLine, None, None]: 69 for line in self.lines: 70 if not line.is_exec: 71 continue 72 yield line 73 74 def to_lcov_format(self) -> Generator[str, None, None]: 75 yield "TN:" 76 yield f'SF:{self.name}' 77 yield from (l.to_lcov_format() for l in self.get_exec_lines()) 78 yield 'end_of_record' 79 80 81class ExecutionCount: 82 def __init__(self): 83 self.count = 0 84 85 def count_up(self): 86 self.count += 1 87 88 89@dataclass(frozen=True) 90class PathSubstitution: 91 before: str 92 after: str 93 94 def apply(self, path: str) -> str: 95 return path.replace(self.before, self.after) 96 97 @classmethod 98 def from_arg(cls, s: str) -> 'Self': 99 args = s.split(':') 100 if len(args) != 2: 101 raise ValueError('Path substitution should be in old_path:new_path format') 102 return cls(*args) 103 104 105@dataclass 106class Coverage: 107 elf_file_handler: BinaryIO 108 code_filenames: List[str] 109 substitute_paths: List[PathSubstitution] 110 print_unmatched_address: bool = False 111 debug: bool = False 112 noisy: bool = False 113 lazy_line_cache: bool = False 114 115 _code_files: List[IO] = field(init=False) 116 117 def __post_init__(self): 118 if not self.code_filenames: 119 print("No sources provided, will attempt to discover automatically") 120 self._code_files = self._find_code_files(get_dwarf_info(self.elf_file_handler), self.substitute_paths) 121 self.code_filenames = [self._apply_path_substitutions(code_filename.name, self.substitute_paths) for code_filename in self._code_files] 122 else: 123 self._code_files = [open(file) for file in self.code_filenames] 124 125 @staticmethod 126 def _find_code_files(dwarf_info, substitute_paths: Iterable[PathSubstitution], verbose=True) -> List[IO]: 127 unique_files: Set[str] = set() 128 code_files: List[IO] = [] 129 if verbose: 130 print('Attempting to resolve source files by scanning DWARF data...') 131 for file_name, _, _, _, directory in get_addresses(dwarf_info): 132 absolute_path = os.path.join(directory, file_name) 133 unique_files.add(absolute_path) 134 135 for code_filename in unique_files: 136 code_filename = Coverage._apply_path_substitutions(code_filename, substitute_paths) 137 if verbose: 138 print('Found source:', code_filename) 139 try: 140 code_files.append(open(code_filename)) 141 except FileNotFoundError: 142 print('Source file not found:', code_filename) 143 return code_files 144 145 @staticmethod 146 def _apply_path_substitutions(code_filename: str, substitute_paths: Iterable[PathSubstitution]) -> str: 147 return functools.reduce(lambda p, sub: sub.apply(p), substitute_paths, code_filename) 148 149 def _approx_file_match(self, file_name: str) -> Optional[str]: 150 for file in self.code_filenames: 151 if file_name == file: 152 return file_name 153 for file in self.code_filenames: 154 if os.path.basename(file) == os.path.basename(file_name): 155 return file 156 return None 157 158 def _build_addr_map(self, code_lines_with_address: List[CodeLine], pc_length: int) -> Dict[bytes, ExecutionCount]: 159 # This is a dictionary of references, that will be used to quickly update counters for each code line. 160 # We can walk only once over all code lines and pre-generate mappings between PCs (addresses) and code lines. 161 # This way, when we'll later parse the trace, all we do are quick look-ups into this dictionary to increment the execution counters. 162 address_count_cache: Dict[bytes, ExecutionCount] = {} 163 164 for line in code_lines_with_address: 165 for addr in line.addresses: 166 addr_lo = addr.low 167 while addr_lo < addr.high: 168 if not addr_lo in address_count_cache: 169 address_count_cache[addr_lo.to_bytes(pc_length, byteorder='little', signed=False)] = line.address_counter[addr_lo] 170 else: 171 print(f'Address {addr_lo} is already mapped to another line. Ignoring this time') 172 # This is a naive approach. If memory usage is of greater concern, find a better way to store ranges 173 addr_lo += 1 174 return address_count_cache 175 176 # Get list of code lines, grouped by the file where they belong 177 # Result is a tuple: lowest address in the binary, highest address in the binary, and a dictionary of code lines 178 def _get_code_lines_by_file(self, dwarf_info: 'DWARFInfo') -> Tuple[int, int, Dict[str, List[CodeLine]]]: 179 code_lines: Dict[str, List[CodeLine]] = defaultdict(list) 180 for code_file in self._code_files: 181 for no, line in enumerate(code_file): 182 # Right now we mark all code lines as non-executable (that is, ignore when calculating coverage) 183 # later, when parsing DWARF info, some will be marked as executable 184 code_lines[code_file.name].append(CodeLine(line, no + 1, code_file.name, False)) 185 186 # The lowest and highest interesting (corresponding to our sources' files) addresses, respectively 187 files_low_address = None 188 files_high_address = 0 189 for file_name, line_number, address_low, address_high, file_path in get_addresses(dwarf_info, debug=self.debug, noisy=self.noisy): 190 file_full_name = os.path.join(file_path, file_name) 191 file_full_name = self._apply_path_substitutions(file_full_name, self.substitute_paths) 192 # If the files are provided by hand, patch their names 193 if (file_full_name := self._approx_file_match(file_full_name)) is None: 194 continue 195 if line_number > len(code_lines[file_full_name]): 196 raise ValueError( 197 f"Unexpected line number ({line_number}) in ELF file, file with code {file_full_name} contains only {len(code_lines[file_full_name])}" 198 ) 199 if file_full_name in code_lines: 200 if self.debug: 201 print(f'file: {file_full_name}, line {line_number}, addr_low 0x{address_low:x}, addr_high: 0x{address_high:x}') 202 code_lines[file_full_name][line_number - 1].add_address(address_low, address_high) 203 code_lines[file_full_name][line_number - 1].is_exec = True 204 205 if files_low_address is None: 206 files_low_address = address_low 207 files_high_address = address_high 208 else: 209 files_low_address = min(files_low_address, address_low) 210 files_high_address = max(files_high_address, address_high) 211 212 if files_low_address is None: 213 raise RuntimeError("No matching address for provided files found") 214 215 return files_low_address, files_high_address, code_lines 216 217 def report_coverage(self, trace_data: 'TraceData') -> Iterable[CodeLine]: 218 if not trace_data.has_pc: 219 raise ValueError("The trace data doesn't contain PCs.") 220 221 dwarf_info = get_dwarf_info(self.elf_file_handler) 222 files_low_address, files_high_address, code_lines = self._get_code_lines_by_file(dwarf_info) 223 224 # Note, that this is just a cache to `code_lines` 225 # but after eliminating lines that don't correspond to any address 226 code_lines_with_address: List[CodeLine] = [] 227 for file_name in code_lines.keys(): 228 code_lines_with_address.extend(line for line in code_lines[file_name] if line.addresses) 229 230 # This is also a cache to ExecutionCount 231 address_count_cache: Dict[bytes, ExecutionCount] = {} 232 unmatched_address: Set[int] = set() 233 234 if not self.lazy_line_cache: 235 print('Populating address cache...') 236 address_count_cache = self._build_addr_map(code_lines_with_address, trace_data.pc_length) 237 238 # This step takes some time for large traces and codebases, let's advise the user to wait 239 print('Processing the trace, please wait...') 240 for address_bytes, _, _, _ in trace_data: 241 if address_bytes in address_count_cache: 242 address_count_cache[address_bytes].count_up() 243 else: 244 address = int.from_bytes(address_bytes, byteorder="little", signed=False) 245 if address in unmatched_address: 246 # Is marked as unmatched, short cut! 247 # For unmatched address, a walk over all lines is very slow, since we need to check the entire map each time 248 # So make sure that we mark the address as "unmatched" on the first try, and don't care about it later on 249 continue 250 # Optimization: cut-off addresses from trace that for sure don't matter to us 251 if not (files_low_address <= address < files_high_address): 252 unmatched_address.add(address) 253 continue 254 if self.debug and self.noisy: 255 print(f'parsing new addr in trace: {address:x}') 256 # Find a line, for which one of the addresses matches with the address bytes present in the trace 257 # If we pre-generated the mappings earlier (in `_build_addr_map`), this function will only serve as a back-up to find not matched addresses 258 # Walking each time is slow, so it's generally better to pre-generate mappings, if we aren't running out of memory 259 for line in code_lines_with_address: 260 if any( 261 # Check for all address ranges 262 address_range.low <= address < address_range.high 263 for address_range in line.addresses 264 ): 265 # One line is likely to exist at several addresses 266 address_count_cache[address_bytes] = line.count_execution( 267 address_bytes 268 ) 269 break 270 if address_bytes not in address_count_cache: 271 unmatched_address.add(address) 272 273 if self.print_unmatched_address: 274 print(f'Found {len(unmatched_address)} unmatched unique addresses') 275 print('Addresses in trace not matching any sources:', ', '.join(f'0x{addr:X}' for addr in sorted(unmatched_address))) 276 277 return itertools.chain.from_iterable(code_lines.values()) 278 279 def convert_to_lcov(self, code_lines: Iterable[CodeLine]) -> Generator[str, None, None]: 280 records: Dict[str, Record] = {} 281 for code_file in self._code_files: 282 records[code_file.name] = Record(code_file.name) 283 284 for line in code_lines: 285 records[line.filename].add_code_line(line) 286 287 for record in records.values(): 288 yield from record.to_lcov_format() 289 290def get_dwarf_info(elf_file_handler: BinaryIO) -> 'DWARFInfo': 291 elf_file = ELFFile(elf_file_handler) 292 if not elf_file.has_dwarf_info(): 293 raise ValueError( 294 f"The file ({elf_file_handler.name}) doesn't contain DWARF data." 295 ) 296 return elf_file.get_dwarf_info() 297 298def get_addresses(dwarf_info, *, debug=False, noisy=False): 299 # Go over all the line programs in the DWARF information, looking for 300 # one that describes the given address. 301 for CU in dwarf_info.iter_CUs(): 302 yield from get_addresses_for_CU(dwarf_info, CU, debug=debug, noisy=noisy) 303 304def get_addresses_for_CU(dwarf_info, CU, *, debug=False, noisy=False): 305 # First, look at line programs to find the file/line for the address 306 line_program = dwarf_info.line_program_for_CU(CU) 307 delta = 1 if line_program.header.version < 5 else 0 308 previous_state = None 309 for entry in line_program.get_entries(): 310 # We're interested in those entries where a new state is assigned 311 if entry.state is None: 312 continue 313 # We need to check if an address changes in the state machine 314 # otherwise, the mapping isn't really valid 315 if previous_state and previous_state.address < entry.state.address: 316 filename = bytes2str( 317 line_program["file_entry"][previous_state.file - delta].name 318 ) 319 # Adapted from https://github.com/eliben/pyelftools/blob/3181eedfffc8eaea874152fb67f77d0be4ca969e/examples/dwarf_lineprogram_filenames.py#L71 320 dir_index = line_program["file_entry"][previous_state.file - delta]["dir_index"] - delta 321 directory_path = bytes2str( 322 line_program["include_directory"][dir_index] 323 ) 324 if debug and noisy: 325 print('Parsing:', directory_path, filename) 326 yield filename, previous_state.line, previous_state.address, entry.state.address, directory_path 327 if entry.state.end_sequence: 328 # For the state with `end_sequence`, `address` means the address 329 # of the first byte after the target machine instruction 330 # sequence and other information is meaningless. We clear 331 # prevstate so that it's not used in the next iteration. Address 332 # info is used in the above comparison to see if we need to use 333 # the line information for the prevstate. 334 previous_state = None 335 else: 336 previous_state = entry.state 337