1#!/usr/bin/env python 2# 3# esp-idf serial output monitor tool. Does some helpful things: 4# - Looks up hex addresses in ELF file with addr2line 5# - Reset ESP32 via serial RTS line (Ctrl-T Ctrl-R) 6# - Run flash build target to rebuild and flash entire project (Ctrl-T Ctrl-F) 7# - Run app-flash build target to rebuild and flash app only (Ctrl-T Ctrl-A) 8# - If gdbstub output is detected, gdb is automatically loaded 9# - If core dump output is detected, it is converted to a human-readable report 10# by espcoredump.py. 11# 12# Copyright 2015-2016 Espressif Systems (Shanghai) PTE LTD 13# 14# Licensed under the Apache License, Version 2.0 (the "License"); 15# you may not use this file except in compliance with the License. 16# You may obtain a copy of the License at 17# 18# http://www.apache.org/licenses/LICENSE-2.0 19# 20# Unless required by applicable law or agreed to in writing, software 21# distributed under the License is distributed on an "AS IS" BASIS, 22# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 23# See the License for the specific language governing permissions and 24# limitations under the License. 25# 26# Contains elements taken from miniterm "Very simple serial terminal" which 27# is part of pySerial. https://github.com/pyserial/pyserial 28# (C)2002-2015 Chris Liechti <cliechti@gmx.net> 29# 30# Originally released under BSD-3-Clause license. 31# 32from __future__ import division, print_function, unicode_literals 33 34import argparse 35import codecs 36import datetime 37import os 38import re 39import subprocess 40from builtins import bytes, chr, object 41 42try: 43 import queue 44except ImportError: 45 import Queue as queue 46 47import ctypes 48import json 49import shlex 50import sys 51import tempfile 52import textwrap 53import threading 54import time 55import types 56from distutils.version import StrictVersion 57from io import open 58 59import serial 60import serial.tools.list_ports 61import serial.tools.miniterm as miniterm 62 63try: 64 import websocket 65except ImportError: 66 # This is needed for IDE integration only. 67 pass 68 69key_description = miniterm.key_description 70 71# Control-key characters 72CTRL_A = '\x01' 73CTRL_B = '\x02' 74CTRL_F = '\x06' 75CTRL_H = '\x08' 76CTRL_R = '\x12' 77CTRL_T = '\x14' 78CTRL_Y = '\x19' 79CTRL_P = '\x10' 80CTRL_X = '\x18' 81CTRL_L = '\x0c' 82CTRL_RBRACKET = '\x1d' # Ctrl+] 83 84# Command parsed from console inputs 85CMD_STOP = 1 86CMD_RESET = 2 87CMD_MAKE = 3 88CMD_APP_FLASH = 4 89CMD_OUTPUT_TOGGLE = 5 90CMD_TOGGLE_LOGGING = 6 91CMD_ENTER_BOOT = 7 92 93# ANSI terminal codes (if changed, regular expressions in LineMatcher need to be udpated) 94ANSI_RED = '\033[1;31m' 95ANSI_YELLOW = '\033[0;33m' 96ANSI_NORMAL = '\033[0m' 97 98 99def color_print(message, color, newline='\n'): 100 """ Print a message to stderr with colored highlighting """ 101 sys.stderr.write('%s%s%s%s' % (color, message, ANSI_NORMAL, newline)) 102 103 104def yellow_print(message, newline='\n'): 105 color_print(message, ANSI_YELLOW, newline) 106 107 108def red_print(message, newline='\n'): 109 color_print(message, ANSI_RED, newline) 110 111 112__version__ = '1.1' 113 114# Tags for tuples in queues 115TAG_KEY = 0 116TAG_SERIAL = 1 117TAG_SERIAL_FLUSH = 2 118TAG_CMD = 3 119 120# regex matches an potential PC value (0x4xxxxxxx) 121MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE) 122 123DEFAULT_TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-' 124 125DEFAULT_PRINT_FILTER = '' 126 127# coredump related messages 128COREDUMP_UART_START = b'================= CORE DUMP START =================' 129COREDUMP_UART_END = b'================= CORE DUMP END =================' 130COREDUMP_UART_PROMPT = b'Press Enter to print core dump to UART...' 131 132# coredump states 133COREDUMP_IDLE = 0 134COREDUMP_READING = 1 135COREDUMP_DONE = 2 136 137# coredump decoding options 138COREDUMP_DECODE_DISABLE = 'disable' 139COREDUMP_DECODE_INFO = 'info' 140 141# panic handler related messages 142PANIC_START = r'Core \s*\d+ register dump:' 143PANIC_END = b'ELF file SHA256:' 144PANIC_STACK_DUMP = b'Stack memory:' 145 146# panic handler decoding states 147PANIC_IDLE = 0 148PANIC_READING = 1 149 150# panic handler decoding options 151PANIC_DECODE_DISABLE = 'disable' 152PANIC_DECODE_BACKTRACE = 'backtrace' 153 154 155class StoppableThread(object): 156 """ 157 Provide a Thread-like class which can be 'cancelled' via a subclass-provided 158 cancellation method. 159 160 Can be started and stopped multiple times. 161 162 Isn't an instance of type Thread because Python Thread objects can only be run once 163 """ 164 def __init__(self): 165 self._thread = None 166 167 @property 168 def alive(self): 169 """ 170 Is 'alive' whenever the internal thread object exists 171 """ 172 return self._thread is not None 173 174 def start(self): 175 if self._thread is None: 176 self._thread = threading.Thread(target=self._run_outer) 177 self._thread.start() 178 179 def _cancel(self): 180 pass # override to provide cancellation functionality 181 182 def run(self): 183 pass # override for the main thread behaviour 184 185 def _run_outer(self): 186 try: 187 self.run() 188 finally: 189 self._thread = None 190 191 def stop(self): 192 if self._thread is not None: 193 old_thread = self._thread 194 self._thread = None 195 self._cancel() 196 old_thread.join() 197 198 199class ConsoleReader(StoppableThread): 200 """ Read input keys from the console and push them to the queue, 201 until stopped. 202 """ 203 def __init__(self, console, event_queue, cmd_queue, parser, test_mode): 204 super(ConsoleReader, self).__init__() 205 self.console = console 206 self.event_queue = event_queue 207 self.cmd_queue = cmd_queue 208 self.parser = parser 209 self.test_mode = test_mode 210 211 def run(self): 212 self.console.setup() 213 try: 214 while self.alive: 215 try: 216 if os.name == 'nt': 217 # Windows kludge: because the console.cancel() method doesn't 218 # seem to work to unblock getkey() on the Windows implementation. 219 # 220 # So we only call getkey() if we know there's a key waiting for us. 221 import msvcrt 222 while not msvcrt.kbhit() and self.alive: 223 time.sleep(0.1) 224 if not self.alive: 225 break 226 elif self.test_mode: 227 # In testing mode the stdin is connected to PTY but is not used for input anything. For PTY 228 # the canceling by fcntl.ioctl isn't working and would hang in self.console.getkey(). 229 # Therefore, we avoid calling it. 230 while self.alive: 231 time.sleep(0.1) 232 break 233 c = self.console.getkey() 234 except KeyboardInterrupt: 235 c = '\x03' 236 if c is not None: 237 ret = self.parser.parse(c) 238 if ret is not None: 239 (tag, cmd) = ret 240 # stop command should be executed last 241 if tag == TAG_CMD and cmd != CMD_STOP: 242 self.cmd_queue.put(ret) 243 else: 244 self.event_queue.put(ret) 245 246 finally: 247 self.console.cleanup() 248 249 def _cancel(self): 250 if os.name == 'posix' and not self.test_mode: 251 # this is the way cancel() is implemented in pyserial 3.3 or newer, 252 # older pyserial (3.1+) has cancellation implemented via 'select', 253 # which does not work when console sends an escape sequence response 254 # 255 # even older pyserial (<3.1) does not have this method 256 # 257 # on Windows there is a different (also hacky) fix, applied above. 258 # 259 # note that TIOCSTI is not implemented in WSL / bash-on-Windows. 260 # TODO: introduce some workaround to make it work there. 261 # 262 # Note: This would throw exception in testing mode when the stdin is connected to PTY. 263 import fcntl 264 import termios 265 fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0') 266 267 268class ConsoleParser(object): 269 270 def __init__(self, eol='CRLF'): 271 self.translate_eol = { 272 'CRLF': lambda c: c.replace('\n', '\r\n'), 273 'CR': lambda c: c.replace('\n', '\r'), 274 'LF': lambda c: c.replace('\r', '\n'), 275 }[eol] 276 self.menu_key = CTRL_T 277 self.exit_key = CTRL_RBRACKET 278 self._pressed_menu_key = False 279 280 def parse(self, key): 281 ret = None 282 if self._pressed_menu_key: 283 ret = self._handle_menu_key(key) 284 elif key == self.menu_key: 285 self._pressed_menu_key = True 286 elif key == self.exit_key: 287 ret = (TAG_CMD, CMD_STOP) 288 else: 289 key = self.translate_eol(key) 290 ret = (TAG_KEY, key) 291 return ret 292 293 def _handle_menu_key(self, c): 294 ret = None 295 if c == self.exit_key or c == self.menu_key: # send verbatim 296 ret = (TAG_KEY, c) 297 elif c in [CTRL_H, 'h', 'H', '?']: 298 red_print(self.get_help_text()) 299 elif c == CTRL_R: # Reset device via RTS 300 ret = (TAG_CMD, CMD_RESET) 301 elif c == CTRL_F: # Recompile & upload 302 ret = (TAG_CMD, CMD_MAKE) 303 elif c in [CTRL_A, 'a', 'A']: # Recompile & upload app only 304 # "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used 305 # instead 306 ret = (TAG_CMD, CMD_APP_FLASH) 307 elif c == CTRL_Y: # Toggle output display 308 ret = (TAG_CMD, CMD_OUTPUT_TOGGLE) 309 elif c == CTRL_L: # Toggle saving output into file 310 ret = (TAG_CMD, CMD_TOGGLE_LOGGING) 311 elif c == CTRL_P: 312 yellow_print('Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart') 313 # to fast trigger pause without press menu key 314 ret = (TAG_CMD, CMD_ENTER_BOOT) 315 elif c in [CTRL_X, 'x', 'X']: # Exiting from within the menu 316 ret = (TAG_CMD, CMD_STOP) 317 else: 318 red_print('--- unknown menu character {} --'.format(key_description(c))) 319 320 self._pressed_menu_key = False 321 return ret 322 323 def get_help_text(self): 324 text = """\ 325 --- idf_monitor ({version}) - ESP-IDF monitor tool 326 --- based on miniterm from pySerial 327 --- 328 --- {exit:8} Exit program 329 --- {menu:8} Menu escape key, followed by: 330 --- Menu keys: 331 --- {menu:14} Send the menu character itself to remote 332 --- {exit:14} Send the exit character itself to remote 333 --- {reset:14} Reset target board via RTS line 334 --- {makecmd:14} Build & flash project 335 --- {appmake:14} Build & flash app only 336 --- {output:14} Toggle output display 337 --- {log:14} Toggle saving output into file 338 --- {pause:14} Reset target into bootloader to pause app via RTS line 339 --- {menuexit:14} Exit program 340 """.format(version=__version__, 341 exit=key_description(self.exit_key), 342 menu=key_description(self.menu_key), 343 reset=key_description(CTRL_R), 344 makecmd=key_description(CTRL_F), 345 appmake=key_description(CTRL_A) + ' (or A)', 346 output=key_description(CTRL_Y), 347 log=key_description(CTRL_L), 348 pause=key_description(CTRL_P), 349 menuexit=key_description(CTRL_X) + ' (or X)') 350 return textwrap.dedent(text) 351 352 def get_next_action_text(self): 353 text = """\ 354 --- Press {} to exit monitor. 355 --- Press {} to build & flash project. 356 --- Press {} to build & flash app. 357 --- Press any other key to resume monitor (resets target). 358 """.format(key_description(self.exit_key), 359 key_description(CTRL_F), 360 key_description(CTRL_A)) 361 return textwrap.dedent(text) 362 363 def parse_next_action_key(self, c): 364 ret = None 365 if c == self.exit_key: 366 ret = (TAG_CMD, CMD_STOP) 367 elif c == CTRL_F: # Recompile & upload 368 ret = (TAG_CMD, CMD_MAKE) 369 elif c in [CTRL_A, 'a', 'A']: # Recompile & upload app only 370 # "CTRL-A" cannot be captured with the default settings of the Windows command line, therefore, "A" can be used 371 # instead 372 ret = (TAG_CMD, CMD_APP_FLASH) 373 return ret 374 375 376class SerialReader(StoppableThread): 377 """ Read serial data from the serial port and push to the 378 event queue, until stopped. 379 """ 380 def __init__(self, serial, event_queue): 381 super(SerialReader, self).__init__() 382 self.baud = serial.baudrate 383 self.serial = serial 384 self.event_queue = event_queue 385 if not hasattr(self.serial, 'cancel_read'): 386 # enable timeout for checking alive flag, 387 # if cancel_read not available 388 self.serial.timeout = 0.25 389 390 def run(self): 391 if not self.serial.is_open: 392 self.serial.baudrate = self.baud 393 self.serial.rts = True # Force an RTS reset on open 394 self.serial.open() 395 time.sleep(0.005) # Add a delay to meet the requirements of minimal EN low time (2ms for ESP32-C3) 396 self.serial.rts = False 397 self.serial.dtr = self.serial.dtr # usbser.sys workaround 398 try: 399 while self.alive: 400 try: 401 data = self.serial.read(self.serial.in_waiting or 1) 402 except (serial.serialutil.SerialException, IOError) as e: 403 data = b'' 404 # self.serial.open() was successful before, therefore, this is an issue related to 405 # the disappearance of the device 406 red_print(e) 407 yellow_print('Waiting for the device to reconnect', newline='') 408 self.serial.close() 409 while self.alive: # so that exiting monitor works while waiting 410 try: 411 time.sleep(0.5) 412 self.serial.open() 413 break # device connected 414 except serial.serialutil.SerialException: 415 yellow_print('.', newline='') 416 sys.stderr.flush() 417 yellow_print('') # go to new line 418 if len(data): 419 self.event_queue.put((TAG_SERIAL, data), False) 420 finally: 421 self.serial.close() 422 423 def _cancel(self): 424 if hasattr(self.serial, 'cancel_read'): 425 try: 426 self.serial.cancel_read() 427 except Exception: 428 pass 429 430 431class LineMatcher(object): 432 """ 433 Assembles a dictionary of filtering rules based on the --print_filter 434 argument of idf_monitor. Then later it is used to match lines and 435 determine whether they should be shown on screen or not. 436 """ 437 LEVEL_N = 0 438 LEVEL_E = 1 439 LEVEL_W = 2 440 LEVEL_I = 3 441 LEVEL_D = 4 442 LEVEL_V = 5 443 444 level = {'N': LEVEL_N, 'E': LEVEL_E, 'W': LEVEL_W, 'I': LEVEL_I, 'D': LEVEL_D, 445 'V': LEVEL_V, '*': LEVEL_V, '': LEVEL_V} 446 447 def __init__(self, print_filter): 448 self._dict = dict() 449 self._re = re.compile(r'^(?:\033\[[01];?[0-9]+m?)?([EWIDV]) \([0-9]+\) ([^:]+): ') 450 items = print_filter.split() 451 if len(items) == 0: 452 self._dict['*'] = self.LEVEL_V # default is to print everything 453 for f in items: 454 s = f.split(r':') 455 if len(s) == 1: 456 # specifying no warning level defaults to verbose level 457 lev = self.LEVEL_V 458 elif len(s) == 2: 459 if len(s[0]) == 0: 460 raise ValueError('No tag specified in filter ' + f) 461 try: 462 lev = self.level[s[1].upper()] 463 except KeyError: 464 raise ValueError('Unknown warning level in filter ' + f) 465 else: 466 raise ValueError('Missing ":" in filter ' + f) 467 self._dict[s[0]] = lev 468 469 def match(self, line): 470 try: 471 m = self._re.search(line) 472 if m: 473 lev = self.level[m.group(1)] 474 if m.group(2) in self._dict: 475 return self._dict[m.group(2)] >= lev 476 return self._dict.get('*', self.LEVEL_N) >= lev 477 except (KeyError, IndexError): 478 # Regular line written with something else than ESP_LOG* 479 # or an empty line. 480 pass 481 # We need something more than "*.N" for printing. 482 return self._dict.get('*', self.LEVEL_N) > self.LEVEL_N 483 484 485class SerialStopException(Exception): 486 """ 487 This exception is used for stopping the IDF monitor in testing mode. 488 """ 489 pass 490 491 492class Monitor(object): 493 """ 494 Monitor application main class. 495 496 This was originally derived from miniterm.Miniterm, but it turned out to be easier to write from scratch for this 497 purpose. 498 499 Main difference is that all event processing happens in the main thread, not the worker threads. 500 """ 501 def __init__(self, serial_instance, elf_file, print_filter, make='make', encrypted=False, 502 toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, eol='CRLF', 503 decode_coredumps=COREDUMP_DECODE_INFO, 504 decode_panic=PANIC_DECODE_DISABLE, 505 target=None, 506 websocket_client=None, 507 enable_address_decoding=True): 508 super(Monitor, self).__init__() 509 self.event_queue = queue.Queue() 510 self.cmd_queue = queue.Queue() 511 self.console = miniterm.Console() 512 self.enable_address_decoding = enable_address_decoding 513 if os.name == 'nt': 514 sys.stderr = ANSIColorConverter(sys.stderr, decode_output=True) 515 self.console.output = ANSIColorConverter(self.console.output) 516 self.console.byte_output = ANSIColorConverter(self.console.byte_output) 517 518 if StrictVersion(serial.VERSION) < StrictVersion('3.3.0'): 519 # Use Console.getkey implementation from 3.3.0 (to be in sync with the ConsoleReader._cancel patch above) 520 def getkey_patched(self): 521 c = self.enc_stdin.read(1) 522 if c == chr(0x7f): 523 c = chr(8) # map the BS key (which yields DEL) to backspace 524 return c 525 526 self.console.getkey = types.MethodType(getkey_patched, self.console) 527 528 socket_mode = serial_instance.port.startswith('socket://') # testing hook - data from serial can make exit the monitor 529 self.serial = serial_instance 530 self.console_parser = ConsoleParser(eol) 531 self.console_reader = ConsoleReader(self.console, self.event_queue, self.cmd_queue, self.console_parser, socket_mode) 532 self.serial_reader = SerialReader(self.serial, self.event_queue) 533 self.elf_file = elf_file 534 if not os.path.exists(make): 535 self.make = shlex.split(make) # allow for possibility the "make" arg is a list of arguments (for idf.py) 536 else: 537 self.make = make 538 self.encrypted = encrypted 539 self.toolchain_prefix = toolchain_prefix 540 self.websocket_client = websocket_client 541 self.target = target 542 543 # internal state 544 self._last_line_part = b'' 545 self._gdb_buffer = b'' 546 self._pc_address_buffer = b'' 547 self._line_matcher = LineMatcher(print_filter) 548 self._invoke_processing_last_line_timer = None 549 self._force_line_print = False 550 self._output_enabled = True 551 self._serial_check_exit = socket_mode 552 self._log_file = None 553 self._decode_coredumps = decode_coredumps 554 self._reading_coredump = COREDUMP_IDLE 555 self._coredump_buffer = b'' 556 self._decode_panic = decode_panic 557 self._reading_panic = PANIC_IDLE 558 self._panic_buffer = b'' 559 560 def invoke_processing_last_line(self): 561 self.event_queue.put((TAG_SERIAL_FLUSH, b''), False) 562 563 def main_loop(self): 564 self.console_reader.start() 565 self.serial_reader.start() 566 try: 567 while self.console_reader.alive and self.serial_reader.alive: 568 try: 569 item = self.cmd_queue.get_nowait() 570 except queue.Empty: 571 try: 572 item = self.event_queue.get(True, 0.03) 573 except queue.Empty: 574 continue 575 576 (event_tag, data) = item 577 if event_tag == TAG_CMD: 578 self.handle_commands(data) 579 elif event_tag == TAG_KEY: 580 try: 581 self.serial.write(codecs.encode(data)) 582 except serial.SerialException: 583 pass # this shouldn't happen, but sometimes port has closed in serial thread 584 except UnicodeEncodeError: 585 pass # this can happen if a non-ascii character was passed, ignoring 586 elif event_tag == TAG_SERIAL: 587 self.handle_serial_input(data) 588 if self._invoke_processing_last_line_timer is not None: 589 self._invoke_processing_last_line_timer.cancel() 590 self._invoke_processing_last_line_timer = threading.Timer(0.1, self.invoke_processing_last_line) 591 self._invoke_processing_last_line_timer.start() 592 # If no further data is received in the next short period 593 # of time then the _invoke_processing_last_line_timer 594 # generates an event which will result in the finishing of 595 # the last line. This is fix for handling lines sent 596 # without EOL. 597 elif event_tag == TAG_SERIAL_FLUSH: 598 self.handle_serial_input(data, finalize_line=True) 599 else: 600 raise RuntimeError('Bad event data %r' % ((event_tag,data),)) 601 except SerialStopException: 602 sys.stderr.write(ANSI_NORMAL + 'Stopping condition has been received\n') 603 finally: 604 try: 605 self.console_reader.stop() 606 self.serial_reader.stop() 607 self.stop_logging() 608 # Cancelling _invoke_processing_last_line_timer is not 609 # important here because receiving empty data doesn't matter. 610 self._invoke_processing_last_line_timer = None 611 except Exception: 612 pass 613 sys.stderr.write(ANSI_NORMAL + '\n') 614 615 def handle_serial_input(self, data, finalize_line=False): 616 sp = data.split(b'\n') 617 if self._last_line_part != b'': 618 # add unprocessed part from previous "data" to the first line 619 sp[0] = self._last_line_part + sp[0] 620 self._last_line_part = b'' 621 if sp[-1] != b'': 622 # last part is not a full line 623 self._last_line_part = sp.pop() 624 for line in sp: 625 if line != b'': 626 if self._serial_check_exit and line == self.console_parser.exit_key.encode('latin-1'): 627 raise SerialStopException() 628 self.check_panic_decode_trigger(line) 629 self.check_coredump_trigger_before_print(line) 630 if self._force_line_print or self._line_matcher.match(line.decode(errors='ignore')): 631 self._print(line + b'\n') 632 self.handle_possible_pc_address_in_line(line) 633 self.check_coredump_trigger_after_print(line) 634 self.check_gdbstub_trigger(line) 635 self._force_line_print = False 636 # Now we have the last part (incomplete line) in _last_line_part. By 637 # default we don't touch it and just wait for the arrival of the rest 638 # of the line. But after some time when we didn't received it we need 639 # to make a decision. 640 if self._last_line_part != b'': 641 if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part.decode(errors='ignore'))): 642 self._force_line_print = True 643 self._print(self._last_line_part) 644 self.handle_possible_pc_address_in_line(self._last_line_part) 645 self.check_gdbstub_trigger(self._last_line_part) 646 # It is possible that the incomplete line cuts in half the PC 647 # address. A small buffer is kept and will be used the next time 648 # handle_possible_pc_address_in_line is invoked to avoid this problem. 649 # MATCH_PCADDR matches 10 character long addresses. Therefore, we 650 # keep the last 9 characters. 651 self._pc_address_buffer = self._last_line_part[-9:] 652 # GDB sequence can be cut in half also. GDB sequence is 7 653 # characters long, therefore, we save the last 6 characters. 654 self._gdb_buffer = self._last_line_part[-6:] 655 self._last_line_part = b'' 656 # else: keeping _last_line_part and it will be processed the next time 657 # handle_serial_input is invoked 658 659 def handle_possible_pc_address_in_line(self, line): 660 line = self._pc_address_buffer + line 661 self._pc_address_buffer = b'' 662 if self.enable_address_decoding: 663 for m in re.finditer(MATCH_PCADDR, line.decode(errors='ignore')): 664 self.lookup_pc_address(m.group()) 665 666 def __enter__(self): 667 """ Use 'with self' to temporarily disable monitoring behaviour """ 668 self.serial_reader.stop() 669 self.console_reader.stop() 670 671 def __exit__(self, *args, **kwargs): 672 """ Use 'with self' to temporarily disable monitoring behaviour """ 673 self.console_reader.start() 674 self.serial_reader.start() 675 676 def prompt_next_action(self, reason): 677 self.console.setup() # set up console to trap input characters 678 try: 679 red_print('--- {}'.format(reason)) 680 red_print(self.console_parser.get_next_action_text()) 681 682 k = CTRL_T # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc. 683 while k == CTRL_T: 684 k = self.console.getkey() 685 finally: 686 self.console.cleanup() 687 ret = self.console_parser.parse_next_action_key(k) 688 if ret is not None: 689 cmd = ret[1] 690 if cmd == CMD_STOP: 691 # the stop command should be handled last 692 self.event_queue.put(ret) 693 else: 694 self.cmd_queue.put(ret) 695 696 def run_make(self, target): 697 with self: 698 if isinstance(self.make, list): 699 popen_args = self.make + [target] 700 else: 701 popen_args = [self.make, target] 702 yellow_print('Running %s...' % ' '.join(popen_args)) 703 p = subprocess.Popen(popen_args, env=os.environ) 704 try: 705 p.wait() 706 except KeyboardInterrupt: 707 p.wait() 708 if p.returncode != 0: 709 self.prompt_next_action('Build failed') 710 else: 711 self.output_enable(True) 712 713 def lookup_pc_address(self, pc_addr): 714 cmd = ['%saddr2line' % self.toolchain_prefix, 715 '-pfiaC', '-e', self.elf_file, pc_addr] 716 try: 717 translation = subprocess.check_output(cmd, cwd='.') 718 if b'?? ??:0' not in translation: 719 self._print(translation.decode(), console_printer=yellow_print) 720 except OSError as e: 721 red_print('%s: %s' % (' '.join(cmd), e)) 722 723 def check_gdbstub_trigger(self, line): 724 line = self._gdb_buffer + line 725 self._gdb_buffer = b'' 726 m = re.search(b'\\$(T..)#(..)', line) # look for a gdb "reason" for a break 727 if m is not None: 728 try: 729 chsum = sum(ord(bytes([p])) for p in m.group(1)) & 0xFF 730 calc_chsum = int(m.group(2), 16) 731 except ValueError: 732 return # payload wasn't valid hex digits 733 if chsum == calc_chsum: 734 if self.websocket_client: 735 yellow_print('Communicating through WebSocket') 736 self.websocket_client.send({'event': 'gdb_stub', 737 'port': self.serial.port, 738 'prog': self.elf_file}) 739 yellow_print('Waiting for debug finished event') 740 self.websocket_client.wait([('event', 'debug_finished')]) 741 yellow_print('Communications through WebSocket is finished') 742 else: 743 self.run_gdb() 744 else: 745 red_print('Malformed gdb message... calculated checksum %02x received %02x' % (chsum, calc_chsum)) 746 747 def check_coredump_trigger_before_print(self, line): 748 if self._decode_coredumps == COREDUMP_DECODE_DISABLE: 749 return 750 751 if COREDUMP_UART_PROMPT in line: 752 yellow_print('Initiating core dump!') 753 self.event_queue.put((TAG_KEY, '\n')) 754 return 755 756 if COREDUMP_UART_START in line: 757 yellow_print('Core dump started (further output muted)') 758 self._reading_coredump = COREDUMP_READING 759 self._coredump_buffer = b'' 760 self._output_enabled = False 761 return 762 763 if COREDUMP_UART_END in line: 764 self._reading_coredump = COREDUMP_DONE 765 yellow_print('\nCore dump finished!') 766 self.process_coredump() 767 return 768 769 if self._reading_coredump == COREDUMP_READING: 770 kb = 1024 771 buffer_len_kb = len(self._coredump_buffer) // kb 772 self._coredump_buffer += line.replace(b'\r', b'') + b'\n' 773 new_buffer_len_kb = len(self._coredump_buffer) // kb 774 if new_buffer_len_kb > buffer_len_kb: 775 yellow_print('Received %3d kB...' % (new_buffer_len_kb), newline='\r') 776 777 def check_coredump_trigger_after_print(self, line): 778 if self._decode_coredumps == COREDUMP_DECODE_DISABLE: 779 return 780 781 # Re-enable output after the last line of core dump has been consumed 782 if not self._output_enabled and self._reading_coredump == COREDUMP_DONE: 783 self._reading_coredump = COREDUMP_IDLE 784 self._output_enabled = True 785 self._coredump_buffer = b'' 786 787 def process_coredump(self): 788 if self._decode_coredumps != COREDUMP_DECODE_INFO: 789 raise NotImplementedError('process_coredump: %s not implemented' % self._decode_coredumps) 790 791 coredump_script = os.path.join(os.path.dirname(__file__), '..', 'components', 'espcoredump', 'espcoredump.py') 792 coredump_file = None 793 try: 794 # On Windows, the temporary file can't be read unless it is closed. 795 # Set delete=False and delete the file manually later. 796 with tempfile.NamedTemporaryFile(mode='wb', delete=False) as coredump_file: 797 coredump_file.write(self._coredump_buffer) 798 coredump_file.flush() 799 800 if self.websocket_client: 801 self._output_enabled = True 802 yellow_print('Communicating through WebSocket') 803 self.websocket_client.send({'event': 'coredump', 804 'file': coredump_file.name, 805 'prog': self.elf_file}) 806 yellow_print('Waiting for debug finished event') 807 self.websocket_client.wait([('event', 'debug_finished')]) 808 yellow_print('Communications through WebSocket is finished') 809 else: 810 cmd = [sys.executable, 811 coredump_script, 812 'info_corefile', 813 '--core', coredump_file.name, 814 '--core-format', 'b64', 815 self.elf_file 816 ] 817 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) 818 self._output_enabled = True 819 self._print(output) 820 self._output_enabled = False # Will be reenabled in check_coredump_trigger_after_print 821 except subprocess.CalledProcessError as e: 822 yellow_print('Failed to run espcoredump script: {}\n{}\n\n'.format(e, e.output)) 823 self._output_enabled = True 824 self._print(COREDUMP_UART_START + b'\n') 825 self._print(self._coredump_buffer) 826 # end line will be printed in handle_serial_input 827 finally: 828 if coredump_file is not None: 829 try: 830 os.unlink(coredump_file.name) 831 except OSError as e: 832 yellow_print("Couldn't remote temporary core dump file ({})".format(e)) 833 834 def check_panic_decode_trigger(self, line): 835 if self._decode_panic == PANIC_DECODE_DISABLE: 836 return 837 838 if self._reading_panic == PANIC_IDLE and re.search(PANIC_START, line.decode('ascii', errors='ignore')): 839 self._reading_panic = PANIC_READING 840 yellow_print('Stack dump detected') 841 842 if self._reading_panic == PANIC_READING and PANIC_STACK_DUMP in line: 843 self._output_enabled = False 844 845 if self._reading_panic == PANIC_READING: 846 self._panic_buffer += line.replace(b'\r', b'') + b'\n' 847 848 if self._reading_panic == PANIC_READING and PANIC_END in line: 849 self._reading_panic = PANIC_IDLE 850 self._output_enabled = True 851 self.process_panic_output(self._panic_buffer) 852 self._panic_buffer = b'' 853 854 def process_panic_output(self, panic_output): 855 panic_output_decode_script = os.path.join(os.path.dirname(__file__), '..', 'tools', 'gdb_panic_server.py') 856 panic_output_file = None 857 try: 858 # On Windows, the temporary file can't be read unless it is closed. 859 # Set delete=False and delete the file manually later. 860 with tempfile.NamedTemporaryFile(mode='wb', delete=False) as panic_output_file: 861 panic_output_file.write(panic_output) 862 panic_output_file.flush() 863 864 cmd = [self.toolchain_prefix + 'gdb', 865 '--batch', '-n', 866 self.elf_file, 867 '-ex', "target remote | \"{python}\" \"{script}\" --target {target} \"{output_file}\"" 868 .format(python=sys.executable, 869 script=panic_output_decode_script, 870 target=self.target, 871 output_file=panic_output_file.name), 872 '-ex', 'bt'] 873 874 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) 875 yellow_print('\nBacktrace:\n\n') 876 self._print(output) 877 except subprocess.CalledProcessError as e: 878 yellow_print('Failed to run gdb_panic_server.py script: {}\n{}\n\n'.format(e, e.output)) 879 self._print(panic_output) 880 finally: 881 if panic_output_file is not None: 882 try: 883 os.unlink(panic_output_file.name) 884 except OSError as e: 885 yellow_print("Couldn't remove temporary panic output file ({})".format(e)) 886 887 def run_gdb(self): 888 with self: # disable console control 889 sys.stderr.write(ANSI_NORMAL) 890 try: 891 cmd = ['%sgdb' % self.toolchain_prefix, 892 '-ex', 'set serial baud %d' % self.serial.baudrate, 893 '-ex', 'target remote %s' % self.serial.port, 894 '-ex', 'interrupt', # monitor has already parsed the first 'reason' command, need a second 895 self.elf_file] 896 process = subprocess.Popen(cmd, cwd='.') 897 process.wait() 898 except OSError as e: 899 red_print('%s: %s' % (' '.join(cmd), e)) 900 except KeyboardInterrupt: 901 pass # happens on Windows, maybe other OSes 902 finally: 903 try: 904 # on Linux, maybe other OSes, gdb sometimes seems to be alive even after wait() returns... 905 process.terminate() 906 except Exception: 907 pass 908 try: 909 # also on Linux, maybe other OSes, gdb sometimes exits uncleanly and breaks the tty mode 910 subprocess.call(['stty', 'sane']) 911 except Exception: 912 pass # don't care if there's no stty, we tried... 913 self.prompt_next_action('gdb exited') 914 915 def output_enable(self, enable): 916 self._output_enabled = enable 917 918 def output_toggle(self): 919 self._output_enabled = not self._output_enabled 920 yellow_print('\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.'.format(self._output_enabled)) 921 922 def toggle_logging(self): 923 if self._log_file: 924 self.stop_logging() 925 else: 926 self.start_logging() 927 928 def start_logging(self): 929 if not self._log_file: 930 try: 931 name = 'log.{}.{}.txt'.format(os.path.splitext(os.path.basename(self.elf_file))[0], 932 datetime.datetime.now().strftime('%Y%m%d%H%M%S')) 933 self._log_file = open(name, 'wb+') 934 yellow_print('\nLogging is enabled into file {}'.format(name)) 935 except Exception as e: 936 red_print('\nLog file {} cannot be created: {}'.format(name, e)) 937 938 def stop_logging(self): 939 if self._log_file: 940 try: 941 name = self._log_file.name 942 self._log_file.close() 943 yellow_print('\nLogging is disabled and file {} has been closed'.format(name)) 944 except Exception as e: 945 red_print('\nLog file cannot be closed: {}'.format(e)) 946 finally: 947 self._log_file = None 948 949 def _print(self, string, console_printer=None): 950 if console_printer is None: 951 console_printer = self.console.write_bytes 952 if self._output_enabled: 953 console_printer(string) 954 if self._log_file: 955 try: 956 if isinstance(string, type(u'')): 957 string = string.encode() 958 self._log_file.write(string) 959 except Exception as e: 960 red_print('\nCannot write to file: {}'.format(e)) 961 # don't fill-up the screen with the previous errors (probably consequent prints would fail also) 962 self.stop_logging() 963 964 def handle_commands(self, cmd): 965 if cmd == CMD_STOP: 966 self.console_reader.stop() 967 self.serial_reader.stop() 968 elif cmd == CMD_RESET: 969 self.serial.setRTS(True) 970 self.serial.setDTR(self.serial.dtr) # usbser.sys workaround 971 time.sleep(0.2) 972 self.serial.setRTS(False) 973 self.serial.setDTR(self.serial.dtr) # usbser.sys workaround 974 self.output_enable(True) 975 elif cmd == CMD_MAKE: 976 self.run_make('encrypted-flash' if self.encrypted else 'flash') 977 elif cmd == CMD_APP_FLASH: 978 self.run_make('encrypted-app-flash' if self.encrypted else 'app-flash') 979 elif cmd == CMD_OUTPUT_TOGGLE: 980 self.output_toggle() 981 elif cmd == CMD_TOGGLE_LOGGING: 982 self.toggle_logging() 983 elif cmd == CMD_ENTER_BOOT: 984 self.serial.setDTR(False) # IO0=HIGH 985 self.serial.setRTS(True) # EN=LOW, chip in reset 986 self.serial.setDTR(self.serial.dtr) # usbser.sys workaround 987 time.sleep(1.3) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1 988 self.serial.setDTR(True) # IO0=LOW 989 self.serial.setRTS(False) # EN=HIGH, chip out of reset 990 self.serial.setDTR(self.serial.dtr) # usbser.sys workaround 991 time.sleep(0.45) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05 992 self.serial.setDTR(False) # IO0=HIGH, done 993 else: 994 raise RuntimeError('Bad command data %d' % (cmd)) 995 996 997def main(): 998 parser = argparse.ArgumentParser('idf_monitor - a serial output monitor for esp-idf') 999 1000 parser.add_argument( 1001 '--port', '-p', 1002 help='Serial port device', 1003 default=os.environ.get('ESPTOOL_PORT', '/dev/ttyUSB0') 1004 ) 1005 1006 parser.add_argument( 1007 '--disable-address-decoding', '-d', 1008 help="Don't print lines about decoded addresses from the application ELF file.", 1009 action='store_true', 1010 default=True if os.environ.get('ESP_MONITOR_DECODE') == 0 else False 1011 ) 1012 1013 parser.add_argument( 1014 '--baud', '-b', 1015 help='Serial port baud rate', 1016 type=int, 1017 default=os.getenv('IDF_MONITOR_BAUD', os.getenv('MONITORBAUD', 115200))) 1018 1019 parser.add_argument( 1020 '--make', '-m', 1021 help='Command to run make', 1022 type=str, default='make') 1023 1024 parser.add_argument( 1025 '--encrypted', 1026 help='Use encrypted targets while running make', 1027 action='store_true') 1028 1029 parser.add_argument( 1030 '--toolchain-prefix', 1031 help='Triplet prefix to add before cross-toolchain names', 1032 default=DEFAULT_TOOLCHAIN_PREFIX) 1033 1034 parser.add_argument( 1035 '--eol', 1036 choices=['CR', 'LF', 'CRLF'], 1037 type=lambda c: c.upper(), 1038 help='End of line to use when sending to the serial port', 1039 default='CR') 1040 1041 parser.add_argument( 1042 'elf_file', help='ELF file of application', 1043 type=argparse.FileType('rb')) 1044 1045 parser.add_argument( 1046 '--print_filter', 1047 help='Filtering string', 1048 default=DEFAULT_PRINT_FILTER) 1049 1050 parser.add_argument( 1051 '--decode-coredumps', 1052 choices=[COREDUMP_DECODE_INFO, COREDUMP_DECODE_DISABLE], 1053 default=COREDUMP_DECODE_INFO, 1054 help='Handling of core dumps found in serial output' 1055 ) 1056 1057 parser.add_argument( 1058 '--decode-panic', 1059 choices=[PANIC_DECODE_BACKTRACE, PANIC_DECODE_DISABLE], 1060 default=PANIC_DECODE_DISABLE, 1061 help='Handling of panic handler info found in serial output' 1062 ) 1063 1064 parser.add_argument( 1065 '--target', 1066 required=False, 1067 help='Target name (used when stack dump decoding is enabled)' 1068 ) 1069 1070 parser.add_argument( 1071 '--ws', 1072 default=os.environ.get('ESP_IDF_MONITOR_WS', None), 1073 help='WebSocket URL for communicating with IDE tools for debugging purposes' 1074 ) 1075 1076 args = parser.parse_args() 1077 1078 # GDB uses CreateFile to open COM port, which requires the COM name to be r'\\.\COMx' if the COM 1079 # number is larger than 10 1080 if os.name == 'nt' and args.port.startswith('COM'): 1081 args.port = args.port.replace('COM', r'\\.\COM') 1082 yellow_print('--- WARNING: GDB cannot open serial ports accessed as COMx') 1083 yellow_print('--- Using %s instead...' % args.port) 1084 elif args.port.startswith('/dev/tty.') and sys.platform == 'darwin': 1085 args.port = args.port.replace('/dev/tty.', '/dev/cu.') 1086 yellow_print('--- WARNING: Serial ports accessed as /dev/tty.* will hang gdb if launched.') 1087 yellow_print('--- Using %s instead...' % args.port) 1088 1089 serial_instance = serial.serial_for_url(args.port, args.baud, 1090 do_not_open=True) 1091 serial_instance.dtr = False 1092 serial_instance.rts = False 1093 1094 args.elf_file.close() # don't need this as a file 1095 1096 # remove the parallel jobserver arguments from MAKEFLAGS, as any 1097 # parent make is only running 1 job (monitor), so we can re-spawn 1098 # all of the child makes we need (the -j argument remains part of 1099 # MAKEFLAGS) 1100 try: 1101 makeflags = os.environ['MAKEFLAGS'] 1102 makeflags = re.sub(r'--jobserver[^ =]*=[0-9,]+ ?', '', makeflags) 1103 os.environ['MAKEFLAGS'] = makeflags 1104 except KeyError: 1105 pass # not running a make jobserver 1106 1107 # Pass the actual used port to callee of idf_monitor (e.g. make) through `ESPPORT` environment 1108 # variable 1109 # To make sure the key as well as the value are str type, by the requirements of subprocess 1110 espport_key = str('ESPPORT') 1111 espport_val = str(args.port) 1112 os.environ.update({espport_key: espport_val}) 1113 1114 ws = WebSocketClient(args.ws) if args.ws else None 1115 try: 1116 monitor = Monitor(serial_instance, args.elf_file.name, args.print_filter, args.make, args.encrypted, 1117 args.toolchain_prefix, args.eol, 1118 args.decode_coredumps, args.decode_panic, args.target, 1119 ws, enable_address_decoding=not args.disable_address_decoding) 1120 1121 yellow_print('--- idf_monitor on {p.name} {p.baudrate} ---'.format( 1122 p=serial_instance)) 1123 yellow_print('--- Quit: {} | Menu: {} | Help: {} followed by {} ---'.format( 1124 key_description(monitor.console_parser.exit_key), 1125 key_description(monitor.console_parser.menu_key), 1126 key_description(monitor.console_parser.menu_key), 1127 key_description(CTRL_H))) 1128 if args.print_filter != DEFAULT_PRINT_FILTER: 1129 yellow_print('--- Print filter: {} ---'.format(args.print_filter)) 1130 1131 monitor.main_loop() 1132 finally: 1133 if ws: 1134 ws.close() 1135 1136 1137class WebSocketClient(object): 1138 """ 1139 WebSocket client used to advertise debug events to WebSocket server by sending and receiving JSON-serialized 1140 dictionaries. 1141 1142 Advertisement of debug event: 1143 {'event': 'gdb_stub', 'port': '/dev/ttyUSB1', 'prog': 'build/elf_file'} for GDB Stub, or 1144 {'event': 'coredump', 'file': '/tmp/xy', 'prog': 'build/elf_file'} for coredump, 1145 where 'port' is the port for the connected device, 'prog' is the full path to the ELF file and 'file' is the 1146 generated coredump file. 1147 1148 Expected end of external debugging: 1149 {'event': 'debug_finished'} 1150 """ 1151 1152 RETRIES = 3 1153 CONNECTION_RETRY_DELAY = 1 1154 1155 def __init__(self, url): 1156 self.url = url 1157 self._connect() 1158 1159 def _connect(self): 1160 """ 1161 Connect to WebSocket server at url 1162 """ 1163 self.close() 1164 1165 for _ in range(self.RETRIES): 1166 try: 1167 self.ws = websocket.create_connection(self.url) 1168 break # success 1169 except NameError: 1170 raise RuntimeError('Please install the websocket_client package for IDE integration!') 1171 except Exception as e: 1172 red_print('WebSocket connection error: {}'.format(e)) 1173 time.sleep(self.CONNECTION_RETRY_DELAY) 1174 else: 1175 raise RuntimeError('Cannot connect to WebSocket server') 1176 1177 def close(self): 1178 try: 1179 self.ws.close() 1180 except AttributeError: 1181 # Not yet connected 1182 pass 1183 except Exception as e: 1184 red_print('WebSocket close error: {}'.format(e)) 1185 1186 def send(self, payload_dict): 1187 """ 1188 Serialize payload_dict in JSON format and send it to the server 1189 """ 1190 for _ in range(self.RETRIES): 1191 try: 1192 self.ws.send(json.dumps(payload_dict)) 1193 yellow_print('WebSocket sent: {}'.format(payload_dict)) 1194 break 1195 except Exception as e: 1196 red_print('WebSocket send error: {}'.format(e)) 1197 self._connect() 1198 else: 1199 raise RuntimeError('Cannot send to WebSocket server') 1200 1201 def wait(self, expect_iterable): 1202 """ 1203 Wait until a dictionary in JSON format is received from the server with all (key, value) tuples from 1204 expect_iterable. 1205 """ 1206 for _ in range(self.RETRIES): 1207 try: 1208 r = self.ws.recv() 1209 except Exception as e: 1210 red_print('WebSocket receive error: {}'.format(e)) 1211 self._connect() 1212 continue 1213 obj = json.loads(r) 1214 if all([k in obj and obj[k] == v for k, v in expect_iterable]): 1215 yellow_print('WebSocket received: {}'.format(obj)) 1216 break 1217 red_print('WebSocket expected: {}, received: {}'.format(dict(expect_iterable), obj)) 1218 else: 1219 raise RuntimeError('Cannot receive from WebSocket server') 1220 1221 1222if os.name == 'nt': 1223 # Windows console stuff 1224 1225 STD_OUTPUT_HANDLE = -11 1226 STD_ERROR_HANDLE = -12 1227 1228 # wincon.h values 1229 FOREGROUND_INTENSITY = 8 1230 FOREGROUND_GREY = 7 1231 1232 # matches the ANSI color change sequences that IDF sends 1233 RE_ANSI_COLOR = re.compile(b'\033\\[([01]);3([0-7])m') 1234 1235 # list mapping the 8 ANSI colors (the indexes) to Windows Console colors 1236 ANSI_TO_WINDOWS_COLOR = [0, 4, 2, 6, 1, 5, 3, 7] 1237 1238 GetStdHandle = ctypes.windll.kernel32.GetStdHandle 1239 SetConsoleTextAttribute = ctypes.windll.kernel32.SetConsoleTextAttribute 1240 1241 class ANSIColorConverter(object): 1242 """Class to wrap a file-like output stream, intercept ANSI color codes, 1243 and convert them into calls to Windows SetConsoleTextAttribute. 1244 1245 Doesn't support all ANSI terminal code escape sequences, only the sequences IDF uses. 1246 1247 Ironically, in Windows this console output is normally wrapped by winpty which will then detect the console text 1248 color changes and convert these back to ANSI color codes for MSYS' terminal to display. However this is the 1249 least-bad working solution, as winpty doesn't support any "passthrough" mode for raw output. 1250 """ 1251 1252 def __init__(self, output=None, decode_output=False): 1253 self.output = output 1254 self.decode_output = decode_output 1255 self.handle = GetStdHandle(STD_ERROR_HANDLE if self.output == sys.stderr else STD_OUTPUT_HANDLE) 1256 self.matched = b'' 1257 1258 def _output_write(self, data): 1259 try: 1260 if self.decode_output: 1261 self.output.write(data.decode()) 1262 else: 1263 self.output.write(data) 1264 except (IOError, OSError): 1265 # Windows 10 bug since the Fall Creators Update, sometimes writing to console randomly throws 1266 # an exception (however, the character is still written to the screen) 1267 # Ref https://github.com/espressif/esp-idf/issues/1163 1268 # 1269 # Also possible for Windows to throw an OSError error if the data is invalid for the console 1270 # (garbage bytes, etc) 1271 pass 1272 1273 def write(self, data): 1274 if isinstance(data, bytes): 1275 data = bytearray(data) 1276 else: 1277 data = bytearray(data, 'utf-8') 1278 for b in data: 1279 b = bytes([b]) 1280 length = len(self.matched) 1281 if b == b'\033': # ESC 1282 self.matched = b 1283 elif (length == 1 and b == b'[') or (1 < length < 7): 1284 self.matched += b 1285 if self.matched == ANSI_NORMAL.encode('latin-1'): # reset console 1286 # Flush is required only with Python3 - switching color before it is printed would mess up the console 1287 self.flush() 1288 SetConsoleTextAttribute(self.handle, FOREGROUND_GREY) 1289 self.matched = b'' 1290 elif len(self.matched) == 7: # could be an ANSI sequence 1291 m = re.match(RE_ANSI_COLOR, self.matched) 1292 if m is not None: 1293 color = ANSI_TO_WINDOWS_COLOR[int(m.group(2))] 1294 if m.group(1) == b'1': 1295 color |= FOREGROUND_INTENSITY 1296 # Flush is required only with Python3 - switching color before it is printed would mess up the console 1297 self.flush() 1298 SetConsoleTextAttribute(self.handle, color) 1299 else: 1300 self._output_write(self.matched) # not an ANSI color code, display verbatim 1301 self.matched = b'' 1302 else: 1303 self._output_write(b) 1304 self.matched = b'' 1305 1306 def flush(self): 1307 try: 1308 self.output.flush() 1309 except OSError: 1310 # Account for Windows Console refusing to accept garbage bytes (serial noise, etc) 1311 pass 1312 1313 1314if __name__ == '__main__': 1315 main() 1316