#!/usr/bin/env python3 # # Copyright (c) 2010-2023 Antmicro # # This file is licensed under the MIT License. # Full license text is available in 'licenses/MIT.txt'. # import asyncio import argparse import pexpect import psutil import re import telnetlib import difflib from time import time from os import path from typing import Any, Optional, Callable, Awaitable RENODE_GDB_PORT = 2222 RENODE_TELNET_PORT = 12348 RE_HEX = re.compile(r"0x[0-9A-Fa-f]+") RE_VEC_REGNAME = re.compile(r"v\d+") RE_FLOAT_REGNAME = re.compile(r"f[tsa]\d+") RE_GDB_ERRORS = ( re.compile(r"\bUndefined .*?\.", re.MULTILINE), re.compile(r"\bThe \"remote\" target does not support \".*?\"\.", re.MULTILINE), re.compile(r"\bNo symbol \".*?\".*?\.", re.MULTILINE), re.compile(r"\bCannot .*$", re.MULTILINE), re.compile(r"\bRemote communication error\..*$", re.MULTILINE), re.compile(r"\bRemote connection closed", re.MULTILINE), re.compile(r"\bThe program has no registers.*?\.", re.MULTILINE), re.compile(r"\bThe program is not being run.*?\.", re.MULTILINE), re.compile(r"\b.*: cannot resolve name.*$", re.MULTILINE), re.compile(r"\b.*: no such file or directory\.", re.MULTILINE), re.compile(r"\bArgument required.*?\.", re.MULTILINE), re.compile(r'\b.*: .*(not in executable format|file format not recognized)', re.MULTILINE), re.compile(r"\bNo symbol table is loaded\.", re.MULTILINE), ) parser = argparse.ArgumentParser( description="Compare Renode execution with hardware/other simulator state using GDB") cmp_parser = parser.add_mutually_exclusive_group(required=True) cmp_parser.add_argument("-c", "--gdb-command", dest="command", default=None, help="GDB command to run on both instances after each instruction. Outputs of these commands are compared against each other.") cmp_parser.add_argument("-R", "--register-list", dest="registers", action="store", default=None, help="Sequence of register names to compare. Formated as ';' separated list of register names, e.g. 'pc;ra'") parser.add_argument("-r", "--reference-command", dest="reference_command", action="store", required=True, help="Command used to run the GDB server provider used as a reference") parser.add_argument("-s", "--renode-script", dest="renode_script", action="store", required=True, help="Path to the '.resc' script") parser.add_argument("-p", "--reference-gdb-port", type=int, dest="reference_gdb_port", action="store", required=True, help="Port on which the reference GDB server can be reached") parser.add_argument("--renode-gdb-port", type=int, dest="renode_gdb_port", action="store", default=RENODE_GDB_PORT, help="Port on which Renode will comunicate with GDB server") parser.add_argument("-P", "--renode-telnet-port", type=int, dest="renode_telnet_port", action="store", default=RENODE_TELNET_PORT, help="Port on which Renode will comunicate with telnet") parser.add_argument("-b", "--binary", dest="debug_binary", action="store", required=True, help="Path to ELF file with symbols") parser.add_argument("-x", "--renode-path", dest="renode_path", action="store", default="renode", help="Path to the Renode runscript") parser.add_argument("-g", "--gdb-path", dest="gdb_path", action="store", default="/usr/bin/gdb", help="Path to the GDB binary to be run") parser.add_argument("-f", "--start-frame", dest="start_frame", action="store", default=None, help="Sequence of jumps to reach target frame. Formated as 'addr, occurrence', separated with ';', e.g. '_start,1;printf,7'") parser.add_argument("-i", "--interest-points", dest="ips", action="store", default=None, help="Sequence of address, interest points, after which state will be compared. Formatted as ';' spearated list of hexadecimal addresses, e.g. '0x8000;0x340eba3c'") parser.add_argument("-S", "--stop-address", dest="stop_address", action="store", default=None, help="Stop condition, if reached script will stop") SECTION_SEPARATOR = "==================================================" # A stack is a list of (address, nth_occurrence) tuples. # `address` is a PC value, as a hex string, e.g. "0x00f24710". # `nth_occurrence` is the number of times `address` was reached since the start of execution. # Therefore, assuming deterministic runtime, an arbitrary program state can be reached # by setting a breakpoint at `address` and continuing `nth_occurrence` times. Stack = list[tuple[str, int]] class Renode: """A class for communicating with a remote instance of Renode.""" def __init__(self, binary: str, port: int): """Spawns a new instance of Renode and connects to it through Telnet.""" print(f"* Starting Renode instance on telnet port {port}") # making sure there is only one instance of Renode on this port for p in psutil.process_iter(): process_name = p.name().casefold() if "renode" in process_name and str(port) in process_name: print("!!! Found another instance of Renode running on the same port. Killing it before proceeding") p.kill() try: self.proc = pexpect.spawn(f"{binary} --disable-gui --plain --port {port}", timeout=20) self.proc.stripcr = True self.proc.expect("Monitor available in telnet mode on port") except pexpect.exceptions.EOF as err: print("!!! Renode failed to start telnet server! (is --renode-path correct? is --renode-telnet-port available?)") raise err self.connection = telnetlib.Telnet("localhost", port) # Sometimes first command does not work, hence we send this dummy one to make sure we got functional connection right after initialization self.command("echo 'Connected to GDB comparator'") def close(self) -> None: """Closes the underlying Renode instance.""" self.command("quit", expected_log="Disposed") def command(self, input: str, expected_log: str = "") -> None: """Sends an arbitrary command to the underlying Renode instance.""" if not self.proc.isalive(): print("!!! Renode has died!") print("Process:") print(str(self.proc)) raise RuntimeError input = input + "\n" self.connection.write(input.encode()) if expected_log != "": try: self.proc.expect([expected_log.encode()]) except pexpect.TIMEOUT as err: print(SECTION_SEPARATOR) print(f"Renode command '{input.strip()}' failed!") print(f"Expected regex '{expected_log}' was not found") print("Process:") print(str(self.proc)) print(SECTION_SEPARATOR) print(f"{err=} ; {type(err)=}") exit(1) def get_output(self) -> bytes: """Reads all output from the Telnet connection.""" return self.connection.read_all() class GDBInstance: """A class for controlling a remote GDB instance.""" def __init__(self, gdb_binary: str, port: int, debug_binary: str, name: str, target_process: pexpect.spawn): """Spawns a new GDB instance and connects to it.""" self.dimensions = (0, 4096) self.name = name self.last_cmd = "" self.last_output = "" self.task: Awaitable[Any] self.target_process = target_process print(f"* Connecting {self.name} GDB instance to target on port {port}") self.process = pexpect.spawn(f"{gdb_binary} --silent --nx --nh", timeout=10, dimensions=self.dimensions) self.process.timeout = 120 self.run_command("clear", async_=False) self.run_command("set pagination off", async_=False) self.run_command(f"file {debug_binary}", async_=False) self.run_command(f"target remote :{port}", async_=False) def close(self) -> None: """Closes the underlying GDB instance.""" self.run_command("quit", dont_wait_for_output=True, async_=False) def progress_by(self, delta: int, type: str = "stepi") -> None: """Steps `delta` times.""" adjusted_timeout = max(120, int(delta) / 5) self.run_command(type + (f" {delta}" if int(delta) > 1 else ""), timeout=adjusted_timeout) def get_symbol_at(self, addr: str) -> str: """Returns the name of the symbol which is stored at `addr` (`info symbol`).""" self.run_command(f"info symbol {addr}", async_=False) return self.last_output.splitlines()[-1] def delete_breakpoints(self) -> None: """Deletes all breakpoints.""" self.run_command("clear", async_=False) def run_command(self, command: str, timeout: float = 10, confirm: bool = False, dont_wait_for_output: bool = False, async_: bool = True) -> None: """Send an arbitrary command to the underlying GDB instance.""" if not self.process.isalive(): print(f"!!! The {self.name} GDB process has died!") print("Process:") print(str(self.process)) self.last_output = "" raise RuntimeError if not self.target_process.isalive(): print(f"!!! {self.name} GDB's target has died!") print("Target process:") print(str(self.target_process)) self.last_output = "" raise RuntimeError self.last_cmd = command self.process.write(command + "\n") if dont_wait_for_output: return try: if not confirm: result = self.process.expect(re.escape(command) + r".+\n", timeout, async_=async_) self.task = result if async_ else None if not async_: self.last_output = "" line = self.process.match[0].decode().strip("\r") while "(gdb)" not in line: self.last_output += line self.process.expect([r".+\n", r"\(gdb\)"], timeout) line = self.process.match[0].decode().strip("\r") self.validate_response(self.last_output) else: self.process.expect("[(]y or n[)]") self.process.writelines("y") result = self.process.expect("[(]gdb[)]", async_=async_) self.task = result if async_ else None self.last_output = self.process.match[0].decode().strip("\r") except pexpect.TIMEOUT as err: print(f"!!! {self.name} GDB: Command '{command}' timed out!") print("Process:") print(str(self.process)) self.last_output = "" raise err except pexpect.exceptions.EOF as err: print(f"!!! {self.name} GDB: pexpect encountered an unexpected EOF (is --gdb-path correct?)") print("Process:") print(str(self.process)) self.last_output = "" raise err def print_stack(self, stack: Stack) -> None: """Prints a stack.""" print("Address\t\tOccurrence\t\tSymbol") for address, occurrence in stack: print(f"{address}\t{occurrence}\t{self.get_symbol_at(address)}") def validate_response(self, response: str) -> None: """Scans a GDB response for common error messages.""" for regex in RE_GDB_ERRORS: err_match = regex.search(response) if err_match is not None: print(f"!!! {self.name} GDB: {err_match[0].strip()} (last command: \"{self.last_cmd}\")") # Assuming we correctly identified a GDB error, this would be # the right place to terminate execution. However, there is # a risk of a false positive, so it's safer not to (if it is # a critical error, it will most likely cause a timeout anyway). async def get_pc(self) -> str: """Returns the value of the PC register, as a hex string.""" self.run_command("i r pc") await self.expect() pc_match = RE_HEX.search(self.last_output) if pc_match is not None: return pc_match[0] else: raise TypeError async def expect(self, timeout: float = 10) -> None: """Await execution of the last command to finish and update `self.last_output`.""" try: await self.task line = self.process.match[0].decode().strip("\r") self.last_output = "" while "(gdb)" not in line: self.last_output += line self.task = self.process.expect([r".+\n", r"\(gdb\)"], timeout, async_=True) await self.task line = self.process.match[0].decode().strip("\r") self.validate_response(self.last_output) except pexpect.TIMEOUT as err: print(f"!!! {self.name} GDB: Command '{self.last_cmd}' timed out!") print("Process:") print(str(self.process)) self.last_output = "" raise err class GDBComparator: """A helper class to aggregate control over 2 `GDBInstance` objects.""" COMMAND_NAME = "gdb_compare__print_registers" COMMANDS = None # REGISTER_CASES is an ordered list of (condition_func, cmd_builder_func) tuples. # It is used to assign registers to groups based on their type, and for each such group # have a dedicated function that constructs a gdb command to pretty-print those registers. # Each tuple in REGISTER_CASES represents a group of registers. condition_func is used to # determine whether a register belongs to the group. cmd_builder_func intakes a list of # registers belonging to the group and returns a GDB command to print all their values. # The order of tuples matters - only the first match is used. RegNameTester = Callable[[str], bool] # condition_func type CommandsBuilder = Callable[[list[str]], list[str]] # cmd_builder_func type REGISTER_CASES: list[tuple[RegNameTester, CommandsBuilder]] = [ (lambda reg: RE_VEC_REGNAME.fullmatch(reg) is not None, lambda regs: [f"p/x (char[])${reg}.b" for reg in regs]), (lambda reg: RE_FLOAT_REGNAME.fullmatch(reg) is not None, lambda regs: [f"p/x (char[])${reg}" for reg in regs]), (lambda _: True, lambda regs: ["printf \"" + ": 0x%x\\n".join(regs) + ": 0x%x\\n\",$" + ",$".join(regs)]), ] def __init__(self, args: argparse.Namespace, renode_proc: pexpect.spawn, ref_proc: pexpect.spawn): """Creates 2 `GDBInstance` objects, one expecting to connect on port `args.renode_gdb_port` and the other on `args.reference_gdb_port`.""" self.instances = [ GDBInstance(args.gdb_path, args.renode_gdb_port, args.debug_binary, "Renode", renode_proc), GDBInstance(args.gdb_path, args.reference_gdb_port, args.debug_binary, "Reference", ref_proc), ] self.cmd = args.command if args.command else self.build_command_from_register_list(args.registers.split(";")) def close(self) -> None: """Closes all owned instances.""" for i in self.instances: i.close() def build_command_from_register_list(self, regs: list[str]) -> str: """Defines a custom gdb command for pretty-printing all registers and returns its name.""" if GDBComparator.COMMANDS is None: # Assign registers to groups based on the RegNameTester functions reg_groups: dict[GDBComparator.CommandsBuilder, list[str]] = {} for reg in regs: for test, cmds_builder in GDBComparator.REGISTER_CASES: if test(reg): reg_groups.setdefault(cmds_builder, []).append(reg) break # Compose a gdb script that defines a custom command for printing all groups of registers GDBComparator.COMMANDS = [ f"define {GDBComparator.COMMAND_NAME}", *[cmd for cmds_builder, reg_group in reg_groups.items() for cmd in cmds_builder(reg_group)], "end" ] # Warn if for any GDBInstance there is a register that was requested by the user # but does not appear in the output of "info registers all" for i in self.instances: i.run_command("i r all", async_=False) reported_regs = list(map(lambda x: x.split()[0], i.last_output.split("\n")[1:-1])) not_found = list(filter(lambda reg: reg not in reported_regs, regs)) if not_found: print("WARNING: " + ", ".join(not_found) + " register[s] not found when executing 'info registers all' for " + i.name) # Define the custom command commands = GDBComparator.COMMANDS for i in self.instances: for cmd in commands[:-1]: i.run_command(cmd, dont_wait_for_output=True, async_=False) i.run_command(commands[-1], async_=False) return GDBComparator.COMMAND_NAME def delete_breakpoints(self) -> None: """Deletes all breakpoints in all owned instances.""" for i in self.instances: i.delete_breakpoints() def get_symbol_at(self, addr: str) -> str: """Returns the name of the symbol which is stored at `addr` (`info symbol`).""" return self.instances[0].get_symbol_at(addr) def print_stack(self, stack: Stack) -> None: """Prints a stack.""" return self.instances[0].print_stack(stack) async def run_command(self, cmd: Optional[str] = None, **kwargs: Any) -> list[str]: """Sends an arbitrary command to all owned instances and returns a list of outputs.""" cmd = cmd if cmd else self.cmd for i in self.instances: i.run_command(cmd, **kwargs) await asyncio.gather(*[i.expect(**kwargs) for i in self.instances]) return [i.last_output for i in self.instances] async def get_pcs(self) -> list[str]: """Returns a list containing the values of PC registers of all owned instances, as hex strings.""" return await asyncio.gather(*[i.get_pc() for i in self.instances]) async def progress_by(self, delta: int, type: str = "stepi") -> None: """Steps `delta` times in all owned instances.""" adjusted_timeout = max(120, int(delta) / 5) await self.run_command(type + (f" {delta}" if int(delta) > 1 else ""), timeout=adjusted_timeout) async def compare_instances(self, previous_pc: str) -> None: """Compares the execution states of all owned instances. `previous_pc` must refer to the previous value of PC; it does not offer a choice.""" for name, command in [("Opcode at previous pc", f"x/i {previous_pc}"), ("Frame", "frame"), ("Registers", "info registers all")]: print("*** " + name + ":") GDBComparator.compare_outputs(await self.run_command(command)) @staticmethod def compare_outputs(outputs: list[str]) -> None: """Prints a comparison of two output strings (same & different values).""" assert len(outputs) == 2 output1_dict: dict[str, str] = {} output2_dict: dict[str, str] = {} # Truncate 1st elements in outputs, because it's the repl for output, output_dict in zip([x.split("\n")[1:] for x in outputs], [output1_dict, output2_dict]): for x in output: end_of_name = x.strip().find(" ") name = x[:end_of_name].strip() rest = x[end_of_name:].strip() output_dict[name] = rest output_same = "" output_different = "" for name in output1_dict.keys(): if name in output2_dict: if name == "": continue if output1_dict[name] != output2_dict[name]: output_different += f">> {name}:\n" output_different += string_compare(output1_dict[name], output2_dict[name]) + "\n" else: output_same += f">> {name}:\t{output1_dict[name]}\n" if len(output_different) == 0: print("Same:") print(output_same) else: print("Same values:") print(output_same) print("Different values:") print(output_different) def setup_processes(args: argparse.Namespace) -> tuple[Renode, pexpect.spawn, GDBComparator]: """Spawns Renode, the reference process, `GDBComparator` and returns their handles (in that order).""" reference = pexpect.spawn(args.reference_command, timeout=10) renode = Renode(args.renode_path, args.renode_telnet_port) renode.command("include @" + path.abspath(args.renode_script), expected_log="System bus created") renode.command(f"machine StartGdbServer {args.renode_gdb_port}", expected_log=f"started on port :{args.renode_gdb_port}") gdb_comparator = GDBComparator(args, renode.proc, reference) renode.command("start") return renode, reference, gdb_comparator def string_compare(renode_string: str, reference_string: str) -> str: """Returns a pretty diff of two single-line strings.""" BOLD = "\033[1m" END = "\033[0m" RED = "\033[91m" GREEN = "\033[92m" renode_string = re.sub(r"\x1b\[[0-9]*m", "", renode_string) reference_string = re.sub(r"\x1b\[[0-9]*m", "", reference_string) assert len(RED) == len(GREEN) formatting_length = len(BOLD + RED + END) s1_insertions = 0 s2_insertions = 0 diff = difflib.SequenceMatcher(None, renode_string, reference_string) for type, s1_start, s1_end, s2_start, s2_end in diff.get_opcodes(): if type == "equal": continue elif type == "replace": s1_start += s1_insertions * formatting_length s1_end += s1_insertions * formatting_length s2_end += s2_insertions * formatting_length s2_start += s2_insertions * formatting_length renode_string = renode_string[:s1_start] + GREEN + BOLD + renode_string[s1_start:s1_end] + END + renode_string[s1_end:] reference_string = reference_string[:s2_start] + RED + BOLD + reference_string[s2_start:s2_end] + END + reference_string[s2_end:] s1_insertions += 1 s2_insertions += 1 elif type == "insert": s2_end += s2_insertions * (len(BOLD) + len(RED) + len(END)) s2_start += s2_insertions * (len(BOLD) + len(RED) + len(END)) reference_string = reference_string[:s2_start] + RED + BOLD + \ reference_string[s2_start:s2_end] + END + reference_string[s2_end:] s2_insertions += 1 elif type == "delete": s1_end += s1_insertions * (len(BOLD) + len(GREEN) + len(END)) s1_start += s1_insertions * (len(BOLD) + len(GREEN) + len(END)) renode_string = renode_string[:s1_start] + GREEN + BOLD + \ renode_string[s1_start:s1_end] + END + renode_string[s1_end:] s1_insertions += 1 return f"Renode: {renode_string}\nReference: {reference_string}" class CheckStatus: """This class serves as an enum for possible outcomes of the `check` function.""" STOP = 1 CONTINUE = 2 FOUND = 3 MISMATCH = 4 async def check(stack: Stack, gdb_comparator: GDBComparator, previous_pc: str, previous_output: str, steps_count: int, exec_count: dict[str, int], time_of_start: float, args: argparse.Namespace) -> tuple[str, str, int]: """Executes the next `gdb_comparator` instruction, compares the outputs and returns the new PC value, output and `CheckStatus`.""" ren_pc, pc = await gdb_comparator.get_pcs() pc_mismatch = False if pc != ren_pc: print("Renode and reference PC differs!") print(string_compare(ren_pc, pc)) print(f"\tPrevious PC: {previous_pc}") pc_mismatch = True if pc not in exec_count: exec_count[pc] = 0 exec_count[pc] += 1 if args.stop_address and int(ren_pc, 16) == args.stop_address: print("stop address reached") return previous_pc, previous_output, CheckStatus.STOP if not pc_mismatch: output_ren, output_reference = map(lambda s: s.splitlines(), await gdb_comparator.run_command()) for line in range(len(output_ren)): if output_ren[line] != output_reference[line]: print(SECTION_SEPARATOR) print(f"!!! Difference in line {line + 1} of output:") print(string_compare(output_ren[line], output_reference[line])) print(f"Previous: {previous_output}") break else: if steps_count % 10 == 0: print(f"{steps_count} steps; current pc = {pc} {gdb_comparator.get_symbol_at(pc)}") previous_pc = pc previous_output = "\n".join(output_ren[1:]) return previous_pc, previous_output, CheckStatus.CONTINUE if pc_mismatch or (len(stack) > 0 and previous_pc == stack[-1][0]): print(SECTION_SEPARATOR) print("Found faulting insn at " + previous_pc + " " + gdb_comparator.get_symbol_at(previous_pc)) elapsed_time = time() - time_of_start print(f"Took {elapsed_time:.2f} seconds [~ {elapsed_time/steps_count:.2f} steps/sec]") print(SECTION_SEPARATOR) print("*** Stack:") gdb_comparator.print_stack(stack) print("*** Gdb command:") print(args.command) print(SECTION_SEPARATOR) print("Gdb instances comparision:") await gdb_comparator.compare_instances(previous_pc) return previous_pc, previous_output, CheckStatus.FOUND if previous_pc not in exec_count: previous_pc = pc print("Found point after which state is different. Adding to `stack` for later iterations") occurrence = exec_count[previous_pc] print(f"\tAddress: {previous_pc}\n\tOccurrence: {occurrence}") stack.append((previous_pc, occurrence)) exec_count = {} print(SECTION_SEPARATOR) return previous_pc, previous_output, CheckStatus.MISMATCH async def main() -> None: """Script entry point.""" args = parser.parse_args() assert 0 <= args.reference_gdb_port <= 65535, "Illegal reference GDB port" assert 0 <= args.renode_gdb_port <= 65535, "Illegal Renode GDB port" assert 0 <= args.renode_telnet_port <= 65535, "Illegal Renode Telnet port" assert args.reference_gdb_port != args.renode_gdb_port != args.renode_telnet_port, "Overlapping port numbers" if args.stop_address: args.stop_address = int(args.stop_address, 16) pcs = [args.stop_address] if args.stop_address else [] if args.ips: pcs += [pc for pc in args.ips.split(";")] execution_cmd = "continue" if args.ips else "nexti" print(SECTION_SEPARATOR) time_of_start = time() previous_pc = "Unknown" previous_output = "Unknown" steps_count = 0 iterations_count = 0 stack = [] if args.start_frame is not None: jumps = args.start_frame.split(";") for jump in jumps: addr, occur = jump.split(",") address = addr.strip() occurrence = int(occur.strip()) stack.append((address, occurrence)) insn_found = False while not insn_found: iterations_count += 1 print("Preparing processes for iteration number " + str(iterations_count)) renode, reference, gdb_comparator = setup_processes(args) if len(stack) != 0: print("Recreating stack; jumping to breakpoint at:") for address, count in stack: print("\t" + address + ", " + str(count) + " occurrence") await gdb_comparator.run_command(f"break *{address}") for _ in range(count): await gdb_comparator.run_command("continue", timeout=120) gdb_comparator.delete_breakpoints() print("Stepping single instruction") await gdb_comparator.progress_by(1) for pc in pcs: await gdb_comparator.run_command(f"br *{pc}") exec_count: dict[str, int] = {} print("Starting execution") while True: await gdb_comparator.run_command(execution_cmd) steps_count += 1 previous_pc, previous_output, status = await check(stack, gdb_comparator, previous_pc, previous_output, steps_count, exec_count, time_of_start, args) if status == CheckStatus.CONTINUE and execution_cmd == "continue": await gdb_comparator.run_command("stepi") steps_count += 1 previous_pc, previous_output, status = await check(stack, gdb_comparator, previous_pc, previous_output, steps_count, exec_count, time_of_start, args) if status == CheckStatus.STOP: return elif status == CheckStatus.CONTINUE: continue elif status == CheckStatus.FOUND: insn_found = True break elif status == CheckStatus.MISMATCH: execution_cmd = "nexti" gdb_comparator.close() renode.close() reference.close(force=True) break else: exit(1) if __name__ == "__main__": asyncio.run(main()) exit(0)