#!/usr/bin/env python
#
# esp-idf serial output monitor tool. Does some helpful things:
# - Looks up hex addresses in ELF file with addr2line
# - Reset ESP32 via serial RTS line (Ctrl-T Ctrl-R)
# - Run flash build target to rebuild and flash entire project (Ctrl-T Ctrl-F)
# - Run app-flash build target to rebuild and flash app only (Ctrl-T Ctrl-A)
# - If gdbstub output is detected, gdb is automatically loaded
# - If core dump output is detected, it is converted to a human-readable report
#   by espcoredump.py.
#
# SPDX-FileCopyrightText: 2015-2021 Espressif Systems (Shanghai) CO LTD
# SPDX-License-Identifier: Apache-2.0
#
# Contains elements taken from miniterm "Very simple serial terminal" which
# is part of pySerial. https://github.com/pyserial/pyserial
# (C)2002-2015 Chris Liechti <cliechti@gmx.net>
#
# Originally released under BSD-3-Clause license.
#

import codecs
import os
import queue
import re
import shlex
import subprocess
import sys
import threading
import time
from builtins import bytes
from typing import Any, List, Optional, Type, Union

import serial
import serial.tools.list_ports
# Windows console stuff
from idf_monitor_base.ansi_color_converter import get_converter
from idf_monitor_base.argument_parser import get_parser
from idf_monitor_base.console_parser import ConsoleParser
from idf_monitor_base.console_reader import ConsoleReader
from idf_monitor_base.constants import (CTRL_C, CTRL_H, DEFAULT_PRINT_FILTER, DEFAULT_TOOLCHAIN_PREFIX,
                                        ESPPORT_ENVIRON, EVENT_QUEUE_TIMEOUT, GDB_EXIT_TIMEOUT,
                                        GDB_UART_CONTINUE_COMMAND, LAST_LINE_THREAD_INTERVAL, MAKEFLAGS_ENVIRON,
                                        PANIC_DECODE_DISABLE, PANIC_IDLE, TAG_CMD, TAG_KEY, TAG_SERIAL,
                                        TAG_SERIAL_FLUSH)
from idf_monitor_base.coredump import COREDUMP_DECODE_INFO, CoreDump
from idf_monitor_base.exceptions import SerialStopException
from idf_monitor_base.gdbhelper import GDBHelper
from idf_monitor_base.line_matcher import LineMatcher
from idf_monitor_base.logger import Logger
from idf_monitor_base.output_helpers import normal_print, yellow_print
from idf_monitor_base.serial_handler import SerialHandler, run_make
from idf_monitor_base.serial_reader import LinuxReader, SerialReader
from idf_monitor_base.web_socket_client import WebSocketClient
from serial.tools import miniterm

key_description = miniterm.key_description


class Monitor:
    """
    Monitor application base class.

    This was originally derived from miniterm.Miniterm, but it turned out to be easier to write from scratch for this
    purpose.

    Main difference is that all event processing happens in the main thread, not the worker threads.

    Subclasses must provide __exit__(), serial_write() and check_gdb_stub_and_run(); the versions defined here raise
    NotImplementedError.
    """

    def __init__(
        self,
        serial_instance,  # type: serial.Serial
        elf_file,  # type: str
        print_filter,  # type: str
        make='make',  # type: str
        encrypted=False,  # type: bool
        toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX,  # type: str
        eol='CRLF',  # type: str
        decode_coredumps=COREDUMP_DECODE_INFO,  # type: str
        decode_panic=PANIC_DECODE_DISABLE,  # type: str
        target='esp32',  # type: str
        websocket_client=None,  # type: Optional[WebSocketClient]
        enable_address_decoding=True,  # type: bool
        timestamps=False,  # type: bool
        timestamp_format=''  # type: str
    ):
        """
        Build the queues, console, logger, readers and handlers used by the monitor.

        When the instance is a SerialMonitor, ``serial_instance`` is used as the data source and a GDBHelper is
        created for it. Otherwise ``elf_file`` is launched as a child process (stdin/stdout piped, stderr merged
        into stdout), that process is stored in ``self.serial``, and no GDB helper is created.

        Side effect: ``sys.stderr`` is replaced process-wide by the object returned from get_converter().
        """
        self.event_queue = queue.Queue()  # type: queue.Queue
        self.cmd_queue = queue.Queue()  # type: queue.Queue
        self.console = miniterm.Console()

        sys.stderr = get_converter(sys.stderr, decode_output=True)
        self.console.output = get_converter(self.console.output)
        self.console.byte_output = get_converter(self.console.byte_output)

        self.elf_file = elf_file
        self.logger = Logger(self.elf_file, self.console, timestamps, timestamp_format, b'', enable_address_decoding,
                             toolchain_prefix)
        self.coredump = CoreDump(decode_coredumps, self.event_queue, self.logger, websocket_client, self.elf_file)

        # allow for possibility the "make" arg is a list of arguments (for idf.py)
        self.make = make if os.path.exists(make) else shlex.split(make)  # type: Union[str, List[str]]
        self.target = target

        # testing hook - data from serial can make exit the monitor
        if isinstance(self, SerialMonitor):
            socket_mode = serial_instance.port.startswith('socket://')
            self.serial = serial_instance
            self.serial_reader = SerialReader(self.serial, self.event_queue)

            self.gdb_helper = GDBHelper(toolchain_prefix, websocket_client, self.elf_file, self.serial.port,
                                        self.serial.baudrate)
        else:
            socket_mode = False
            # No serial port here: the ELF file itself is executed and its piped output is read instead.
            self.serial = subprocess.Popen([elf_file], stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                                           stderr=subprocess.STDOUT)
            self.serial_reader = LinuxReader(self.serial, self.event_queue)

            self.gdb_helper = None

        self.serial_handler = SerialHandler(b'', socket_mode, self.logger, decode_panic, PANIC_IDLE, b'', target,
                                            False, False, self.serial, encrypted)

        self.console_parser = ConsoleParser(eol)
        self.console_reader = ConsoleReader(self.console, self.event_queue, self.cmd_queue, self.console_parser,
                                            socket_mode)

        self._line_matcher = LineMatcher(print_filter)

        # internal state
        self._invoke_processing_last_line_timer = None  # type: Optional[threading.Timer]

    def __enter__(self) -> None:
        """ Use 'with self' to temporarily disable monitoring behaviour """
        self.serial_reader.stop()
        self.console_reader.stop()

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore
        """ Re-enable monitoring after a 'with self' block; must be implemented by subclasses """
        raise NotImplementedError

    def run_make(self, target: str) -> None:
        """ Run the given build target through run_make() while the readers are stopped """
        with self:
            run_make(target, self.make, self.console, self.console_parser, self.event_queue, self.cmd_queue,
                     self.logger)

    def _pre_start(self) -> None:
        """ Start the console and serial readers; called once at the beginning of main_loop() """
        self.console_reader.start()
        self.serial_reader.start()

    def main_loop(self) -> None:
        """
        Start the readers and process events for as long as both readers report being alive.

        A KeyboardInterrupt raised while processing an event prints a hint and forwards CTRL_C to the target; a
        SerialStopException only prints a message. In both cases the loop condition is then evaluated again. On the
        way out the readers and the logger are stopped on a best-effort basis.
        """
        self._pre_start()

        try:
            while self.console_reader.alive and self.serial_reader.alive:
                try:
                    self._main_loop()
                except KeyboardInterrupt:
                    yellow_print('To exit from IDF monitor please use \"Ctrl+]\" or \"Ctrl+C\"')
                    self.serial_write(codecs.encode(CTRL_C))
                except SerialStopException:
                    normal_print('Stopping condition has been received\n')
        except KeyboardInterrupt:
            pass
        finally:
            try:
                self.console_reader.stop()
                self.serial_reader.stop()
                self.logger.stop_logging()
                # Cancelling _invoke_processing_last_line_timer is not
                # important here because receiving empty data doesn't matter.
                self._invoke_processing_last_line_timer = None
            except Exception:  # noqa
                # Deliberate best-effort shutdown: a failure while stopping must not mask the original exit reason.
                pass
            normal_print('\n')

    def serial_write(self, *args, **kwargs):  # type: ignore
        """ Send data to the monitored target; must be implemented by subclasses """
        raise NotImplementedError

    def check_gdb_stub_and_run(self, line: bytes) -> None:
        """ Callback handed to the serial handler for received lines; must be implemented by subclasses """
        raise NotImplementedError

    def invoke_processing_last_line(self) -> None:
        """ Timer callback: queue an empty TAG_SERIAL_FLUSH event without blocking """
        self.event_queue.put((TAG_SERIAL_FLUSH, b''), False)

    def _main_loop(self) -> None:
        """
        Process at most one queued item.

        The command queue is checked first without blocking; if it is empty, the event queue is waited on for up to
        EVENT_QUEUE_TIMEOUT. Returns silently when nothing arrived.

        Raises:
            RuntimeError: if the item carries an unknown event tag.
        """
        try:
            item = self.cmd_queue.get_nowait()
        except queue.Empty:
            try:
                item = self.event_queue.get(timeout=EVENT_QUEUE_TIMEOUT)
            except queue.Empty:
                return

        event_tag, data = item
        if event_tag == TAG_CMD:
            self.serial_handler.handle_commands(data, self.target, self.run_make, self.console_reader,
                                                self.serial_reader)
        elif event_tag == TAG_KEY:
            self.serial_write(codecs.encode(data))
        elif event_tag == TAG_SERIAL:
            self.serial_handler.handle_serial_input(data, self.console_parser, self.coredump,
                                                    self.gdb_helper, self._line_matcher,
                                                    self.check_gdb_stub_and_run)
            # Restart the "last line" timer on every chunk of serial data.
            if self._invoke_processing_last_line_timer is not None:
                self._invoke_processing_last_line_timer.cancel()
            self._invoke_processing_last_line_timer = threading.Timer(LAST_LINE_THREAD_INTERVAL,
                                                                      self.invoke_processing_last_line)
            self._invoke_processing_last_line_timer.start()
            # If no further data is received in the next short period
            # of time then the _invoke_processing_last_line_timer
            # generates an event which will result in the finishing of
            # the last line. This is fix for handling lines sent
            # without EOL.
        # finalizing the line when coredump is in progress causes decoding issues
        # the espcoredump loader uses empty line as a sign for end-of-coredump
        # line is finalized only for non coredump data
        elif event_tag == TAG_SERIAL_FLUSH:
            self.serial_handler.handle_serial_input(data, self.console_parser, self.coredump,
                                                    self.gdb_helper, self._line_matcher,
                                                    self.check_gdb_stub_and_run,
                                                    finalize_line=not self.coredump.in_progress)
        else:
            raise RuntimeError('Bad event data %r' % ((event_tag, data),))


class SerialMonitor(Monitor):
    """ Monitor variant used when the target is reached through a pySerial port """

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore
        """ Use 'with self' to temporarily disable monitoring behaviour """
        self.console_reader.start()
        self.serial_reader.gdb_exit = self.gdb_helper.gdb_exit  # write gdb_exit flag
        self.serial_reader.start()

    def _pre_start(self) -> None:
        """ Start the readers, then clear the GDB-exit and start-command flags """
        super()._pre_start()
        self.gdb_helper.gdb_exit = False
        self.serial_handler.start_cmd_sent = False

    def serial_write(self, *args, **kwargs):  # type: ignore
        """ Write to the serial port, ignoring a closed port and non-encodable characters """
        self.serial: serial.Serial
        try:
            self.serial.write(*args, **kwargs)
        except serial.SerialException:
            pass  # this shouldn't happen, but sometimes port has closed in serial thread
        except UnicodeEncodeError:
            pass  # this can happen if a non-ascii character was passed, ignoring

    def check_gdb_stub_and_run(self, line: bytes) -> None:  # type: ignore # The base class one is a None value
        """ Run GDB, with the readers stopped, when the GDB helper reports a stub trigger in ``line`` """
        if self.gdb_helper.check_gdb_stub_trigger(line):
            with self:  # disable console control
                self.gdb_helper.run_gdb()

    def _main_loop(self) -> None:
        """ Send the continue command after GDB has exited, then process one event as in the base class """
        if self.gdb_helper.gdb_exit:
            self.gdb_helper.gdb_exit = False
            time.sleep(GDB_EXIT_TIMEOUT)
            # Continue the program after exit from the GDB
            self.serial_write(codecs.encode(GDB_UART_CONTINUE_COMMAND))
            self.serial_handler.start_cmd_sent = True

        super()._main_loop()


class LinuxMonitor(Monitor):
    """ Monitor variant for the 'linux' target, where ``self.serial`` is the child process started in __init__ """

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:  # type: ignore
        """ Use 'with self' to temporarily disable monitoring behaviour """
        self.console_reader.start()
        self.serial_reader.start()

    def serial_write(self, *args, **kwargs):  # type: ignore
        """ Write to the child's stdin and flush so the data is delivered immediately """
        try:
            self.serial.stdin.write(*args, **kwargs)
            # The pipe is created with the default buffering, so without an explicit flush key presses could
            # sit in the buffer instead of reaching the child process.
            self.serial.stdin.flush()
        except BrokenPipeError:
            pass  # the child process has already exited; same best-effort policy as SerialMonitor.serial_write

    def check_gdb_stub_and_run(self, line: bytes) -> None:
        """ No-op: there is no GDB helper for the linux target """
        return  # fake function for linux target


def main() -> None:
    """
    Command line entry point: parse the arguments, build the matching monitor and run its main loop.

    The port name is rewritten on Windows (COMx) and on macOS (/dev/tty.*), the jobserver arguments are stripped
    from the make flags environment variable, and for serial targets the port in use is exported through the
    ESPPORT environment variable. The websocket client, if any, is closed on exit.
    """
    parser = get_parser()
    args = parser.parse_args()

    # GDB uses CreateFile to open COM port, which requires the COM name to be r'\\.\COMx' if the COM
    # number is larger than 10
    if os.name == 'nt' and args.port.startswith('COM'):
        args.port = args.port.replace('COM', r'\\.\COM')
        yellow_print('--- WARNING: GDB cannot open serial ports accessed as COMx')
        yellow_print('--- Using %s instead...' % args.port)
    elif args.port.startswith('/dev/tty.') and sys.platform == 'darwin':
        args.port = args.port.replace('/dev/tty.', '/dev/cu.')
        yellow_print('--- WARNING: Serial ports accessed as /dev/tty.* will hang gdb if launched.')
        yellow_print('--- Using %s instead...' % args.port)

    args.elf_file.close()  # don't need this as a file

    # remove the parallel jobserver arguments from MAKEFLAGS, as any
    # parent make is only running 1 job (monitor), so we can re-spawn
    # all of the child makes we need (the -j argument remains part of
    # MAKEFLAGS)
    try:
        makeflags = os.environ[MAKEFLAGS_ENVIRON]
        makeflags = re.sub(r'--jobserver[^ =]*=[0-9,]+ ?', '', makeflags)
        os.environ[MAKEFLAGS_ENVIRON] = makeflags
    except KeyError:
        pass  # not running a make jobserver

    ws = WebSocketClient(args.ws) if args.ws else None
    try:
        cls: Type[Monitor]
        if args.target == 'linux':
            serial_instance = None
            cls = LinuxMonitor
            yellow_print('--- idf_monitor on linux ---')
        else:
            # The port is configured here but not opened (do_not_open=True).
            serial_instance = serial.serial_for_url(args.port, args.baud, do_not_open=True)
            serial_instance.dtr = False
            serial_instance.rts = False

            # Pass the actual used port to callee of idf_monitor (e.g. make) through `ESPPORT` environment
            # variable
            # To make sure the key as well as the value are str type, by the requirements of subprocess
            espport_val = str(args.port)
            os.environ.update({ESPPORT_ENVIRON: espport_val})

            cls = SerialMonitor
            yellow_print('--- idf_monitor on {p.name} {p.baudrate} ---'.format(p=serial_instance))

        monitor = cls(serial_instance,
                      args.elf_file.name,
                      args.print_filter,
                      args.make,
                      args.encrypted,
                      args.toolchain_prefix,
                      args.eol,
                      args.decode_coredumps,
                      args.decode_panic,
                      args.target,
                      ws,
                      not args.disable_address_decoding,
                      args.timestamps,
                      args.timestamp_format)

        yellow_print('--- Quit: {} | Menu: {} | Help: {} followed by {} ---'.format(
            key_description(monitor.console_parser.exit_key),
            key_description(monitor.console_parser.menu_key),
            key_description(monitor.console_parser.menu_key),
            key_description(CTRL_H)))
        if args.print_filter != DEFAULT_PRINT_FILTER:
            yellow_print('--- Print filter: {} ---'.format(args.print_filter))
        monitor.main_loop()
    except KeyboardInterrupt:
        pass
    finally:
        if ws:
            ws.close()


if __name__ == '__main__':
    main()