1# SPDX-FileCopyrightText: 2014-2024 Fredrik Ahlberg, Angus Gratton,
2# Espressif Systems (Shanghai) CO LTD, other contributors as noted.
3#
4# SPDX-License-Identifier: BSD-3-Clause
5import os
6import threading
7from esptool.reset import (
8    ClassicReset,
9    CustomReset,
10    DEFAULT_RESET_DELAY,
11    HardReset,
12    UnixTightReset,
13)
14import serial
15import serial.rfc2217
16from serial.rfc2217 import (
17    COM_PORT_OPTION,
18    SET_CONTROL,
19    SET_CONTROL_DTR_OFF,
20    SET_CONTROL_DTR_ON,
21    SET_CONTROL_RTS_OFF,
22    SET_CONTROL_RTS_ON,
23)
24
25from esptool.config import load_config_file
26
27cfg, _ = load_config_file(verbose=True)
28cfg = cfg["esptool"]
29
30
31class EspPortManager(serial.rfc2217.PortManager):
32    """
33    The beginning of the reset sequence is detected and the proper reset sequence
34    is applied in a thread. The rest of the reset sequence received is just ignored
35    and not sent to the serial port.
36    """
37
38    def __init__(self, serial_port, connection, esp32r0_delay, logger=None):
39        self.esp32r0_delay = esp32r0_delay
40        self.is_download_mode = False
41        super(EspPortManager, self).__init__(serial_port, connection, logger)
42
43    def _telnet_process_subnegotiation(self, suboption):
44        if suboption[0:1] == COM_PORT_OPTION and suboption[1:2] == SET_CONTROL:
45            if suboption[2:3] == SET_CONTROL_DTR_OFF:
46                self.is_download_mode = False
47                self.serial.dtr = False
48                return
49            elif suboption[2:3] == SET_CONTROL_RTS_OFF and not self.is_download_mode:
50                reset_thread = threading.Thread(target=self._hard_reset_thread)
51                reset_thread.daemon = True
52                reset_thread.name = "hard_reset_thread"
53                reset_thread.start()
54                return
55            elif suboption[2:3] == SET_CONTROL_DTR_ON and not self.is_download_mode:
56                self.is_download_mode = True
57                reset_thread = threading.Thread(target=self._reset_thread)
58                reset_thread.daemon = True
59                reset_thread.name = "reset_thread"
60                reset_thread.start()
61                return
62            elif suboption[2:3] in [
63                SET_CONTROL_DTR_ON,
64                SET_CONTROL_RTS_ON,
65                SET_CONTROL_RTS_OFF,
66            ]:
67                return
68        # only in cases not handled above do the original implementation in PortManager
69        super(EspPortManager, self)._telnet_process_subnegotiation(suboption)
70
71    def _hard_reset_thread(self):
72        """
73        The reset logic used for hard resetting the chip.
74        """
75        if self.logger:
76            self.logger.info("Activating hard reset in thread")
77        HardReset(self.serial)()
78
79    def _reset_thread(self):
80        """
81        The reset logic is used from esptool.py because the RTS and DTR signals
82        cannot be retransmitted through RFC 2217 with proper timing.
83        """
84        if self.logger:
85            self.logger.info("Activating reset in thread")
86
87        delay = DEFAULT_RESET_DELAY
88        if self.esp32r0_delay:
89            delay += 0.5
90
91        cfg_custom_reset_sequence = cfg.get("custom_reset_sequence")
92        if cfg_custom_reset_sequence is not None:
93            CustomReset(self.serial, cfg_custom_reset_sequence)()
94        elif os.name != "nt":
95            UnixTightReset(self.serial, delay)()
96        else:
97            ClassicReset(self.serial, delay)()
98