1#!/usr/bin/env python
2#
3# esp-idf serial output monitor tool. Does some helpful things:
4# - Looks up hex addresses in ELF file with addr2line
5# - Reset ESP32 via serial RTS line (Ctrl-T Ctrl-R)
6# - Run flash build target to rebuild and flash entire project (Ctrl-T Ctrl-F)
7# - Run app-flash build target to rebuild and flash app only (Ctrl-T Ctrl-A)
8# - If gdbstub output is detected, gdb is automatically loaded
9# - If core dump output is detected, it is converted to a human-readable report
10#   by espcoredump.py.
11#
12# Copyright 2015-2016 Espressif Systems (Shanghai) PTE LTD
13#
14# Licensed under the Apache License, Version 2.0 (the "License");
15# you may not use this file except in compliance with the License.
16# You may obtain a copy of the License at
17#
18#     http://www.apache.org/licenses/LICENSE-2.0
19#
20# Unless required by applicable law or agreed to in writing, software
21# distributed under the License is distributed on an "AS IS" BASIS,
22# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
23# See the License for the specific language governing permissions and
24# limitations under the License.
25#
26# Contains elements taken from miniterm "Very simple serial terminal" which
27# is part of pySerial. https://github.com/pyserial/pyserial
28# (C)2002-2015 Chris Liechti <cliechti@gmx.net>
29#
30# Originally released under BSD-3-Clause license.
31#
32from __future__ import division, print_function, unicode_literals
33
34import argparse
35import codecs
36import datetime
37import os
38import re
39import subprocess
40from builtins import bytes, chr, object
41
42try:
43    import queue
44except ImportError:
45    import Queue as queue
46
47import ctypes
48import json
49import shlex
50import sys
51import tempfile
52import textwrap
53import threading
54import time
55import types
56from distutils.version import StrictVersion
57from io import open
58
59import serial
60import serial.tools.list_ports
61import serial.tools.miniterm as miniterm
62
63try:
64    import websocket
65except ImportError:
66    # This is needed for IDE integration only.
67    pass
68
69key_description = miniterm.key_description
70
71# Control-key characters
72CTRL_A = '\x01'
73CTRL_B = '\x02'
74CTRL_F = '\x06'
75CTRL_H = '\x08'
76CTRL_R = '\x12'
77CTRL_T = '\x14'
78CTRL_Y = '\x19'
79CTRL_P = '\x10'
80CTRL_X = '\x18'
81CTRL_L = '\x0c'
82CTRL_RBRACKET = '\x1d'  # Ctrl+]
83
84# Command parsed from console inputs
85CMD_STOP = 1
86CMD_RESET = 2
87CMD_MAKE = 3
88CMD_APP_FLASH = 4
89CMD_OUTPUT_TOGGLE = 5
90CMD_TOGGLE_LOGGING = 6
91CMD_ENTER_BOOT = 7
92
# ANSI terminal codes (if changed, regular expressions in LineMatcher need to be updated)
94ANSI_RED = '\033[1;31m'
95ANSI_YELLOW = '\033[0;33m'
96ANSI_NORMAL = '\033[0m'
97
98
def color_print(message, color, newline='\n'):
    """ Write a message to stderr wrapped in an ANSI colour sequence.

    The output is ``color`` + ``message`` + the ANSI reset code + ``newline``.
    """
    colored = '%s%s%s%s' % (color, message, ANSI_NORMAL, newline)
    sys.stderr.write(colored)
102
103
def yellow_print(message, newline='\n'):
    """ Print a message to stderr in yellow, terminated by ``newline`` """
    color_print(message, color=ANSI_YELLOW, newline=newline)
106
107
def red_print(message, newline='\n'):
    """ Print a message to stderr in red, terminated by ``newline`` """
    color_print(message, color=ANSI_RED, newline=newline)
110
111
112__version__ = '1.1'
113
114# Tags for tuples in queues
115TAG_KEY = 0
116TAG_SERIAL = 1
117TAG_SERIAL_FLUSH = 2
118TAG_CMD = 3
119
# regex matches a potential PC value (0x4xxxxxxx)
121MATCH_PCADDR = re.compile(r'0x4[0-9a-f]{7}', re.IGNORECASE)
122
123DEFAULT_TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-'
124
125DEFAULT_PRINT_FILTER = ''
126
127# coredump related messages
128COREDUMP_UART_START = b'================= CORE DUMP START ================='
129COREDUMP_UART_END = b'================= CORE DUMP END ================='
130COREDUMP_UART_PROMPT = b'Press Enter to print core dump to UART...'
131
132# coredump states
133COREDUMP_IDLE = 0
134COREDUMP_READING = 1
135COREDUMP_DONE = 2
136
137# coredump decoding options
138COREDUMP_DECODE_DISABLE = 'disable'
139COREDUMP_DECODE_INFO = 'info'
140
141# panic handler related messages
142PANIC_START = r'Core \s*\d+ register dump:'
143PANIC_END = b'ELF file SHA256:'
144PANIC_STACK_DUMP = b'Stack memory:'
145
146# panic handler decoding states
147PANIC_IDLE = 0
148PANIC_READING = 1
149
150# panic handler decoding options
151PANIC_DECODE_DISABLE = 'disable'
152PANIC_DECODE_BACKTRACE = 'backtrace'
153
154
class StoppableThread(object):
    """
    Thread-like helper which can be started and stopped repeatedly.

    Subclasses implement run() for the actual work and _cancel() to unblock
    run() when stop() is requested.

    This wraps a threading.Thread instead of subclassing it because a Python
    Thread object can only be run once.
    """
    def __init__(self):
        # Worker thread currently registered, or None when not running.
        self._thread = None

    @property
    def alive(self):
        """
        True for as long as an internal thread object is registered
        """
        return not (self._thread is None)

    def start(self):
        """ Create and start the worker thread, unless one is already registered """
        if self._thread is not None:
            return
        self._thread = threading.Thread(target=self._run_outer)
        self._thread.start()

    def _cancel(self):
        """ Hook for subclasses: unblock run() so that stop() can join the thread """
        pass

    def run(self):
        """ Hook for subclasses: body executed inside the worker thread """
        pass

    def _run_outer(self):
        """ Thread target: execute run() and always unregister the thread afterwards """
        try:
            self.run()
        finally:
            self._thread = None

    def stop(self):
        """ Unregister the worker thread, ask it to cancel and wait until it has finished """
        if self._thread is None:
            return
        worker = self._thread
        # Clearing the reference first makes 'alive' False for the running worker.
        self._thread = None
        self._cancel()
        worker.join()
197
198
class ConsoleReader(StoppableThread):
    """ Read input keys from the console and push them to the queue,
    until stopped.

    Each key is passed to ``parser.parse()``. A non-None result is a
    ``(tag, payload)`` tuple: commands other than CMD_STOP are put on
    ``cmd_queue``, everything else (keys and the stop command) on
    ``event_queue``.
    """
    def __init__(self, console, event_queue, cmd_queue, parser, test_mode):
        super(ConsoleReader, self).__init__()
        self.console = console
        self.event_queue = event_queue
        self.cmd_queue = cmd_queue
        self.parser = parser
        # When True, getkey() is never called (see run()) and _cancel() is a no-op.
        self.test_mode = test_mode

    def run(self):
        """ Thread body: read keys until the thread is stopped, then restore the console """
        self.console.setup()
        try:
            while self.alive:
                try:
                    if os.name == 'nt':
                        # Windows kludge: because the console.cancel() method doesn't
                        # seem to work to unblock getkey() on the Windows implementation.
                        #
                        # So we only call getkey() if we know there's a key waiting for us.
                        import msvcrt
                        while not msvcrt.kbhit() and self.alive:
                            time.sleep(0.1)
                        if not self.alive:
                            break
                    elif self.test_mode:
                        # In testing mode stdin is connected to a PTY but is not used for any input.
                        # Cancelling via fcntl.ioctl doesn't work for a PTY, so a call to
                        # self.console.getkey() would hang. Therefore, we avoid calling it and
                        # just idle until the thread is stopped.
                        while self.alive:
                            time.sleep(0.1)
                        break
                    c = self.console.getkey()
                except KeyboardInterrupt:
                    # Treat an interrupt as the Ctrl-C character typed by the user.
                    c = '\x03'
                if c is not None:
                    ret = self.parser.parse(c)
                    if ret is not None:
                        (tag, cmd) = ret
                        # stop command should be executed last
                        if tag == TAG_CMD and cmd != CMD_STOP:
                            self.cmd_queue.put(ret)
                        else:
                            self.event_queue.put(ret)

        finally:
            self.console.cleanup()

    def _cancel(self):
        """ Unblock a pending getkey() call (POSIX only, not in test mode) """
        if os.name == 'posix' and not self.test_mode:
            # this is the way cancel() is implemented in pyserial 3.3 or newer,
            # older pyserial (3.1+) has cancellation implemented via 'select',
            # which does not work when console sends an escape sequence response
            #
            # even older pyserial (<3.1) does not have this method
            #
            # on Windows there is a different (also hacky) fix, applied above.
            #
            # note that TIOCSTI is not implemented in WSL / bash-on-Windows.
            # TODO: introduce some workaround to make it work there.
            #
            # Note: This would throw exception in testing mode when the stdin is connected to PTY.
            import fcntl
            import termios
            # Inject a NUL character into the console input so the blocked read returns.
            fcntl.ioctl(self.console.fd, termios.TIOCSTI, b'\0')
266
267
class ConsoleParser(object):
    """ Turn console key presses into (tag, payload) events.

    Keeps track of whether the menu key was pressed, so that the following key
    is interpreted as a menu command instead of being sent to the target.
    """

    def __init__(self, eol='CRLF'):
        eol_translators = {
            'CRLF': lambda c: c.replace('\n', '\r\n'),
            'CR': lambda c: c.replace('\n', '\r'),
            'LF': lambda c: c.replace('\r', '\n'),
        }
        self.translate_eol = eol_translators[eol]
        self.menu_key = CTRL_T
        self.exit_key = CTRL_RBRACKET
        self._pressed_menu_key = False

    def parse(self, key):
        """ Return the event for ``key``, or None if the key produces no event """
        if self._pressed_menu_key:
            return self._handle_menu_key(key)
        if key == self.menu_key:
            # Remember the menu key; the next key selects the menu action.
            self._pressed_menu_key = True
            return None
        if key == self.exit_key:
            return (TAG_CMD, CMD_STOP)
        return (TAG_KEY, self.translate_eol(key))

    def _handle_menu_key(self, c):
        """ Interpret the key pressed right after the menu key and leave menu mode """
        event = None
        if c in (self.exit_key, self.menu_key):  # send verbatim
            event = (TAG_KEY, c)
        elif c in (CTRL_H, 'h', 'H', '?'):
            red_print(self.get_help_text())
        elif c == CTRL_R:  # Reset device via RTS
            event = (TAG_CMD, CMD_RESET)
        elif c == CTRL_F:  # Recompile & upload
            event = (TAG_CMD, CMD_MAKE)
        elif c in (CTRL_A, 'a', 'A'):  # Recompile & upload app only
            # "CTRL-A" cannot be captured with the default settings of the Windows command line,
            # therefore, "A" can be used instead
            event = (TAG_CMD, CMD_APP_FLASH)
        elif c == CTRL_Y:  # Toggle output display
            event = (TAG_CMD, CMD_OUTPUT_TOGGLE)
        elif c == CTRL_L:  # Toggle saving output into file
            event = (TAG_CMD, CMD_TOGGLE_LOGGING)
        elif c == CTRL_P:
            yellow_print('Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart')
            # to fast trigger pause without press menu key
            event = (TAG_CMD, CMD_ENTER_BOOT)
        elif c in (CTRL_X, 'x', 'X'):  # Exiting from within the menu
            event = (TAG_CMD, CMD_STOP)
        else:
            red_print('--- unknown menu character {} --'.format(key_description(c)))

        self._pressed_menu_key = False
        return event

    def get_help_text(self):
        """ Return the help text listing the exit key, the menu key and all menu commands """
        template = """\
            --- idf_monitor ({version}) - ESP-IDF monitor tool
            --- based on miniterm from pySerial
            ---
            --- {exit:8} Exit program
            --- {menu:8} Menu escape key, followed by:
            --- Menu keys:
            ---    {menu:14} Send the menu character itself to remote
            ---    {exit:14} Send the exit character itself to remote
            ---    {reset:14} Reset target board via RTS line
            ---    {makecmd:14} Build & flash project
            ---    {appmake:14} Build & flash app only
            ---    {output:14} Toggle output display
            ---    {log:14} Toggle saving output into file
            ---    {pause:14} Reset target into bootloader to pause app via RTS line
            ---    {menuexit:14} Exit program
        """
        fields = {
            'version': __version__,
            'exit': key_description(self.exit_key),
            'menu': key_description(self.menu_key),
            'reset': key_description(CTRL_R),
            'makecmd': key_description(CTRL_F),
            'appmake': key_description(CTRL_A) + ' (or A)',
            'output': key_description(CTRL_Y),
            'log': key_description(CTRL_L),
            'pause': key_description(CTRL_P),
            'menuexit': key_description(CTRL_X) + ' (or X)',
        }
        return textwrap.dedent(template.format(**fields))

    def get_next_action_text(self):
        """ Return the prompt text shown when the user has to choose the next action """
        template = """\
            --- Press {} to exit monitor.
            --- Press {} to build & flash project.
            --- Press {} to build & flash app.
            --- Press any other key to resume monitor (resets target).
        """
        key_names = [key_description(k) for k in (self.exit_key, CTRL_F, CTRL_A)]
        return textwrap.dedent(template.format(*key_names))

    def parse_next_action_key(self, c):
        """ Map the key pressed at the next-action prompt to a command event, or None """
        if c == self.exit_key:
            return (TAG_CMD, CMD_STOP)
        if c == CTRL_F:  # Recompile & upload
            return (TAG_CMD, CMD_MAKE)
        if c in (CTRL_A, 'a', 'A'):  # Recompile & upload app only
            # "CTRL-A" cannot be captured with the default settings of the Windows command line,
            # therefore, "A" can be used instead
            return (TAG_CMD, CMD_APP_FLASH)
        return None
374
375
class SerialReader(StoppableThread):
    """ Read serial data from the serial port and push to the
    event queue, until stopped.

    Received chunks are queued as ``(TAG_SERIAL, data)``. If reading fails,
    the port is closed and reopened in a retry loop until it succeeds or the
    thread is stopped.
    """
    def __init__(self, serial, event_queue):
        super(SerialReader, self).__init__()
        # Remember the configured baud rate; run() re-applies it before opening the port.
        self.baud = serial.baudrate
        self.serial = serial
        self.event_queue = event_queue
        if not hasattr(self.serial, 'cancel_read'):
            # enable timeout for checking alive flag,
            # if cancel_read not available
            self.serial.timeout = 0.25

    def run(self):
        """ Thread body: open the port if needed, then forward data until stopped """
        if not self.serial.is_open:
            self.serial.baudrate = self.baud
            self.serial.rts = True  # Force an RTS reset on open
            self.serial.open()
            time.sleep(0.005)  # Add a delay to meet the requirements of minimal EN low time (2ms for ESP32-C3)
            self.serial.rts = False
            self.serial.dtr = self.serial.dtr   # usbser.sys workaround
        try:
            while self.alive:
                try:
                    # Read everything that is waiting, or block for at least one byte.
                    data = self.serial.read(self.serial.in_waiting or 1)
                except (serial.serialutil.SerialException, IOError) as e:
                    data = b''
                    # self.serial.open() was successful before, therefore, this is an issue related to
                    # the disappearance of the device
                    red_print(e)
                    yellow_print('Waiting for the device to reconnect', newline='')
                    self.serial.close()
                    while self.alive:  # so that exiting monitor works while waiting
                        try:
                            time.sleep(0.5)
                            self.serial.open()
                            break  # device connected
                        except serial.serialutil.SerialException:
                            # Still not available: print a progress dot and retry.
                            yellow_print('.', newline='')
                            sys.stderr.flush()
                    yellow_print('')  # go to new line
                if len(data):
                    self.event_queue.put((TAG_SERIAL, data), False)
        finally:
            self.serial.close()

    def _cancel(self):
        """ Abort a pending read when the port object supports cancel_read() """
        if hasattr(self.serial, 'cancel_read'):
            try:
                self.serial.cancel_read()
            except Exception:
                # Best effort only: a failure to cancel must not prevent stopping.
                pass
429
430
class LineMatcher(object):
    """
    Assembles a dictionary of filtering rules based on the --print_filter
    argument of idf_monitor. Then later it is used to match lines and
    determine whether they should be shown on screen or not.

    The filter is a whitespace separated list of ``<tag>`` or
    ``<tag>:<level>`` items, where ``<level>`` is one of N, E, W, I, D, V,
    ``*`` or empty (the last two mean verbose). The tag ``*`` sets the
    default for lines whose tag has no rule of its own.
    """
    LEVEL_N = 0
    LEVEL_E = 1
    LEVEL_W = 2
    LEVEL_I = 3
    LEVEL_D = 4
    LEVEL_V = 5

    level = {'N': LEVEL_N, 'E': LEVEL_E, 'W': LEVEL_W, 'I': LEVEL_I, 'D': LEVEL_D,
             'V': LEVEL_V, '*': LEVEL_V, '': LEVEL_V}

    def __init__(self, print_filter):
        """
        Parse ``print_filter`` into per-tag maximum levels.

        Raises ValueError for an item with an empty tag, an unknown level or
        more than one ":" separator.
        """
        self._dict = {}
        # Optional ANSI colour prefix, then "<level letter> (<timestamp>) <tag>: ".
        self._re = re.compile(r'^(?:\033\[[01];?[0-9]+m?)?([EWIDV]) \([0-9]+\) ([^:]+): ')
        items = print_filter.split()
        if not items:
            self._dict['*'] = self.LEVEL_V  # default is to print everything
        for f in items:
            s = f.split(':')
            if len(s) == 1:
                # specifying no warning level defaults to verbose level
                lev = self.LEVEL_V
            elif len(s) == 2:
                if not s[0]:
                    raise ValueError('No tag specified in filter ' + f)
                try:
                    lev = self.level[s[1].upper()]
                except KeyError:
                    raise ValueError('Unknown warning level in filter ' + f)
            else:
                # Three or more parts means the item contains several ":" separators.
                raise ValueError('Too many ":" in filter ' + f)
            self._dict[s[0]] = lev

    def match(self, line):
        """
        Return True when ``line`` (a text string) should be printed.

        Lines in the ESP_LOG format are compared against the rule of their tag,
        falling back to the ``*`` rule. Any other line is printed only if the
        ``*`` rule allows more than level N.
        """
        try:
            m = self._re.search(line)
            if m:
                lev = self.level[m.group(1)]
                if m.group(2) in self._dict:
                    return self._dict[m.group(2)] >= lev
                return self._dict.get('*', self.LEVEL_N) >= lev
        except (KeyError, IndexError):
            # Regular line written with something else than ESP_LOG*
            # or an empty line.
            pass
        # We need something more than "*.N" for printing.
        return self._dict.get('*', self.LEVEL_N) > self.LEVEL_N
483
484
class SerialStopException(Exception):
    """
    Raised to stop the IDF monitor when running in testing mode.
    """
490
491
492class Monitor(object):
493    """
494    Monitor application main class.
495
496    This was originally derived from miniterm.Miniterm, but it turned out to be easier to write from scratch for this
497    purpose.
498
499    Main difference is that all event processing happens in the main thread, not the worker threads.
500    """
    def __init__(self, serial_instance, elf_file, print_filter, make='make', encrypted=False,
                 toolchain_prefix=DEFAULT_TOOLCHAIN_PREFIX, eol='CRLF',
                 decode_coredumps=COREDUMP_DECODE_INFO,
                 decode_panic=PANIC_DECODE_DISABLE,
                 target=None,
                 websocket_client=None,
                 enable_address_decoding=True):
        """
        Set up queues, console, reader threads and internal decoding state.

        serial_instance: serial port object; its ``port`` attribute is inspected
            and it is handed to SerialReader.
        elf_file: path of the ELF file passed to addr2line for address lookups.
        print_filter: filter string handed to LineMatcher.
        make: build command; if no file with this name exists it is split with
            shlex into an argument list.
        eol: end-of-line translation name handed to ConsoleParser.
        decode_coredumps / decode_panic: decoding modes (COREDUMP_DECODE_* /
            PANIC_DECODE_* values).
        websocket_client: optional client used instead of running gdb locally
            when a gdb stub message is detected.
        enable_address_decoding: when False, PC addresses are not looked up.
        encrypted, toolchain_prefix, target: stored as attributes unchanged.
        """
        super(Monitor, self).__init__()
        self.event_queue = queue.Queue()
        self.cmd_queue = queue.Queue()
        self.console = miniterm.Console()
        self.enable_address_decoding = enable_address_decoding
        if os.name == 'nt':
            # ANSIColorConverter is defined elsewhere in this file; on Windows it wraps
            # stderr and the console output streams.
            sys.stderr = ANSIColorConverter(sys.stderr, decode_output=True)
            self.console.output = ANSIColorConverter(self.console.output)
            self.console.byte_output = ANSIColorConverter(self.console.byte_output)

        if StrictVersion(serial.VERSION) < StrictVersion('3.3.0'):
            # Use Console.getkey implementation from 3.3.0 (to be in sync with the ConsoleReader._cancel patch above)
            def getkey_patched(self):
                c = self.enc_stdin.read(1)
                if c == chr(0x7f):
                    c = chr(8)    # map the BS key (which yields DEL) to backspace
                return c

            self.console.getkey = types.MethodType(getkey_patched, self.console)

        socket_mode = serial_instance.port.startswith('socket://')  # testing hook - data from serial can make exit the monitor
        self.serial = serial_instance
        self.console_parser = ConsoleParser(eol)
        # socket_mode doubles as the console reader's test_mode flag.
        self.console_reader = ConsoleReader(self.console, self.event_queue, self.cmd_queue, self.console_parser, socket_mode)
        self.serial_reader = SerialReader(self.serial, self.event_queue)
        self.elf_file = elf_file
        if not os.path.exists(make):
            self.make = shlex.split(make)  # allow for possibility the "make" arg is a list of arguments (for idf.py)
        else:
            self.make = make
        self.encrypted = encrypted
        self.toolchain_prefix = toolchain_prefix
        self.websocket_client = websocket_client
        self.target = target

        # internal state
        self._last_line_part = b''  # incomplete line carried over between serial chunks
        self._gdb_buffer = b''  # tail of the last printed partial line, for split gdb stub messages
        self._pc_address_buffer = b''  # tail of the last printed partial line, for split PC addresses
        self._line_matcher = LineMatcher(print_filter)
        self._invoke_processing_last_line_timer = None
        self._force_line_print = False
        self._output_enabled = True
        self._serial_check_exit = socket_mode
        self._log_file = None
        self._decode_coredumps = decode_coredumps
        self._reading_coredump = COREDUMP_IDLE
        self._coredump_buffer = b''
        self._decode_panic = decode_panic
        self._reading_panic = PANIC_IDLE
        self._panic_buffer = b''
559
560    def invoke_processing_last_line(self):
561        self.event_queue.put((TAG_SERIAL_FLUSH, b''), False)
562
    def main_loop(self):
        """
        Start both reader threads and dispatch their events until one of them stops.

        Items on cmd_queue are taken first; only when it is empty is event_queue
        polled with a short timeout. A SerialStopException ends the loop. On exit
        the readers and logging are stopped.
        """
        self.console_reader.start()
        self.serial_reader.start()
        try:
            while self.console_reader.alive and self.serial_reader.alive:
                try:
                    item = self.cmd_queue.get_nowait()
                except queue.Empty:
                    try:
                        # Short timeout so the 'alive' flags are re-checked regularly.
                        item = self.event_queue.get(True, 0.03)
                    except queue.Empty:
                        continue

                (event_tag, data) = item
                if event_tag == TAG_CMD:
                    self.handle_commands(data)
                elif event_tag == TAG_KEY:
                    try:
                        self.serial.write(codecs.encode(data))
                    except serial.SerialException:
                        pass  # this shouldn't happen, but sometimes port has closed in serial thread
                    except UnicodeEncodeError:
                        pass  # this can happen if a non-ascii character was passed, ignoring
                elif event_tag == TAG_SERIAL:
                    self.handle_serial_input(data)
                    # Restart the flush timer on every received chunk.
                    if self._invoke_processing_last_line_timer is not None:
                        self._invoke_processing_last_line_timer.cancel()
                    self._invoke_processing_last_line_timer = threading.Timer(0.1, self.invoke_processing_last_line)
                    self._invoke_processing_last_line_timer.start()
                    # If no further data is received in the next short period
                    # of time then the _invoke_processing_last_line_timer
                    # generates an event which will result in the finishing of
                    # the last line. This is a fix for handling lines sent
                    # without EOL.
                elif event_tag == TAG_SERIAL_FLUSH:
                    self.handle_serial_input(data, finalize_line=True)
                else:
                    raise RuntimeError('Bad event data %r' % ((event_tag,data),))
        except SerialStopException:
            sys.stderr.write(ANSI_NORMAL + 'Stopping condition has been received\n')
        finally:
            try:
                self.console_reader.stop()
                self.serial_reader.stop()
                self.stop_logging()
                # Cancelling _invoke_processing_last_line_timer is not
                # important here because receiving empty data doesn't matter.
                self._invoke_processing_last_line_timer = None
            except Exception:
                # Shutdown is best effort; errors while stopping are ignored.
                pass
            # Reset terminal colours and finish the current output line.
            sys.stderr.write(ANSI_NORMAL + '\n')
614
    def handle_serial_input(self, data, finalize_line=False):
        """
        Split received bytes into lines, run the per-line triggers and print matching lines.

        data: bytes received from the serial port (may be empty for a flush event).
        finalize_line: when True, a pending incomplete line is printed as well
            if it passes the line filter.

        Raises SerialStopException when exit checking is enabled and a line equal
        to the exit key is received.
        """
        sp = data.split(b'\n')
        if self._last_line_part != b'':
            # add unprocessed part from previous "data" to the first line
            sp[0] = self._last_line_part + sp[0]
            self._last_line_part = b''
        if sp[-1] != b'':
            # last part is not a full line
            self._last_line_part = sp.pop()
        for line in sp:
            if line != b'':
                if self._serial_check_exit and line == self.console_parser.exit_key.encode('latin-1'):
                    raise SerialStopException()
                self.check_panic_decode_trigger(line)
                self.check_coredump_trigger_before_print(line)
                if self._force_line_print or self._line_matcher.match(line.decode(errors='ignore')):
                    self._print(line + b'\n')
                    self.handle_possible_pc_address_in_line(line)
                self.check_coredump_trigger_after_print(line)
                self.check_gdbstub_trigger(line)
                # The forced print only applies to the remainder of a line whose start was already printed.
                self._force_line_print = False
        # Now we have the last part (incomplete line) in _last_line_part. By
        # default we don't touch it and just wait for the arrival of the rest
        # of the line. But after some time when we haven't received it we need
        # to make a decision.
        if self._last_line_part != b'':
            if self._force_line_print or (finalize_line and self._line_matcher.match(self._last_line_part.decode(errors='ignore'))):
                self._force_line_print = True
                self._print(self._last_line_part)
                self.handle_possible_pc_address_in_line(self._last_line_part)
                self.check_gdbstub_trigger(self._last_line_part)
                # It is possible that the incomplete line cuts in half the PC
                # address. A small buffer is kept and will be used the next time
                # handle_possible_pc_address_in_line is invoked to avoid this problem.
                # MATCH_PCADDR matches 10 character long addresses. Therefore, we
                # keep the last 9 characters.
                self._pc_address_buffer = self._last_line_part[-9:]
                # GDB sequence can be cut in half also. GDB sequence is 7
                # characters long, therefore, we save the last 6 characters.
                self._gdb_buffer = self._last_line_part[-6:]
                self._last_line_part = b''
        # else: keeping _last_line_part and it will be processed the next time
        # handle_serial_input is invoked
658
659    def handle_possible_pc_address_in_line(self, line):
660        line = self._pc_address_buffer + line
661        self._pc_address_buffer = b''
662        if self.enable_address_decoding:
663            for m in re.finditer(MATCH_PCADDR, line.decode(errors='ignore')):
664                self.lookup_pc_address(m.group())
665
666    def __enter__(self):
667        """ Use 'with self' to temporarily disable monitoring behaviour """
668        self.serial_reader.stop()
669        self.console_reader.stop()
670
671    def __exit__(self, *args, **kwargs):
672        """ Use 'with self' to temporarily disable monitoring behaviour """
673        self.console_reader.start()
674        self.serial_reader.start()
675
676    def prompt_next_action(self, reason):
677        self.console.setup()  # set up console to trap input characters
678        try:
679            red_print('--- {}'.format(reason))
680            red_print(self.console_parser.get_next_action_text())
681
682            k = CTRL_T  # ignore CTRL-T here, so people can muscle-memory Ctrl-T Ctrl-F, etc.
683            while k == CTRL_T:
684                k = self.console.getkey()
685        finally:
686            self.console.cleanup()
687        ret = self.console_parser.parse_next_action_key(k)
688        if ret is not None:
689            cmd = ret[1]
690            if cmd == CMD_STOP:
691                # the stop command should be handled last
692                self.event_queue.put(ret)
693            else:
694                self.cmd_queue.put(ret)
695
696    def run_make(self, target):
697        with self:
698            if isinstance(self.make, list):
699                popen_args = self.make + [target]
700            else:
701                popen_args = [self.make, target]
702            yellow_print('Running %s...' % ' '.join(popen_args))
703            p = subprocess.Popen(popen_args, env=os.environ)
704            try:
705                p.wait()
706            except KeyboardInterrupt:
707                p.wait()
708            if p.returncode != 0:
709                self.prompt_next_action('Build failed')
710            else:
711                self.output_enable(True)
712
713    def lookup_pc_address(self, pc_addr):
714        cmd = ['%saddr2line' % self.toolchain_prefix,
715               '-pfiaC', '-e', self.elf_file, pc_addr]
716        try:
717            translation = subprocess.check_output(cmd, cwd='.')
718            if b'?? ??:0' not in translation:
719                self._print(translation.decode(), console_printer=yellow_print)
720        except OSError as e:
721            red_print('%s: %s' % (' '.join(cmd), e))
722
723    def check_gdbstub_trigger(self, line):
724        line = self._gdb_buffer + line
725        self._gdb_buffer = b''
726        m = re.search(b'\\$(T..)#(..)', line)  # look for a gdb "reason" for a break
727        if m is not None:
728            try:
729                chsum = sum(ord(bytes([p])) for p in m.group(1)) & 0xFF
730                calc_chsum = int(m.group(2), 16)
731            except ValueError:
732                return  # payload wasn't valid hex digits
733            if chsum == calc_chsum:
734                if self.websocket_client:
735                    yellow_print('Communicating through WebSocket')
736                    self.websocket_client.send({'event': 'gdb_stub',
737                                                'port': self.serial.port,
738                                                'prog': self.elf_file})
739                    yellow_print('Waiting for debug finished event')
740                    self.websocket_client.wait([('event', 'debug_finished')])
741                    yellow_print('Communications through WebSocket is finished')
742                else:
743                    self.run_gdb()
744            else:
745                red_print('Malformed gdb message... calculated checksum %02x received %02x' % (chsum, calc_chsum))
746
747    def check_coredump_trigger_before_print(self, line):
748        if self._decode_coredumps == COREDUMP_DECODE_DISABLE:
749            return
750
751        if COREDUMP_UART_PROMPT in line:
752            yellow_print('Initiating core dump!')
753            self.event_queue.put((TAG_KEY, '\n'))
754            return
755
756        if COREDUMP_UART_START in line:
757            yellow_print('Core dump started (further output muted)')
758            self._reading_coredump = COREDUMP_READING
759            self._coredump_buffer = b''
760            self._output_enabled = False
761            return
762
763        if COREDUMP_UART_END in line:
764            self._reading_coredump = COREDUMP_DONE
765            yellow_print('\nCore dump finished!')
766            self.process_coredump()
767            return
768
769        if self._reading_coredump == COREDUMP_READING:
770            kb = 1024
771            buffer_len_kb = len(self._coredump_buffer) // kb
772            self._coredump_buffer += line.replace(b'\r', b'') + b'\n'
773            new_buffer_len_kb = len(self._coredump_buffer) // kb
774            if new_buffer_len_kb > buffer_len_kb:
775                yellow_print('Received %3d kB...' % (new_buffer_len_kb), newline='\r')
776
777    def check_coredump_trigger_after_print(self, line):
778        if self._decode_coredumps == COREDUMP_DECODE_DISABLE:
779            return
780
781        # Re-enable output after the last line of core dump has been consumed
782        if not self._output_enabled and self._reading_coredump == COREDUMP_DONE:
783            self._reading_coredump = COREDUMP_IDLE
784            self._output_enabled = True
785            self._coredump_buffer = b''
786
787    def process_coredump(self):
788        if self._decode_coredumps != COREDUMP_DECODE_INFO:
789            raise NotImplementedError('process_coredump: %s not implemented' % self._decode_coredumps)
790
791        coredump_script = os.path.join(os.path.dirname(__file__), '..', 'components', 'espcoredump', 'espcoredump.py')
792        coredump_file = None
793        try:
794            # On Windows, the temporary file can't be read unless it is closed.
795            # Set delete=False and delete the file manually later.
796            with tempfile.NamedTemporaryFile(mode='wb', delete=False) as coredump_file:
797                coredump_file.write(self._coredump_buffer)
798                coredump_file.flush()
799
800            if self.websocket_client:
801                self._output_enabled = True
802                yellow_print('Communicating through WebSocket')
803                self.websocket_client.send({'event': 'coredump',
804                                            'file': coredump_file.name,
805                                            'prog': self.elf_file})
806                yellow_print('Waiting for debug finished event')
807                self.websocket_client.wait([('event', 'debug_finished')])
808                yellow_print('Communications through WebSocket is finished')
809            else:
810                cmd = [sys.executable,
811                       coredump_script,
812                       'info_corefile',
813                       '--core', coredump_file.name,
814                       '--core-format', 'b64',
815                       self.elf_file
816                       ]
817                output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
818                self._output_enabled = True
819                self._print(output)
820                self._output_enabled = False  # Will be reenabled in check_coredump_trigger_after_print
821        except subprocess.CalledProcessError as e:
822            yellow_print('Failed to run espcoredump script: {}\n{}\n\n'.format(e, e.output))
823            self._output_enabled = True
824            self._print(COREDUMP_UART_START + b'\n')
825            self._print(self._coredump_buffer)
826            # end line will be printed in handle_serial_input
827        finally:
828            if coredump_file is not None:
829                try:
830                    os.unlink(coredump_file.name)
831                except OSError as e:
832                    yellow_print("Couldn't remote temporary core dump file ({})".format(e))
833
834    def check_panic_decode_trigger(self, line):
835        if self._decode_panic == PANIC_DECODE_DISABLE:
836            return
837
838        if self._reading_panic == PANIC_IDLE and re.search(PANIC_START, line.decode('ascii', errors='ignore')):
839            self._reading_panic = PANIC_READING
840            yellow_print('Stack dump detected')
841
842        if self._reading_panic == PANIC_READING and PANIC_STACK_DUMP in line:
843            self._output_enabled = False
844
845        if self._reading_panic == PANIC_READING:
846            self._panic_buffer += line.replace(b'\r', b'') + b'\n'
847
848        if self._reading_panic == PANIC_READING and PANIC_END in line:
849            self._reading_panic = PANIC_IDLE
850            self._output_enabled = True
851            self.process_panic_output(self._panic_buffer)
852            self._panic_buffer = b''
853
854    def process_panic_output(self, panic_output):
855        panic_output_decode_script = os.path.join(os.path.dirname(__file__), '..', 'tools', 'gdb_panic_server.py')
856        panic_output_file = None
857        try:
858            # On Windows, the temporary file can't be read unless it is closed.
859            # Set delete=False and delete the file manually later.
860            with tempfile.NamedTemporaryFile(mode='wb', delete=False) as panic_output_file:
861                panic_output_file.write(panic_output)
862                panic_output_file.flush()
863
864            cmd = [self.toolchain_prefix + 'gdb',
865                   '--batch', '-n',
866                   self.elf_file,
867                   '-ex', "target remote | \"{python}\" \"{script}\" --target {target} \"{output_file}\""
868                   .format(python=sys.executable,
869                           script=panic_output_decode_script,
870                           target=self.target,
871                           output_file=panic_output_file.name),
872                   '-ex', 'bt']
873
874            output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
875            yellow_print('\nBacktrace:\n\n')
876            self._print(output)
877        except subprocess.CalledProcessError as e:
878            yellow_print('Failed to run gdb_panic_server.py script: {}\n{}\n\n'.format(e, e.output))
879            self._print(panic_output)
880        finally:
881            if panic_output_file is not None:
882                try:
883                    os.unlink(panic_output_file.name)
884                except OSError as e:
885                    yellow_print("Couldn't remove temporary panic output file ({})".format(e))
886
887    def run_gdb(self):
888        with self:  # disable console control
889            sys.stderr.write(ANSI_NORMAL)
890            try:
891                cmd = ['%sgdb' % self.toolchain_prefix,
892                       '-ex', 'set serial baud %d' % self.serial.baudrate,
893                       '-ex', 'target remote %s' % self.serial.port,
894                       '-ex', 'interrupt',  # monitor has already parsed the first 'reason' command, need a second
895                       self.elf_file]
896                process = subprocess.Popen(cmd, cwd='.')
897                process.wait()
898            except OSError as e:
899                red_print('%s: %s' % (' '.join(cmd), e))
900            except KeyboardInterrupt:
901                pass  # happens on Windows, maybe other OSes
902            finally:
903                try:
904                    # on Linux, maybe other OSes, gdb sometimes seems to be alive even after wait() returns...
905                    process.terminate()
906                except Exception:
907                    pass
908                try:
909                    # also on Linux, maybe other OSes, gdb sometimes exits uncleanly and breaks the tty mode
910                    subprocess.call(['stty', 'sane'])
911                except Exception:
912                    pass  # don't care if there's no stty, we tried...
913            self.prompt_next_action('gdb exited')
914
    def output_enable(self, enable):
        """Set whether ``_print()`` forwards data to the console.

        :param enable: value stored in ``self._output_enabled``
        """
        self._output_enabled = enable
917
918    def output_toggle(self):
919        self._output_enabled = not self._output_enabled
920        yellow_print('\nToggle output display: {}, Type Ctrl-T Ctrl-Y to show/disable output again.'.format(self._output_enabled))
921
922    def toggle_logging(self):
923        if self._log_file:
924            self.stop_logging()
925        else:
926            self.start_logging()
927
928    def start_logging(self):
929        if not self._log_file:
930            try:
931                name = 'log.{}.{}.txt'.format(os.path.splitext(os.path.basename(self.elf_file))[0],
932                                              datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
933                self._log_file = open(name, 'wb+')
934                yellow_print('\nLogging is enabled into file {}'.format(name))
935            except Exception as e:
936                red_print('\nLog file {} cannot be created: {}'.format(name, e))
937
938    def stop_logging(self):
939        if self._log_file:
940            try:
941                name = self._log_file.name
942                self._log_file.close()
943                yellow_print('\nLogging is disabled and file {} has been closed'.format(name))
944            except Exception as e:
945                red_print('\nLog file cannot be closed: {}'.format(e))
946            finally:
947                self._log_file = None
948
949    def _print(self, string, console_printer=None):
950        if console_printer is None:
951            console_printer = self.console.write_bytes
952        if self._output_enabled:
953            console_printer(string)
954        if self._log_file:
955            try:
956                if isinstance(string, type(u'')):
957                    string = string.encode()
958                self._log_file.write(string)
959            except Exception as e:
960                red_print('\nCannot write to file: {}'.format(e))
961                # don't fill-up the screen with the previous errors (probably consequent prints would fail also)
962                self.stop_logging()
963
    def handle_commands(self, cmd):
        """Execute one monitor command.

        Depending on ``cmd`` this stops the reader threads, resets the chip,
        runs a flash build target, toggles console output, toggles logging or
        puts the chip into bootloader mode. The reset and bootloader sequences
        drive the serial RTS/DTR lines with fixed delays between the steps, so
        the order of the calls matters.

        :param cmd: one of the ``CMD_*`` constants
        :raises RuntimeError: if ``cmd`` is not a known command
        """
        if cmd == CMD_STOP:
            self.console_reader.stop()
            self.serial_reader.stop()
        elif cmd == CMD_RESET:
            # Pulse RTS for 0.2 s, then make sure output is visible again
            self.serial.setRTS(True)
            self.serial.setDTR(self.serial.dtr)  # usbser.sys workaround
            time.sleep(0.2)
            self.serial.setRTS(False)
            self.serial.setDTR(self.serial.dtr)  # usbser.sys workaround
            self.output_enable(True)
        elif cmd == CMD_MAKE:
            self.run_make('encrypted-flash' if self.encrypted else 'flash')
        elif cmd == CMD_APP_FLASH:
            self.run_make('encrypted-app-flash' if self.encrypted else 'app-flash')
        elif cmd == CMD_OUTPUT_TOGGLE:
            self.output_toggle()
        elif cmd == CMD_TOGGLE_LOGGING:
            self.toggle_logging()
        elif cmd == CMD_ENTER_BOOT:
            self.serial.setDTR(False)  # IO0=HIGH
            self.serial.setRTS(True)   # EN=LOW, chip in reset
            self.serial.setDTR(self.serial.dtr)  # usbser.sys workaround
            time.sleep(1.3)  # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
            self.serial.setDTR(True)   # IO0=LOW
            self.serial.setRTS(False)  # EN=HIGH, chip out of reset
            self.serial.setDTR(self.serial.dtr)  # usbser.sys workaround
            time.sleep(0.45)  # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
            self.serial.setDTR(False)  # IO0=HIGH, done
        else:
            # NOTE(review): '%d' raises TypeError for a non-numeric cmd, which would mask this RuntimeError
            raise RuntimeError('Bad command data %d' % (cmd))
995
996
def main():
    """Command line entry point: parse arguments, prepare the serial port and run the monitor.

    Besides argument parsing this adjusts the port name on Windows and macOS so
    that gdb can open it, removes jobserver arguments from ``MAKEFLAGS``,
    exports the port in use as ``ESPPORT`` for child processes, optionally
    connects to a WebSocket server and finally runs ``Monitor.main_loop()``.
    The WebSocket connection, if any, is closed when the monitor exits.
    """
    parser = argparse.ArgumentParser('idf_monitor - a serial output monitor for esp-idf')

    parser.add_argument(
        '--port', '-p',
        help='Serial port device',
        default=os.environ.get('ESPTOOL_PORT', '/dev/ttyUSB0')
    )

    parser.add_argument(
        '--disable-address-decoding', '-d',
        help="Don't print lines about decoded addresses from the application ELF file.",
        action='store_true',
        # Environment values are always strings (or None when unset), so the value has to be
        # compared with '0'; comparing with the integer 0 can never be true.
        default=os.environ.get('ESP_MONITOR_DECODE') == '0'
    )

    parser.add_argument(
        '--baud', '-b',
        help='Serial port baud rate',
        type=int,
        default=os.getenv('IDF_MONITOR_BAUD', os.getenv('MONITORBAUD', 115200)))

    parser.add_argument(
        '--make', '-m',
        help='Command to run make',
        type=str, default='make')

    parser.add_argument(
        '--encrypted',
        help='Use encrypted targets while running make',
        action='store_true')

    parser.add_argument(
        '--toolchain-prefix',
        help='Triplet prefix to add before cross-toolchain names',
        default=DEFAULT_TOOLCHAIN_PREFIX)

    parser.add_argument(
        '--eol',
        choices=['CR', 'LF', 'CRLF'],
        type=lambda c: c.upper(),
        help='End of line to use when sending to the serial port',
        default='CR')

    parser.add_argument(
        'elf_file', help='ELF file of application',
        type=argparse.FileType('rb'))

    parser.add_argument(
        '--print_filter',
        help='Filtering string',
        default=DEFAULT_PRINT_FILTER)

    parser.add_argument(
        '--decode-coredumps',
        choices=[COREDUMP_DECODE_INFO, COREDUMP_DECODE_DISABLE],
        default=COREDUMP_DECODE_INFO,
        help='Handling of core dumps found in serial output'
    )

    parser.add_argument(
        '--decode-panic',
        choices=[PANIC_DECODE_BACKTRACE, PANIC_DECODE_DISABLE],
        default=PANIC_DECODE_DISABLE,
        help='Handling of panic handler info found in serial output'
    )

    parser.add_argument(
        '--target',
        required=False,
        help='Target name (used when stack dump decoding is enabled)'
    )

    parser.add_argument(
        '--ws',
        default=os.environ.get('ESP_IDF_MONITOR_WS', None),
        help='WebSocket URL for communicating with IDE tools for debugging purposes'
    )

    args = parser.parse_args()

    # GDB uses CreateFile to open COM port, which requires the COM name to be r'\\.\COMx' if the COM
    # number is larger than 10
    if os.name == 'nt' and args.port.startswith('COM'):
        args.port = args.port.replace('COM', r'\\.\COM')
        yellow_print('--- WARNING: GDB cannot open serial ports accessed as COMx')
        yellow_print('--- Using %s instead...' % args.port)
    elif args.port.startswith('/dev/tty.') and sys.platform == 'darwin':
        args.port = args.port.replace('/dev/tty.', '/dev/cu.')
        yellow_print('--- WARNING: Serial ports accessed as /dev/tty.* will hang gdb if launched.')
        yellow_print('--- Using %s instead...' % args.port)

    # The port is not opened here (do_not_open=True); DTR and RTS are preset on the unopened instance
    serial_instance = serial.serial_for_url(args.port, args.baud,
                                            do_not_open=True)
    serial_instance.dtr = False
    serial_instance.rts = False

    args.elf_file.close()  # don't need this as a file

    # remove the parallel jobserver arguments from MAKEFLAGS, as any
    # parent make is only running 1 job (monitor), so we can re-spawn
    # all of the child makes we need (the -j argument remains part of
    # MAKEFLAGS)
    try:
        makeflags = os.environ['MAKEFLAGS']
        makeflags = re.sub(r'--jobserver[^ =]*=[0-9,]+ ?', '', makeflags)
        os.environ['MAKEFLAGS'] = makeflags
    except KeyError:
        pass  # not running a make jobserver

    # Pass the actual used port to callee of idf_monitor (e.g. make) through `ESPPORT` environment
    # variable
    # To make sure the key as well as the value are str type, by the requirements of subprocess
    espport_key = str('ESPPORT')
    espport_val = str(args.port)
    os.environ.update({espport_key: espport_val})

    ws = WebSocketClient(args.ws) if args.ws else None
    try:
        monitor = Monitor(serial_instance, args.elf_file.name, args.print_filter, args.make, args.encrypted,
                          args.toolchain_prefix, args.eol,
                          args.decode_coredumps, args.decode_panic, args.target,
                          ws, enable_address_decoding=not args.disable_address_decoding)

        yellow_print('--- idf_monitor on {p.name} {p.baudrate} ---'.format(
            p=serial_instance))
        yellow_print('--- Quit: {} | Menu: {} | Help: {} followed by {} ---'.format(
            key_description(monitor.console_parser.exit_key),
            key_description(monitor.console_parser.menu_key),
            key_description(monitor.console_parser.menu_key),
            key_description(CTRL_H)))
        if args.print_filter != DEFAULT_PRINT_FILTER:
            yellow_print('--- Print filter: {} ---'.format(args.print_filter))

        monitor.main_loop()
    finally:
        if ws:
            ws.close()
1135
1136
class WebSocketClient(object):
    """
    WebSocket client used to advertise debug events to WebSocket server by sending and receiving JSON-serialized
    dictionaries.

    Advertisement of debug event:
    {'event': 'gdb_stub', 'port': '/dev/ttyUSB1', 'prog': 'build/elf_file'} for GDB Stub, or
    {'event': 'coredump', 'file': '/tmp/xy', 'prog': 'build/elf_file'} for coredump,
    where 'port' is the port for the connected device, 'prog' is the full path to the ELF file and 'file' is the
    generated coredump file.

    Expected end of external debugging:
    {'event': 'debug_finished'}
    """

    # Number of attempts made by _connect(), send() and wait() before giving up
    RETRIES = 3
    # Seconds to sleep after a failed connection attempt
    CONNECTION_RETRY_DELAY = 1

    def __init__(self, url):
        """
        Remember the server url and connect to it; raises RuntimeError if the connection cannot be established
        """
        self.url = url
        self._connect()

    def _connect(self):
        """
        Connect to WebSocket server at url

        Any existing connection is closed first. Raises RuntimeError if the websocket module is not available or
        if all connection attempts fail.
        """
        self.close()

        for _ in range(self.RETRIES):
            try:
                self.ws = websocket.create_connection(self.url)
                break  # success
            except NameError:
                # The websocket name is undefined when the optional package could not be imported
                raise RuntimeError('Please install the websocket_client package for IDE integration!')
            except Exception as e:
                red_print('WebSocket connection error: {}'.format(e))
            time.sleep(self.CONNECTION_RETRY_DELAY)
        else:
            raise RuntimeError('Cannot connect to WebSocket server')

    def close(self):
        """
        Close the connection; errors are reported, not raised, and closing before connecting is a no-op
        """
        try:
            self.ws.close()
        except AttributeError:
            # Not yet connected
            pass
        except Exception as e:
            red_print('WebSocket close error: {}'.format(e))

    def send(self, payload_dict):
        """
        Serialize payload_dict in JSON format and send it to the server

        After a failed attempt the connection is re-established before retrying. Raises RuntimeError if all
        attempts fail.
        """
        for _ in range(self.RETRIES):
            try:
                self.ws.send(json.dumps(payload_dict))
                yellow_print('WebSocket sent: {}'.format(payload_dict))
                break
            except Exception as e:
                red_print('WebSocket send error: {}'.format(e))
                self._connect()
        else:
            raise RuntimeError('Cannot send to WebSocket server')

    def wait(self, expect_iterable):
        """
        Wait until a dictionary in JSON format is received from the server with all (key, value) tuples from
        expect_iterable.

        Every receive error, undecodable message and non-matching message uses up one of the RETRIES attempts.
        Raises RuntimeError if no matching message arrives within these attempts.
        """
        # Materialize once: the pairs are needed on every attempt and in the error message, which would
        # silently exhaust a one-shot iterator
        expected = list(expect_iterable)

        for _ in range(self.RETRIES):
            try:
                r = self.ws.recv()
            except Exception as e:
                red_print('WebSocket receive error: {}'.format(e))
                self._connect()
                continue
            try:
                obj = json.loads(r)
            except ValueError as e:
                # Invalid JSON is treated like any other unexpected message instead of aborting the monitor
                red_print('WebSocket received invalid JSON: {}'.format(e))
                continue
            # Only a JSON object can carry the expected (key, value) pairs
            if isinstance(obj, dict) and all(k in obj and obj[k] == v for k, v in expected):
                yellow_print('WebSocket received: {}'.format(obj))
                break
            red_print('WebSocket expected: {}, received: {}'.format(dict(expected), obj))
        else:
            raise RuntimeError('Cannot receive from WebSocket server')
1220
1221
if os.name == 'nt':
    # Windows console stuff

    # Identifiers passed to GetStdHandle to select the stdout / stderr console handle
    STD_OUTPUT_HANDLE = -11
    STD_ERROR_HANDLE = -12

    # wincon.h values
    FOREGROUND_INTENSITY = 8
    FOREGROUND_GREY = 7

    # matches the ANSI color change sequences that IDF sends
    RE_ANSI_COLOR = re.compile(b'\033\\[([01]);3([0-7])m')

    # list mapping the 8 ANSI colors (the indexes) to Windows Console colors
    ANSI_TO_WINDOWS_COLOR = [0, 4, 2, 6, 1, 5, 3, 7]

    GetStdHandle = ctypes.windll.kernel32.GetStdHandle
    SetConsoleTextAttribute = ctypes.windll.kernel32.SetConsoleTextAttribute

    class ANSIColorConverter(object):
        """Class to wrap a file-like output stream, intercept ANSI color codes,
        and convert them into calls to Windows SetConsoleTextAttribute.

        Doesn't support all ANSI terminal code escape sequences, only the sequences IDF uses.

        Ironically, in Windows this console output is normally wrapped by winpty which will then detect the console text
        color changes and convert these back to ANSI color codes for MSYS' terminal to display. However this is the
        least-bad working solution, as winpty doesn't support any "passthrough" mode for raw output.
        """

        def __init__(self, output=None, decode_output=False):
            """Wrap ``output``.

            :param output: stream to write to; the stderr console handle is used when this is
                ``sys.stderr``, the stdout console handle otherwise
            :param decode_output: if true, bytes are decoded to text before being written to ``output``
            """
            self.output = output
            self.decode_output = decode_output
            self.handle = GetStdHandle(STD_ERROR_HANDLE if self.output == sys.stderr else STD_OUTPUT_HANDLE)
            # Bytes of a possible escape sequence collected so far
            self.matched = b''

        def _output_write(self, data):
            """Write ``data`` (bytes) to the wrapped stream, ignoring IOError/OSError from the console."""
            try:
                if self.decode_output:
                    self.output.write(data.decode())
                else:
                    self.output.write(data)
            except (IOError, OSError):
                # Windows 10 bug since the Fall Creators Update, sometimes writing to console randomly throws
                # an exception (however, the character is still written to the screen)
                # Ref https://github.com/espressif/esp-idf/issues/1163
                #
                # Also possible for Windows to throw an OSError error if the data is invalid for the console
                # (garbage bytes, etc)
                pass

        def write(self, data):
            """Write ``data`` to the wrapped stream, turning ANSI color sequences into console color changes.

            ``data`` may be bytes or text (text is encoded as UTF-8) and is processed one byte at a time.
            A candidate escape sequence is held back in ``self.matched`` until it is either recognized as
            the reset sequence or a 7-byte color sequence (and applied to the console), or found not to be
            one (and written out verbatim).
            """
            if isinstance(data, bytes):
                data = bytearray(data)
            else:
                data = bytearray(data, 'utf-8')
            for b in data:
                b = bytes([b])
                length = len(self.matched)
                if b == b'\033':  # ESC
                    # Start a new candidate sequence
                    # NOTE(review): bytes of an unfinished candidate held in self.matched are dropped here
                    self.matched = b
                elif (length == 1 and b == b'[') or (1 < length < 7):
                    self.matched += b
                    if self.matched == ANSI_NORMAL.encode('latin-1'):  # reset console
                        # Flush is required only with Python3 - switching color before it is printed would mess up the console
                        self.flush()
                        SetConsoleTextAttribute(self.handle, FOREGROUND_GREY)
                        self.matched = b''
                    elif len(self.matched) == 7:     # could be an ANSI sequence
                        m = re.match(RE_ANSI_COLOR, self.matched)
                        if m is not None:
                            color = ANSI_TO_WINDOWS_COLOR[int(m.group(2))]
                            if m.group(1) == b'1':
                                color |= FOREGROUND_INTENSITY
                            # Flush is required only with Python3 - switching color before it is printed would mess up the console
                            self.flush()
                            SetConsoleTextAttribute(self.handle, color)
                        else:
                            self._output_write(self.matched)  # not an ANSI color code, display verbatim
                        self.matched = b''
                else:
                    # Ordinary byte: pass it through
                    # NOTE(review): a lone ESC held in self.matched is discarded here without being written
                    self._output_write(b)
                    self.matched = b''

        def flush(self):
            """Flush the wrapped stream, ignoring OSError from the console."""
            try:
                self.output.flush()
            except OSError:
                # Account for Windows Console refusing to accept garbage bytes (serial noise, etc)
                pass
1312
1313
# Run the monitor only when this file is executed as a script, not when it is imported
if __name__ == '__main__':
    main()
1316