1# SPDX-FileCopyrightText: 2015-2021 Espressif Systems (Shanghai) CO LTD
2# SPDX-License-Identifier: Apache-2.0
3
4import os
5import queue  # noqa: F401
6import re
7import subprocess
8import time
9from typing import Callable, Optional
10
11import serial  # noqa: F401
12from serial.tools import miniterm  # noqa: F401
13
14from .chip_specific_config import get_chip_config
15from .console_parser import ConsoleParser, prompt_next_action  # noqa: F401
16from .console_reader import ConsoleReader  # noqa: F401
17from .constants import (CMD_APP_FLASH, CMD_ENTER_BOOT, CMD_MAKE, CMD_OUTPUT_TOGGLE, CMD_RESET, CMD_STOP,
18                        CMD_TOGGLE_LOGGING, CMD_TOGGLE_TIMESTAMPS, PANIC_DECODE_DISABLE, PANIC_END, PANIC_IDLE,
19                        PANIC_READING, PANIC_STACK_DUMP, PANIC_START)
20from .coredump import CoreDump
21from .exceptions import SerialStopException
22from .gdbhelper import GDBHelper
23from .line_matcher import LineMatcher
24from .logger import Logger
25from .output_helpers import yellow_print
26from .serial_reader import Reader
27
28
def run_make(target, make, console, console_parser, event_queue, cmd_queue, logger):
    # type: (str, str, miniterm.Console, ConsoleParser, queue.Queue, queue.Queue, Logger) -> None
    """
    Run the build tool for ``target`` and block until it exits.

    ``make`` is either a single executable name or a list of arguments; when it is a list, ``target`` is
    appended to it. The child process is started with ``os.environ`` as its environment.

    If the process exits with a non-zero status, the user is asked what to do next through
    ``prompt_next_action``; otherwise ``logger.output_enabled`` is set to True.
    """
    command = make + [target] if isinstance(make, list) else [make, target]
    yellow_print('Running %s...' % ' '.join(command))
    process = subprocess.Popen(command, env=os.environ)
    try:
        process.wait()
    except KeyboardInterrupt:
        # The interrupt is swallowed here; wait once more so that the exit status is available below
        process.wait()

    if process.returncode == 0:
        logger.output_enabled = True
        return
    prompt_next_action('Build failed', console, console_parser, event_queue, cmd_queue)
45
46
class SerialHandler:
    """
    Splits raw serial data into lines, prints them through the logger and executes monitor commands.

    The handler keeps the state that has to survive between consecutive chunks of serial data: the unfinished
    tail of the previous chunk, the panic capture buffer and the flag that forces the next line to be printed.
    """
    def __init__(self, last_line_part, serial_check_exit, logger, decode_panic, reading_panic, panic_buffer, target,
                 force_line_print, start_cmd_sent, serial_instance, encrypted):
        # type: (bytes, bool, Logger, str, int, bytes,str, bool, bool, serial.Serial, bool) -> None
        # State used by handle_serial_input() while splitting the stream into lines
        self._last_line_part = last_line_part
        self._force_line_print = force_line_print
        self._serial_check_exit = serial_check_exit
        self.start_cmd_sent = start_cmd_sent
        # State used by check_panic_decode_trigger() while capturing a panic output
        self._decode_panic = decode_panic
        self._reading_panic = reading_panic
        self._panic_buffer = panic_buffer
        # Collaborators and settings used when printing and when executing commands
        self.logger = logger
        self.target = target
        self.serial_instance = serial_instance
        self.encrypted = encrypted

    def handle_serial_input(self, data, console_parser, coredump, gdb_helper, line_matcher,
                            check_gdb_stub_and_run, finalize_line=False):
        #  type: (bytes, ConsoleParser, CoreDump, Optional[GDBHelper], LineMatcher, Callable, bool) -> None
        """
        Process one chunk of bytes received from the serial port.

        The chunk is split on ``console_parser.eol``. Every complete, non-empty line is:
          * compared with the exit key (only when serial exit checking is enabled), raising
            SerialStopException on a match,
          * passed to check_panic_decode_trigger() when ``gdb_helper`` is given,
          * printed through the logger if it is forced or accepted by ``line_matcher``, inside the
            ``coredump.check(line)`` context,
          * passed to ``check_gdb_stub_and_run``.

        An unterminated tail is stored and prepended to the next chunk. It is printed right away only when
        printing is forced, or when ``finalize_line`` is set and ``line_matcher`` accepts it.
        """
        if self.start_cmd_sent:
            # Remove "+" after Continue command: everything up to and including the first "+" is dropped
            self.start_cmd_sent = False
            plus_index = data.find(b'+')
            if plus_index >= 0:
                data = data[plus_index + 1:]

        chunks = data.split(console_parser.eol)
        if self._last_line_part != b'':
            # the leftover of the previous call is the beginning of the first line of this one
            chunks[0] = self._last_line_part + chunks[0]
            self._last_line_part = b''
        if chunks[-1] != b'':
            # the data did not end with an end-of-line, so the last piece is not a full line yet
            self._last_line_part = chunks.pop()

        for line in chunks:
            if not line:
                continue
            if self._serial_check_exit and line == console_parser.exit_key.encode('latin-1'):
                raise SerialStopException()
            if gdb_helper:
                self.check_panic_decode_trigger(line, gdb_helper)
            with coredump.check(line):
                wanted = self._force_line_print or line_matcher.match(line.decode(errors='ignore'))
                if wanted:
                    self.logger.print(line + b'\n')
                    self.logger.handle_possible_pc_address_in_line(line)
            check_gdb_stub_and_run(line)
            self._force_line_print = False

        # Only the incomplete last line (if any) is left in _last_line_part now. Normally it is kept untouched
        # until the rest of the line arrives. If the rest has not been received for some time, the caller asks
        # for a decision by setting finalize_line.
        tail_matched = finalize_line and line_matcher.match(self._last_line_part.decode(errors='ignore'))
        if self._last_line_part == b'' or not (self._force_line_print or tail_matched):
            # keep _last_line_part; it will be processed the next time handle_serial_input is invoked
            return

        self._force_line_print = True
        self.logger.print(self._last_line_part)
        self.logger.handle_possible_pc_address_in_line(self._last_line_part)
        check_gdb_stub_and_run(self._last_line_part)
        # The incomplete line may cut a PC address in half. MATCH_PCADDR matches addresses which are 10
        # characters long, so the last 9 characters are kept in a small buffer that is used the next time
        # handle_possible_pc_address_in_line is invoked.
        self.logger.pc_address_buffer = self._last_line_part[-9:]
        # A GDB sequence may be cut in half as well. It is 7 characters long, so the last 6 characters are saved.
        if gdb_helper:
            gdb_helper.gdb_buffer = self._last_line_part[-6:]
        self._last_line_part = b''

    def check_panic_decode_trigger(self, line, gdb_helper):  # type: (bytes, GDBHelper) -> None
        """
        Track panic output line by line and pass the collected text to ``gdb_helper`` once it is complete.

        Does nothing when panic decoding is disabled. Collecting starts on a line matching PANIC_START and
        includes that line. Logger output is disabled from the line containing PANIC_STACK_DUMP and enabled
        again on the line containing PANIC_END, where the buffer is processed and emptied.
        """
        if self._decode_panic == PANIC_DECODE_DISABLE:
            return

        if self._reading_panic == PANIC_IDLE and re.search(PANIC_START, line.decode('ascii', errors='ignore')):
            self._reading_panic = PANIC_READING
            yellow_print('Stack dump detected')

        if self._reading_panic != PANIC_READING:
            return

        if PANIC_STACK_DUMP in line:
            self.logger.output_enabled = False

        # carriage returns are removed and each collected line is terminated with a single newline
        self._panic_buffer += line.replace(b'\r', b'') + b'\n'

        if PANIC_END in line:
            self._reading_panic = PANIC_IDLE
            self.logger.output_enabled = True
            gdb_helper.process_panic_output(self._panic_buffer, self.logger, self.target)
            self._panic_buffer = b''

    def handle_commands(self, cmd, chip, run_make_func, console_reader, serial_reader):
        # type: (int, str, Callable, ConsoleReader, Reader) -> None
        """
        Execute the monitor command ``cmd`` for the given ``chip``.

        The delays used by the reset and enter-bootloader sequences are taken from the configuration of the
        chip. The 'linux' chip rejects the reset, make, app-flash and enter-boot commands with a message.

        Raises RuntimeError when ``cmd`` is not one of the known commands.
        """
        chip_config = get_chip_config(chip)
        reset_delay = chip_config['reset']
        enter_boot_set = chip_config['enter_boot_set']
        enter_boot_unset = chip_config['enter_boot_unset']

        # Note the inverted naming: "low" is passed to the serial port as True and "high" as False
        low, high = True, False

        if chip == 'linux' and cmd in (CMD_RESET, CMD_MAKE, CMD_APP_FLASH, CMD_ENTER_BOOT):
            yellow_print('linux target does not support this command')
            return

        if cmd == CMD_STOP:
            console_reader.stop()
            serial_reader.stop()
            return

        if cmd == CMD_RESET:
            port = self.serial_instance
            port.setRTS(low)
            port.setDTR(port.dtr)  # usbser.sys workaround
            time.sleep(reset_delay)
            port.setRTS(high)
            port.setDTR(port.dtr)  # usbser.sys workaround
            self.logger.output_enabled = True
            return

        if cmd == CMD_MAKE:
            run_make_func('encrypted-flash' if self.encrypted else 'flash')
            return

        if cmd == CMD_APP_FLASH:
            run_make_func('encrypted-app-flash' if self.encrypted else 'app-flash')
            return

        if cmd == CMD_OUTPUT_TOGGLE:
            self.logger.output_toggle()
            return

        if cmd == CMD_TOGGLE_LOGGING:
            self.logger.toggle_logging()
            return

        if cmd == CMD_TOGGLE_TIMESTAMPS:
            self.logger.toggle_timestamps()
            return

        if cmd == CMD_ENTER_BOOT:
            yellow_print('Pause app (enter bootloader mode), press Ctrl-T Ctrl-R to restart')
            port = self.serial_instance
            port.setDTR(high)  # IO0=HIGH
            port.setRTS(low)  # EN=LOW, chip in reset
            port.setDTR(port.dtr)  # usbser.sys workaround
            # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.1
            time.sleep(enter_boot_set)
            port.setDTR(low)  # IO0=LOW
            port.setRTS(high)  # EN=HIGH, chip out of reset
            port.setDTR(port.dtr)  # usbser.sys workaround
            # timeouts taken from esptool.py, includes esp32r0 workaround. defaults: 0.05
            time.sleep(enter_boot_unset)
            port.setDTR(high)  # IO0=HIGH, done
            return

        raise RuntimeError('Bad command data %d' % cmd)  # type: ignore
195