1# SPDX-FileCopyrightText: 2015-2021 Espressif Systems (Shanghai) CO LTD 2# SPDX-License-Identifier: Apache-2.0 3 4import os 5import queue # noqa: F401 6import re 7import subprocess 8import time 9from typing import Callable, Optional 10 11import serial # noqa: F401 12from serial.tools import miniterm # noqa: F401 13 14from .chip_specific_config import get_chip_config 15from .console_parser import ConsoleParser, prompt_next_action # noqa: F401 16from .console_reader import ConsoleReader # noqa: F401 17from .constants import (CMD_APP_FLASH, CMD_ENTER_BOOT, CMD_MAKE, CMD_OUTPUT_TOGGLE, CMD_RESET, CMD_STOP, 18 CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, PANIC_DECODE_DISABLE, PANIC_END, PANIC_IDLE, 19 PANIC_READING, PANIC_STACK_DUMP, PANIC_START) 20from .coredump import CoreDump 21from .exceptions import SerialStopException 22from .gdbhelper import GDBHelper 23from .line_matcher import LineMatcher 24from .logger import Logger 25from .output_helpers import yellow_print 26from .serial_reader import Reader 27 28 29def run_make(target, make, console, console_parser, event_queue, cmd_queue, logger): 30 # type: (str, str, miniterm.Console, ConsoleParser, queue.Queue, queue.Queue, Logger) -> None 31 if isinstance(make, list): 32 popen_args = make + [target] 33 else: 34 popen_args = [make, target] 35 yellow_print('Running %s...' % ' '.join(popen_args)) 36 p = subprocess.Popen(popen_args, env=os.environ) 37 try: 38 p.wait() 39 except KeyboardInterrupt: 40 p.wait() 41 if p.returncode != 0: 42 prompt_next_action('Build failed', console, console_parser, event_queue, cmd_queue) 43 else: 44 logger.output_enabled = True 45 46 47class SerialHandler: 48 """ 49 The class is responsible for buffering serial input and performing corresponding commands. 50 """ 51 def __init__(self, last_line_part, serial_check_exit, logger, decode_panic, reading_panic, panic_buffer, target, 52 force_line_print, start_cmd_sent, serial_instance, encrypted): 53 # type: (bytes, bool, Logger, str, int, bytes,str, bool, bool, serial.Serial, bool) -> None 54 self._last_line_part = last_line_part 55 self._serial_check_exit = serial_check_exit 56 self.logger = logger 57 self._decode_panic = decode_panic 58 self._reading_panic = reading_panic 59 self._panic_buffer = panic_buffer 60 self.target = target 61 self._force_line_print = force_line_print 62 self.start_cmd_sent = start_cmd_sent 63 self.serial_instance = serial_instance 64 self.encrypted = encrypted 65 66 def handle_serial_input(self, data, console_parser, coredump, gdb_helper, line_matcher, 67 check_gdb_stub_and_run, finalize_line=False): 68 # type: (bytes, ConsoleParser, CoreDump, Optional[GDBHelper], LineMatcher, Callable, bool) -> None 69 # Remove "+" after Continue command 70 if self.start_cmd_sent: 71 self.start_cmd_sent = False 72 pos = data.find(b'+') 73 if pos != -1: 74 data = data[(pos + 1):] 75 76 sp = data.split(console_parser.eol) 77 if self._last_line_part != b'': 78 # add unprocessed part from previous "data" to the first line 79 sp[0] = self._last_line_part + sp[0] 80 self._last_line_part = b'' 81 if sp[-1] != b'': 82 # last part is not a full line 83 self._last_line_part = sp.pop() 84 for line in sp: 85 if line == b'': 86 continue 87 if self._serial_check_exit and line == console_parser.exit_key.encode('latin-1'): 88 raise SerialStopException() 89 if gdb_helper: 90 self.check_panic_decode_trigger(line, gdb_helper) 91 with coredump.check(line): 92 if self._force_line_print or line_matcher.match(line.decode(errors='ignore')): 93 self.logger.print(line + b'\n') 94 self.logger.handle_possible_pc_address_in_line(line) 95 check_gdb_stub_and_run(line) 96 self._force_line_print = False 97 # Now we have the last part (incomplete line) in _last_line_part. By 98 # default we don't touch it and just wait for the arrival of the rest 99 # of the line. But after some time when we didn't received it we need 100 # to make a decision. 101 force_print_or_matched = any(( 102 self._force_line_print, 103 (finalize_line and line_matcher.match(self._last_line_part.decode(errors='ignore'))) 104 )) 105 if self._last_line_part != b'' and force_print_or_matched: 106 self._force_line_print = True 107 self.logger.print(self._last_line_part) 108 self.logger.handle_possible_pc_address_in_line(self._last_line_part) 109 check_gdb_stub_and_run(self._last_line_part) 110 # It is possible that the incomplete line cuts in half the PC 111 # address. A small buffer is kept and will be used the next time 112 # handle_possible_pc_address_in_line is invoked to avoid this problem. 113 # MATCH_PCADDR matches 10 character long addresses. Therefore, we 114 # keep the last 9 characters. 115 self.logger.pc_address_buffer = self._last_line_part[-9:] 116 # GDB sequence can be cut in half also. GDB sequence is 7 117 # characters long, therefore, we save the last 6 characters. 118 if gdb_helper: 119 gdb_helper.gdb_buffer = self._last_line_part[-6:] 120 self._last_line_part = b'' 121 # else: keeping _last_line_part and it will be processed the next time 122 # handle_serial_input is invoked 123 124 def check_panic_decode_trigger(self, line, gdb_helper): # type: (bytes, GDBHelper) -> None 125 if self._decode_panic == PANIC_DECODE_DISABLE: 126 return 127 128 if self._reading_panic == PANIC_IDLE and re.search(PANIC_START, line.decode('ascii', errors='ignore')): 129 self._reading_panic = PANIC_READING 130 yellow_print('Stack dump detected') 131 132 if self._reading_panic == PANIC_READING and PANIC_STACK_DUMP in line: 133 self.logger.output_enabled = False 134 135 if self._reading_panic == PANIC_READING: 136 self._panic_buffer += line.replace(b'\r', b'') + b'\n' 137 138 if self._reading_panic == PANIC_READING and PANIC_END in line: 139 self._reading_panic = PANIC_IDLE 140 self.logger.output_enabled = True 141 gdb_helper.process_panic_output(self._panic_buffer, self.logger, self.target) 142 self._panic_buffer = b'' 143 144 def handle_commands(self, cmd, chip, run_make_func, console_reader, serial_reader): 145 # type: (int, str, Callable, ConsoleReader, Reader) -> None 146 config = get_chip_config(chip) 147 reset_delay = config['reset'] 148 enter_boot_set = config['enter_boot_set'] 149 enter_boot_unset = config['enter_boot_unset'] 150 151 high = False 152 low = True 153 154 if chip == 'linux': 155 if cmd in [CMD_RESET, 156 CMD_MAKE, 157 CMD_APP_FLASH, 158 CMD_ENTER_BOOT]: 159 yellow_print('linux target does not support this command') 160 return 161 162 if cmd == CMD_STOP: 163 console_reader.stop() 164 serial_reader.stop() 165 elif cmd == CMD_RESET: 166 self.serial_instance.setRTS(low) 167 self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround 168 time.sleep(reset_delay) 169 self.serial_instance.setRTS(high) 170 self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround 171 self.logger.output_enabled = True 172 elif cmd == CMD_MAKE: 173 run_make_func('encrypted-flash' if self.encrypted else 'flash') 174 elif cmd == CMD_APP_FLASH: 175 run_make_func('encrypted-app-flash' if self.encrypted else 'app-flash') 176 elif cmd == CMD_OUTPUT_TOGGLE: 177 self.logger.output_toggle() 178 elif cmd == CMD_TOGGLE_LOGGING: 179 self.logger.toggle_logging() 180 elif cmd == CMD_TOGGLE_TIMESTAMPS: 181 self.logger.toggle_timestamps() 182 elif cmd == CMD_ENTER_BOOT: 183 yellow_print('Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart') 184 self.serial_instance.setDTR(high) # IO0=HIGH 185 self.serial_instance.setRTS(low) # EN=LOW, chip in reset 186 self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround 187 time.sleep(enter_boot_set) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1 188 self.serial_instance.setDTR(low) # IO0=LOW 189 self.serial_instance.setRTS(high) # EN=HIGH, chip out of reset 190 self.serial_instance.setDTR(self.serial_instance.dtr) # usbser.sys workaround 191 time.sleep(enter_boot_unset) # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05 192 self.serial_instance.setDTR(high) # IO0=HIGH, done 193 else: 194 raise RuntimeError('Bad command data %d' % cmd) # type: ignore 195