1# SPDX-FileCopyrightText: 2014-2023 Fredrik Ahlberg, Angus Gratton,
2# Espressif Systems (Shanghai) CO LTD, other contributors as noted.
3#
4# SPDX-License-Identifier: GPL-2.0-or-later
5
import base64
import functools
import hashlib
import itertools
import json
import os
import re
import string
import struct
import sys
import time
from typing import Optional
17
18
19from .config import load_config_file
20from .reset import (
21    ClassicReset,
22    CustomReset,
23    DEFAULT_RESET_DELAY,
24    HardReset,
25    USBJTAGSerialReset,
26    UnixTightReset,
27)
28from .util import FatalError, NotImplementedInROMError, UnsupportedCommandError
29from .util import byte, hexify, mask_to_shift, pad_to, strip_chip_name
30
# pyserial is a hard dependency: print a hint naming the interpreter that
# lacks it, then re-raise so the import of this module still fails.
try:
    import serial
except ImportError:
    print(
        "Pyserial is not installed for %s. "
        "Check the README for installation instructions." % (sys.executable)
    )
    raise

# Check that the imported 'serial' module is pyserial and not the unrelated
# PyPI package that is also named 'serial' (detected through its docstring).
# ref. https://github.com/espressif/esptool/issues/269
try:
    if "serialization" in serial.__doc__ and "deserialization" in serial.__doc__:
        raise ImportError(
            "esptool.py depends on pyserial, but there is a conflict with a currently "
            "installed package named 'serial'.\n"
            "You may work around this by 'pip uninstall serial; pip install pyserial' "
            "but this may break other installed Python software "
            "that depends on 'serial'.\n"
            "There is no good fix for this right now, "
            "apart from configuring virtualenvs. "
            "See https://github.com/espressif/esptool/issues/269#issuecomment-385298196"
            " for discussion of the underlying issue(s)."
        )
except TypeError:
    # The 'in' test raises TypeError when __doc__ is None, which is the
    # expected case for the real pyserial package.
    pass  # __doc__ returns None for pyserial

# Port enumeration helper. It is set to None (instead of failing the import)
# only for the known macOS problem handled below, so code using 'list_ports'
# has to cope with it being None.
try:
    import serial.tools.list_ports as list_ports
except ImportError:
    print(
        "The installed version (%s) of pyserial appears to be too old for esptool.py "
        "(Python interpreter %s). Check the README for installation instructions."
        % (serial.VERSION, sys.executable)
    )
    raise
except Exception:
    if sys.platform == "darwin":
        # swallow the exception, this is a known issue in pyserial+macOS Big Sur preview
        # ref https://github.com/espressif/esptool/issues/540
        list_ports = None
    else:
        raise
74
75
# Load the user configuration once at import time and keep only its
# "esptool" section. Every constant below is read from that section; the
# second argument of each getter is the value used when the option is not
# configured (presumably a configparser-style section - confirm in .config).
# All timeouts appear to be in seconds - TODO confirm.
cfg, _ = load_config_file()
cfg = cfg["esptool"]

# Timeout for most flash operations
DEFAULT_TIMEOUT = cfg.getfloat("timeout", 3)
# Timeout for full chip erase
CHIP_ERASE_TIMEOUT = cfg.getfloat("chip_erase_timeout", 120)
# Longest any command can run
MAX_TIMEOUT = cfg.getfloat("max_timeout", CHIP_ERASE_TIMEOUT * 2)
# Timeout for syncing with bootloader
SYNC_TIMEOUT = cfg.getfloat("sync_timeout", 0.1)
# Timeout (per megabyte) for calculating md5sum
MD5_TIMEOUT_PER_MB = cfg.getfloat("md5_timeout_per_mb", 8)
# Timeout (per megabyte) for erasing a region
ERASE_REGION_TIMEOUT_PER_MB = cfg.getfloat("erase_region_timeout_per_mb", 30)
# Timeout (per megabyte) for erasing and writing data
ERASE_WRITE_TIMEOUT_PER_MB = cfg.getfloat("erase_write_timeout_per_mb", 40)
# Short timeout for ESP_MEM_END, as it may never respond
MEM_END_ROM_TIMEOUT = cfg.getfloat("mem_end_rom_timeout", 0.2)
# Timeout for serial port write
DEFAULT_SERIAL_WRITE_TIMEOUT = cfg.getfloat("serial_write_timeout", 10)
# Default number of times to try connection
DEFAULT_CONNECT_ATTEMPTS = cfg.getint("connect_attempts", 7)
# Number of times to try writing a data block
WRITE_BLOCK_ATTEMPTS = cfg.getint("write_block_attempts", 3)
# Number of times to try opening the serial port
DEFAULT_OPEN_PORT_ATTEMPTS = cfg.getint("open_port_attempts", 1)
103
104
def timeout_per_mb(seconds_per_mb, size_bytes):
    """Scale a per-megabyte timeout to a payload of ``size_bytes`` bytes.

    One megabyte is taken as 1e6 bytes. The result is never shorter than
    DEFAULT_TIMEOUT.
    """
    scaled = seconds_per_mb * (size_bytes / 1e6)
    # DEFAULT_TIMEOUT acts as the floor for small payloads
    return max(scaled, DEFAULT_TIMEOUT)
111
112
def check_supported_function(func, check_func):
    """
    Decorator implementation that wraps a check around an ESPLoader
    bootloader function to check if it's supported.

    This is used to capture the multidimensional differences in
    functionality between the ESP8266 & ESP32 (and later chips) ROM loaders, and the
    software stub that runs on these. Not possible to do this cleanly
    via inheritance alone.

    func: the bootloader method being guarded.
    check_func: predicate called with the loader object (the first positional
        argument of every call); a truthy result lets the call through.

    Returns a wrapper that forwards all arguments to ``func`` when the check
    passes and raises NotImplementedInROMError(obj, func) otherwise.
    """

    # functools.wraps keeps the guarded method's __name__ and __doc__, so
    # tracebacks and introspection don't report every such method as "inner".
    @functools.wraps(func)
    def inner(*args, **kwargs):
        obj = args[0]
        if check_func(obj):
            return func(*args, **kwargs)
        raise NotImplementedInROMError(obj, func)

    return inner
132
133
def stub_function_only(func):
    """Decorator: the function is supported only by the software stub loader."""

    def running_stub(loader):
        return loader.IS_STUB

    return check_supported_function(func, running_stub)
137
138
def stub_and_esp32_function_only(func):
    """Decorator: supported by stubs, or by the ROM of ESP32 and later chips."""

    def stub_or_newer_than_esp8266(loader):
        # Only the ESP8266 ROM is excluded; a running stub always qualifies.
        return loader.IS_STUB or loader.CHIP_NAME not in ("ESP8266",)

    return check_supported_function(func, stub_or_newer_than_esp8266)
144
145
def esp32s3_or_newer_function_only(func):
    """Decorator: supported only by the ROM of ESP32S3 and later chips."""
    older_chips = ("ESP8266", "ESP32", "ESP32-S2")

    def is_s3_or_newer(loader):
        return loader.CHIP_NAME not in older_chips

    return check_supported_function(func, is_s3_or_newer)
151
152
class StubFlasher:
    """Flasher stub image for one chip, loaded from its JSON description.

    The JSON file provides base64-encoded "text" (and optionally "data")
    segments together with their start addresses, the "entry" address and an
    optional "bss_start".
    """

    STUB_DIR = os.path.join(os.path.dirname(__file__), "targets", "stub_flasher")
    # directories will be searched in the order of STUB_SUBDIRS
    STUB_SUBDIRS = ["1", "2"]

    def __init__(self, chip_name):
        """Read and decode the stub JSON file belonging to ``chip_name``."""
        with open(self.get_json_path(chip_name)) as stub_file:
            stub = json.load(stub_file)

        self.text = base64.b64decode(stub["text"])
        self.text_start = stub["text_start"]
        self.entry = stub["entry"]

        # The data segment is optional; if either of its keys is missing,
        # both attributes end up as None.
        try:
            data, data_start = base64.b64decode(stub["data"]), stub["data_start"]
        except KeyError:
            data = data_start = None
        self.data = data
        self.data_start = data_start

        self.bss_start = stub.get("bss_start")

    def get_json_path(self, chip_name):
        """Return the path of the first existing stub JSON file for the chip.

        Sub-directories are tried in STUB_SUBDIRS order and a warning is
        printed when the match is not in the first one. Raises
        FileNotFoundError when no sub-directory contains the file.
        """
        chip_name = strip_chip_name(chip_name)
        for position, subdir in enumerate(self.STUB_SUBDIRS):
            candidate = os.path.join(self.STUB_DIR, subdir, f"{chip_name}.json")
            if not os.path.exists(candidate):
                continue
            if position > 0:
                print(
                    f"Warning: Stub version {self.STUB_SUBDIRS[0]} doesn't exist, using {subdir} instead"
                )
            return candidate

        raise FileNotFoundError(f"Stub flasher JSON file for {chip_name} not found")

    @classmethod
    def set_preferred_stub_subdir(cls, subdir):
        """Move ``subdir`` to the front of the search order, if it is known."""
        if subdir not in cls.STUB_SUBDIRS:
            return
        # Reorder the shared list in place
        cls.STUB_SUBDIRS.remove(subdir)
        cls.STUB_SUBDIRS.insert(0, subdir)
194
195
196class ESPLoader(object):
197    """Base class providing access to ESP ROM & software stub bootloaders.
198    Subclasses provide ESP8266 & ESP32 Family specific functionality.
199
200    Don't instantiate this base class directly, either instantiate a subclass or
201    call cmds.detect_chip() which will interrogate the chip and return the
202    appropriate subclass instance. You can also use a context manager as
203    "with detect_chip() as esp:" to ensure the serial port is closed when done.
204
205    """
206
207    CHIP_NAME = "Espressif device"
208    IS_STUB = False
209    STUB_CLASS: Optional[object] = None
210    BOOTLOADER_IMAGE: Optional[object] = None
211
212    DEFAULT_PORT = "/dev/ttyUSB0"
213
214    USES_RFC2217 = False
215
216    # Commands supported by ESP8266 ROM bootloader
217    ESP_FLASH_BEGIN = 0x02
218    ESP_FLASH_DATA = 0x03
219    ESP_FLASH_END = 0x04
220    ESP_MEM_BEGIN = 0x05
221    ESP_MEM_END = 0x06
222    ESP_MEM_DATA = 0x07
223    ESP_SYNC = 0x08
224    ESP_WRITE_REG = 0x09
225    ESP_READ_REG = 0x0A
226
227    # Some commands supported by ESP32 and later chips ROM bootloader (or -8266 w/ stub)
228    ESP_SPI_SET_PARAMS = 0x0B
229    ESP_SPI_ATTACH = 0x0D
230    ESP_READ_FLASH_SLOW = 0x0E  # ROM only, much slower than the stub flash read
231    ESP_CHANGE_BAUDRATE = 0x0F
232    ESP_FLASH_DEFL_BEGIN = 0x10
233    ESP_FLASH_DEFL_DATA = 0x11
234    ESP_FLASH_DEFL_END = 0x12
235    ESP_SPI_FLASH_MD5 = 0x13
236
237    # Commands supported by ESP32-S2 and later chips ROM bootloader only
238    ESP_GET_SECURITY_INFO = 0x14
239
240    # Some commands supported by stub only
241    ESP_ERASE_FLASH = 0xD0
242    ESP_ERASE_REGION = 0xD1
243    ESP_READ_FLASH = 0xD2
244    ESP_RUN_USER_CODE = 0xD3
245
246    # Flash encryption encrypted data command
247    ESP_FLASH_ENCRYPT_DATA = 0xD4
248
249    # Response code(s) sent by ROM
250    ROM_INVALID_RECV_MSG = 0x05  # response if an invalid message is received
251
    # Maximum block sizes for RAM and Flash writes, respectively.
253    ESP_RAM_BLOCK = 0x1800
254
255    FLASH_WRITE_SIZE = 0x400
256
257    # Default baudrate. The ROM auto-bauds, so we can use more or less whatever we want.
258    ESP_ROM_BAUD = 115200
259
260    # First byte of the application image
261    ESP_IMAGE_MAGIC = 0xE9
262
263    # Initial state for the checksum routine
264    ESP_CHECKSUM_MAGIC = 0xEF
265
266    # Flash sector size, minimum unit of erase.
267    FLASH_SECTOR_SIZE = 0x1000
268
269    UART_DATE_REG_ADDR = 0x60000078
270
271    # Whether the SPI peripheral sends from MSB of 32-bit register, or the MSB of valid LSB bits.
272    SPI_ADDR_REG_MSB = True
273
274    # This ROM address has a different value on each chip model
275    CHIP_DETECT_MAGIC_REG_ADDR = 0x40001000
276
277    UART_CLKDIV_MASK = 0xFFFFF
278
279    # Memory addresses
280    IROM_MAP_START = 0x40200000
281    IROM_MAP_END = 0x40300000
282
283    # The number of bytes in the UART response that signify command status
284    STATUS_BYTES_LENGTH = 2
285
286    # Bootloader flashing offset
287    BOOTLOADER_FLASH_OFFSET = 0x0
288
289    # ROM supports an encrypted flashing mode
290    SUPPORTS_ENCRYPTED_FLASH = False
291
292    # Response to ESP_SYNC might indicate that flasher stub is running
293    # instead of the ROM bootloader
294    sync_stub_detected = False
295
296    # Device PIDs
297    USB_JTAG_SERIAL_PID = 0x1001
298
299    # Chip IDs that are no longer supported by esptool
300    UNSUPPORTED_CHIPS = {6: "ESP32-S3(beta 3)"}
301
302    # Number of attempts to write flash data
303    WRITE_FLASH_ATTEMPTS = 2
304
305    def __init__(self, port=DEFAULT_PORT, baud=ESP_ROM_BAUD, trace_enabled=False):
306        """Base constructor for ESPLoader bootloader interaction
307
308        Don't call this constructor, either instantiate a specific
309        ROM class directly, or use cmds.detect_chip(). You can use the with
310        statement to ensure the serial port is closed when done.
311
312        This base class has all of the instance methods for bootloader
313        functionality supported across various chips & stub
314        loaders. Subclasses replace the functions they don't support
315        with ones which throw NotImplementedInROMError().
316
317        """
318        # True if esptool detects the ROM is in Secure Download Mode
319        self.secure_download_mode = False
320        # True if esptool detects conditions which require the stub to be disabled
321        self.stub_is_disabled = False
322
323        # Device-and-runtime-specific cache
324        self.cache = {
325            "flash_id": None,
326            "chip_id": None,
327            "uart_no": None,
328            "usb_pid": None,
329        }
330
331        if isinstance(port, str):
332            try:
333                self._port = serial.serial_for_url(
334                    port, exclusive=True, do_not_open=True
335                )
336                if sys.platform == "win32":
337                    # When opening a port on Windows,
338                    # the RTS/DTR (active low) lines
339                    # need to be set to False (pulled high)
340                    # to avoid unwanted chip reset
341                    self._port.rts = False
342                    self._port.dtr = False
343                self._port.open()
344            except serial.serialutil.SerialException as e:
345                port_issues = [
346                    [  # does not exist error
347                        re.compile(r"Errno 2|FileNotFoundError", re.IGNORECASE),
348                        "Check if the port is correct and ESP connected",
349                    ],
350                    [  # busy port error
351                        re.compile(r"Access is denied", re.IGNORECASE),
352                        "Check if the port is not used by another task",
353                    ],
354                ]
355                if sys.platform.startswith("linux"):
356                    port_issues.append(
357                        [  # permission denied error
358                            re.compile(r"Permission denied", re.IGNORECASE),
359                            ("Try to add user into dialout or uucp group."),
360                        ],
361                    )
362
363                hint_msg = ""
364                for port_issue in port_issues:
365                    if port_issue[0].search(str(e)):
366                        hint_msg = f"\nHint: {port_issue[1]}\n"
367                        break
368
369                raise FatalError(
370                    f"Could not open {port}, the port is busy or doesn't exist."
371                    f"\n({e})\n"
372                    f"{hint_msg}"
373                )
374        else:
375            self._port = port
376        self._slip_reader = slip_reader(self._port, self.trace)
377        # setting baud rate in a separate step is a workaround for
378        # CH341 driver on some Linux versions (this opens at 9600 then
379        # sets), shouldn't matter for other platforms/drivers. See
380        # https://github.com/espressif/esptool/issues/44#issuecomment-107094446
381        self._set_port_baudrate(baud)
382        self._trace_enabled = trace_enabled
383        # set write timeout, to prevent esptool blocked at write forever.
384        try:
385            self._port.write_timeout = DEFAULT_SERIAL_WRITE_TIMEOUT
386        except NotImplementedError:
387            # no write timeout for RFC2217 ports
388            # need to set the property back to None or it will continue to fail
389            self._port.write_timeout = None
390
391    def __enter__(self):
392        return self
393
394    def __exit__(self, exc_type, exc_value, traceback):
395        self._port.close()
396
397    @property
398    def serial_port(self):
399        return self._port.port
400
401    def _set_port_baudrate(self, baud):
402        try:
403            self._port.baudrate = baud
404        except IOError:
405            raise FatalError(
406                "Failed to set baud rate %d. The driver may not support this rate."
407                % baud
408            )
409
410    def read(self):
411        """Read a SLIP packet from the serial port"""
412        return next(self._slip_reader)
413
414    def write(self, packet):
415        """Write bytes to the serial port while performing SLIP escaping"""
416        buf = (
417            b"\xc0"
418            + (packet.replace(b"\xdb", b"\xdb\xdd").replace(b"\xc0", b"\xdb\xdc"))
419            + b"\xc0"
420        )
421        self.trace("Write %d bytes: %s", len(buf), HexFormatter(buf))
422        self._port.write(buf)
423
424    def trace(self, message, *format_args):
425        if self._trace_enabled:
426            now = time.time()
427            try:
428                delta = now - self._last_trace
429            except AttributeError:
430                delta = 0.0
431            self._last_trace = now
432            prefix = "TRACE +%.3f " % delta
433            print(prefix + (message % format_args))
434
435    @staticmethod
436    def checksum(data, state=ESP_CHECKSUM_MAGIC):
437        """Calculate checksum of a blob, as it is defined by the ROM"""
438        for b in data:
439            state ^= b
440
441        return state
442
443    def command(
444        self,
445        op=None,
446        data=b"",
447        chk=0,
448        wait_response=True,
449        timeout=DEFAULT_TIMEOUT,
450    ):
451        """Send a request and read the response"""
452        saved_timeout = self._port.timeout
453        new_timeout = min(timeout, MAX_TIMEOUT)
454        if new_timeout != saved_timeout:
455            self._port.timeout = new_timeout
456
457        try:
458            if op is not None:
459                self.trace(
460                    "command op=0x%02x data len=%s wait_response=%d "
461                    "timeout=%.3f data=%s",
462                    op,
463                    len(data),
464                    1 if wait_response else 0,
465                    timeout,
466                    HexFormatter(data),
467                )
468                pkt = struct.pack(b"<BBHI", 0x00, op, len(data), chk) + data
469                self.write(pkt)
470
471            if not wait_response:
472                return
473
474            # tries to get a response until that response has the
475            # same operation as the request or a retries limit has
476            # exceeded. This is needed for some esp8266s that
477            # reply with more sync responses than expected.
478            for retry in range(100):
479                p = self.read()
480                if len(p) < 8:
481                    continue
482                (resp, op_ret, len_ret, val) = struct.unpack("<BBHI", p[:8])
483                if resp != 1:
484                    continue
485                data = p[8:]
486
487                if op is None or op_ret == op:
488                    return val, data
489                if byte(data, 0) != 0 and byte(data, 1) == self.ROM_INVALID_RECV_MSG:
490                    # Unsupported read_reg can result in
491                    # more than one error response for some reason
492                    self.flush_input()
493                    raise UnsupportedCommandError(self, op)
494
495        finally:
496            if new_timeout != saved_timeout:
497                self._port.timeout = saved_timeout
498
499        raise FatalError("Response doesn't match request")
500
501    def check_command(
502        self, op_description, op=None, data=b"", chk=0, timeout=DEFAULT_TIMEOUT
503    ):
504        """
505        Execute a command with 'command', check the result code and throw an appropriate
506        FatalError if it fails.
507
508        Returns the "result" of a successful command.
509        """
510        val, data = self.command(op, data, chk, timeout=timeout)
511
512        # things are a bit weird here, bear with us
513
514        # the status bytes are the last 2/4 bytes in the data (depending on chip)
515        if len(data) < self.STATUS_BYTES_LENGTH:
516            raise FatalError(
517                "Failed to %s. Only got %d byte status response."
518                % (op_description, len(data))
519            )
520        status_bytes = data[-self.STATUS_BYTES_LENGTH :]
521        # only care if the first one is non-zero. If it is, the second byte is a reason.
522        if byte(status_bytes, 0) != 0:
523            raise FatalError.WithResult("Failed to %s" % op_description, status_bytes)
524
525        # if we had more data than just the status bytes, return it as the result
526        # (this is used by the md5sum command, maybe other commands?)
527        if len(data) > self.STATUS_BYTES_LENGTH:
528            return data[: -self.STATUS_BYTES_LENGTH]
529        else:
530            # otherwise, just return the 'val' field which comes from the reply header
531            # (this is used by read_reg)
532            return val
533
534    def flush_input(self):
535        self._port.flushInput()
536        self._slip_reader = slip_reader(self._port, self.trace)
537
538    def sync(self):
539        val, _ = self.command(
540            self.ESP_SYNC, b"\x07\x07\x12\x20" + 32 * b"\x55", timeout=SYNC_TIMEOUT
541        )
542
543        # ROM bootloaders send some non-zero "val" response. The flasher stub sends 0.
544        # If we receive 0 then it probably indicates that the chip wasn't or couldn't be
545        # reset properly and esptool is talking to the flasher stub.
546        self.sync_stub_detected = val == 0
547
548        for _ in range(7):
549            val, _ = self.command()
550            self.sync_stub_detected &= val == 0
551
552    def _get_pid(self):
553        if self.cache["usb_pid"] is not None:
554            return self.cache["usb_pid"]
555
556        if list_ports is None:
557            print(
558                "\nListing all serial ports is currently not available. "
559                "Can't get device PID."
560            )
561            return
562        active_port = self._port.port
563
564        # Pyserial only identifies regular ports, URL handlers are not supported
565        if not active_port.lower().startswith(("com", "/dev/")):
566            print(
567                "\nDevice PID identification is only supported on "
568                "COM and /dev/ serial ports."
569            )
570            return
571        # Return the real path if the active port is a symlink
572        if active_port.startswith("/dev/") and os.path.islink(active_port):
573            active_port = os.path.realpath(active_port)
574
575        active_ports = [active_port]
576
577        # The "cu" (call-up) device has to be used for outgoing communication on MacOS
578        if sys.platform == "darwin" and "tty" in active_port:
579            active_ports.append(active_port.replace("tty", "cu"))
580        ports = list_ports.comports()
581        for p in ports:
582            if p.device in active_ports:
583                self.cache["usb_pid"] = p.pid
584                return p.pid
585        print(
586            f"\nFailed to get PID of a device on {active_port}, "
587            "using standard reset sequence."
588        )
589
590    def _connect_attempt(self, reset_strategy, mode="default_reset"):
591        """A single connection attempt"""
592        last_error = None
593        boot_log_detected = False
594        download_mode = False
595
596        # If we're doing no_sync, we're likely communicating as a pass through
597        # with an intermediate device to the ESP32
598        if mode == "no_reset_no_sync":
599            return last_error
600
601        if mode != "no_reset":
602            if not self.USES_RFC2217:  # Might block on rfc2217 ports
603                # Empty serial buffer to isolate boot log
604                self._port.reset_input_buffer()
605
606            reset_strategy()  # Reset the chip to bootloader (download mode)
607
608            # Detect the ROM boot log and check actual boot mode (ESP32 and later only)
609            waiting = self._port.inWaiting()
610            read_bytes = self._port.read(waiting)
611            data = re.search(
612                b"boot:(0x[0-9a-fA-F]+)(.*waiting for download)?", read_bytes, re.DOTALL
613            )
614            if data is not None:
615                boot_log_detected = True
616                boot_mode = data.group(1)
617                download_mode = data.group(2) is not None
618
619        for _ in range(5):
620            try:
621                self.flush_input()
622                self._port.flushOutput()
623                self.sync()
624                return None
625            except FatalError as e:
626                print(".", end="")
627                sys.stdout.flush()
628                time.sleep(0.05)
629                last_error = e
630
631        if boot_log_detected:
632            last_error = FatalError(
633                "Wrong boot mode detected ({})! "
634                "The chip needs to be in download mode.".format(
635                    boot_mode.decode("utf-8")
636                )
637            )
638            if download_mode:
639                last_error = FatalError(
640                    "Download mode successfully detected, but getting no sync reply: "
641                    "The serial TX path seems to be down."
642                )
643        return last_error
644
645    def get_memory_region(self, name):
646        """
647        Returns a tuple of (start, end) for the memory map entry with the given name,
648        or None if it doesn't exist
649        """
650        try:
651            return [(start, end) for (start, end, n) in self.MEMORY_MAP if n == name][0]
652        except IndexError:
653            return None
654
655    def _construct_reset_strategy_sequence(self, mode):
656        """
657        Constructs a sequence of reset strategies based on the OS,
658        used ESP chip, external settings, and environment variables.
659        Returns a tuple of one or more reset strategies to be tried sequentially.
660        """
661        cfg_custom_reset_sequence = cfg.get("custom_reset_sequence")
662        if cfg_custom_reset_sequence is not None:
663            return (CustomReset(self._port, cfg_custom_reset_sequence),)
664
665        cfg_reset_delay = cfg.getfloat("reset_delay")
666        if cfg_reset_delay is not None:
667            delay = extra_delay = cfg_reset_delay
668        else:
669            delay = DEFAULT_RESET_DELAY
670            extra_delay = DEFAULT_RESET_DELAY + 0.5
671
672        # This FPGA delay is for Espressif internal use
673        if (
674            self.CHIP_NAME == "ESP32"
675            and os.environ.get("ESPTOOL_ENV_FPGA", "").strip() == "1"
676        ):
677            delay = extra_delay = 7
678
679        # USB-JTAG/Serial mode
680        if mode == "usb_reset" or self._get_pid() == self.USB_JTAG_SERIAL_PID:
681            return (USBJTAGSerialReset(self._port),)
682
683        # USB-to-Serial bridge
684        if os.name != "nt" and not self._port.name.startswith("rfc2217:"):
685            return (
686                UnixTightReset(self._port, delay),
687                UnixTightReset(self._port, extra_delay),
688                ClassicReset(self._port, delay),
689                ClassicReset(self._port, extra_delay),
690            )
691
692        return (
693            ClassicReset(self._port, delay),
694            ClassicReset(self._port, extra_delay),
695        )
696
697    def connect(
698        self,
699        mode="default_reset",
700        attempts=DEFAULT_CONNECT_ATTEMPTS,
701        detecting=False,
702        warnings=True,
703    ):
704        """Try connecting repeatedly until successful, or giving up"""
705        if warnings and mode in ["no_reset", "no_reset_no_sync"]:
706            print(
707                'WARNING: Pre-connection option "{}" was selected.'.format(mode),
708                "Connection may fail if the chip is not in bootloader "
709                "or flasher stub mode.",
710            )
711
712        if self._port.name.startswith("socket:"):
713            mode = "no_reset"  # not possible to toggle DTR/RTS over a TCP socket
714            print(
715                "Note: It's not possible to reset the chip over a TCP socket. "
716                "Automatic resetting to bootloader has been disabled, "
717                "reset the chip manually."
718            )
719
720        print("Connecting...", end="")
721        sys.stdout.flush()
722        last_error = None
723
724        reset_sequence = self._construct_reset_strategy_sequence(mode)
725        try:
726            for _, reset_strategy in zip(
727                range(attempts) if attempts > 0 else itertools.count(),
728                itertools.cycle(reset_sequence),
729            ):
730                last_error = self._connect_attempt(reset_strategy, mode)
731                if last_error is None:
732                    break
733        finally:
734            print("")  # end 'Connecting...' line
735
736        if last_error is not None:
737            additional_msg = ""
738            if self.CHIP_NAME == "ESP32-C2" and self._port.baudrate < 115200:
739                additional_msg = (
740                    "\nNote: Please set a higher baud rate (--baud)"
741                    " if ESP32-C2 doesn't connect"
742                    " (at least 115200 Bd is recommended)."
743                )
744
745            raise FatalError(
746                "Failed to connect to {}: {}"
747                f"{additional_msg}"
748                "\nFor troubleshooting steps visit: "
749                "https://docs.espressif.com/projects/esptool/en/latest/troubleshooting.html".format(  # noqa E501
750                    self.CHIP_NAME, last_error
751                )
752            )
753
754        if not detecting:
755            try:
756                from .targets import ROM_LIST
757
758                # check the date code registers match what we expect to see
759                chip_magic_value = self.read_reg(ESPLoader.CHIP_DETECT_MAGIC_REG_ADDR)
760                if chip_magic_value not in self.CHIP_DETECT_MAGIC_VALUE:
761                    actually = None
762                    for cls in ROM_LIST:
763                        if chip_magic_value in cls.CHIP_DETECT_MAGIC_VALUE:
764                            actually = cls
765                            break
766                    if warnings and actually is None:
767                        print(
768                            "WARNING: This chip doesn't appear to be a %s "
769                            "(chip magic value 0x%08x). "
770                            "Probably it is unsupported by this version of esptool."
771                            % (self.CHIP_NAME, chip_magic_value)
772                        )
773                    else:
774                        raise FatalError(
775                            "This chip is %s not %s. Wrong --chip argument?"
776                            % (actually.CHIP_NAME, self.CHIP_NAME)
777                        )
778            except UnsupportedCommandError:
779                self.secure_download_mode = True
780
781            try:
782                self.check_chip_id()
783            except UnsupportedCommandError:
784                # Fix for ROM not responding in SDM, reconnect and try again
785                if self.secure_download_mode:
786                    self._connect_attempt(mode, reset_sequence[0])
787                    self.check_chip_id()
788                else:
789                    raise
790            self._post_connect()
791
792    def _post_connect(self):
793        """
794        Additional initialization hook, may be overridden by the chip-specific class.
795        Gets called after connect, and after auto-detection.
796        """
797        pass
798
799    def read_reg(self, addr, timeout=DEFAULT_TIMEOUT):
800        """Read memory address in target"""
801        # we don't call check_command here because read_reg() function is called
802        # when detecting chip type, and the way we check for success
803        # (STATUS_BYTES_LENGTH) is different for different chip types (!)
804        val, data = self.command(
805            self.ESP_READ_REG, struct.pack("<I", addr), timeout=timeout
806        )
807        if byte(data, 0) != 0:
808            raise FatalError.WithResult(
809                "Failed to read register address %08x" % addr, data
810            )
811        return val
812
813    def write_reg(self, addr, value, mask=0xFFFFFFFF, delay_us=0, delay_after_us=0):
814        """Write to memory address in target"""
815        command = struct.pack("<IIII", addr, value, mask, delay_us)
816        if delay_after_us > 0:
817            # add a dummy write to a date register as an excuse to have a delay
818            command += struct.pack(
819                "<IIII", self.UART_DATE_REG_ADDR, 0, 0, delay_after_us
820            )
821
822        return self.check_command("write target memory", self.ESP_WRITE_REG, command)
823
824    def update_reg(self, addr, mask, new_val):
825        """
826        Update register at 'addr', replace the bits masked out by 'mask'
827        with new_val. new_val is shifted left to match the LSB of 'mask'
828
829        Returns just-written value of register.
830        """
831        shift = mask_to_shift(mask)
832        val = self.read_reg(addr)
833        val &= ~mask
834        val |= (new_val << shift) & mask
835        self.write_reg(addr, val)
836
837        return val
838
839    def mem_begin(self, size, blocks, blocksize, offset):
840        """Start downloading an application image to RAM"""
841        # check we're not going to overwrite a running stub with this data
842        if self.IS_STUB:
843            stub = StubFlasher(self.CHIP_NAME)
844            load_start = offset
845            load_end = offset + size
846            for stub_start, stub_end in [
847                (
848                    stub.bss_start or stub.data_start,
849                    stub.data_start + len(stub.data),
850                ),  # DRAM = bss+data
851                (stub.text_start, stub.text_start + len(stub.text)),  # IRAM
852            ]:
853                if load_start < stub_end and load_end > stub_start:
854                    raise FatalError(
855                        "Software loader is resident at 0x%08x-0x%08x. "
856                        "Can't load binary at overlapping address range 0x%08x-0x%08x. "
857                        "Either change binary loading address, or use the --no-stub "
858                        "option to disable the software loader."
859                        % (stub_start, stub_end, load_start, load_end)
860                    )
861
862        return self.check_command(
863            "enter RAM download mode",
864            self.ESP_MEM_BEGIN,
865            struct.pack("<IIII", size, blocks, blocksize, offset),
866        )
867
868    def mem_block(self, data, seq):
869        """Send a block of an image to RAM"""
870        return self.check_command(
871            "write to target RAM",
872            self.ESP_MEM_DATA,
873            struct.pack("<IIII", len(data), seq, 0, 0) + data,
874            self.checksum(data),
875        )
876
877    def mem_finish(self, entrypoint=0):
878        """Leave download mode and run the application"""
879        # Sending ESP_MEM_END usually sends a correct response back, however sometimes
880        # (with ROM loader) the executed code may reset the UART or change the baud rate
881        # before the transmit FIFO is empty. So in these cases we set a short timeout
882        # and ignore errors.
883        timeout = DEFAULT_TIMEOUT if self.IS_STUB else MEM_END_ROM_TIMEOUT
884        data = struct.pack("<II", int(entrypoint == 0), entrypoint)
885        try:
886            return self.check_command(
887                "leave RAM download mode", self.ESP_MEM_END, data=data, timeout=timeout
888            )
889        except FatalError:
890            if self.IS_STUB:
891                raise
892            pass
893
894    def flash_begin(self, size, offset, begin_rom_encrypted=False):
895        """
896        Start downloading to Flash (performs an erase)
897
898        Returns number of blocks (of size self.FLASH_WRITE_SIZE) to write.
899        """
900        num_blocks = (size + self.FLASH_WRITE_SIZE - 1) // self.FLASH_WRITE_SIZE
901        erase_size = self.get_erase_size(offset, size)
902
903        t = time.time()
904        if self.IS_STUB:
905            timeout = DEFAULT_TIMEOUT
906        else:
907            timeout = timeout_per_mb(
908                ERASE_REGION_TIMEOUT_PER_MB, size
909            )  # ROM performs the erase up front
910
911        params = struct.pack(
912            "<IIII", erase_size, num_blocks, self.FLASH_WRITE_SIZE, offset
913        )
914        if self.SUPPORTS_ENCRYPTED_FLASH and not self.IS_STUB:
915            params += struct.pack("<I", 1 if begin_rom_encrypted else 0)
916        self.check_command(
917            "enter Flash download mode", self.ESP_FLASH_BEGIN, params, timeout=timeout
918        )
919        if size != 0 and not self.IS_STUB:
920            print("Took %.2fs to erase flash block" % (time.time() - t))
921        return num_blocks
922
923    def flash_block(self, data, seq, timeout=DEFAULT_TIMEOUT):
924        """Write block to flash, retry if fail"""
925        for attempts_left in range(WRITE_BLOCK_ATTEMPTS - 1, -1, -1):
926            try:
927                self.check_command(
928                    "write to target Flash after seq %d" % seq,
929                    self.ESP_FLASH_DATA,
930                    struct.pack("<IIII", len(data), seq, 0, 0) + data,
931                    self.checksum(data),
932                    timeout=timeout,
933                )
934                break
935            except FatalError:
936                if attempts_left:
937                    self.trace(
938                        "Block write failed, "
939                        f"retrying with {attempts_left} attempts left"
940                    )
941                else:
942                    raise
943
944    def flash_encrypt_block(self, data, seq, timeout=DEFAULT_TIMEOUT):
945        """Encrypt, write block to flash, retry if fail"""
946        if self.SUPPORTS_ENCRYPTED_FLASH and not self.IS_STUB:
947            # ROM support performs the encrypted writes via the normal write command,
948            # triggered by flash_begin(begin_rom_encrypted=True)
949            return self.flash_block(data, seq, timeout)
950
951        for attempts_left in range(WRITE_BLOCK_ATTEMPTS - 1, -1, -1):
952            try:
953                self.check_command(
954                    "Write encrypted to target Flash after seq %d" % seq,
955                    self.ESP_FLASH_ENCRYPT_DATA,
956                    struct.pack("<IIII", len(data), seq, 0, 0) + data,
957                    self.checksum(data),
958                    timeout=timeout,
959                )
960                break
961            except FatalError:
962                if attempts_left:
963                    self.trace(
964                        "Encrypted block write failed, "
965                        f"retrying with {attempts_left} attempts left"
966                    )
967                else:
968                    raise
969
970    def flash_finish(self, reboot=False):
971        """Leave flash mode and run/reboot"""
972        pkt = struct.pack("<I", int(not reboot))
973        # stub sends a reply to this command
974        self.check_command("leave Flash mode", self.ESP_FLASH_END, pkt)
975
976    def run(self, reboot=False):
977        """Run application code in flash"""
978        # Fake flash begin immediately followed by flash end
979        self.flash_begin(0, 0)
980        self.flash_finish(reboot)
981
982    def flash_id(self):
983        """Read SPI flash manufacturer and device id"""
984        if self.cache["flash_id"] is None:
985            SPIFLASH_RDID = 0x9F
986            self.cache["flash_id"] = self.run_spiflash_command(SPIFLASH_RDID, b"", 24)
987        return self.cache["flash_id"]
988
989    def flash_type(self):
990        """Read flash type bit field from eFuse. Returns 0, 1, None (not present)"""
991        return None  # not implemented for all chip targets
992
993    def get_security_info(self):
994        res = self.check_command("get security info", self.ESP_GET_SECURITY_INFO, b"")
995        esp32s2 = True if len(res) == 12 else False
996        res = struct.unpack("<IBBBBBBBB" if esp32s2 else "<IBBBBBBBBII", res)
997        return {
998            "flags": res[0],
999            "flash_crypt_cnt": res[1],
1000            "key_purposes": res[2:9],
1001            "chip_id": None if esp32s2 else res[9],
1002            "api_version": None if esp32s2 else res[10],
1003        }
1004
1005    @esp32s3_or_newer_function_only
1006    def get_chip_id(self):
1007        if self.cache["chip_id"] is None:
1008            res = self.check_command(
1009                "get security info", self.ESP_GET_SECURITY_INFO, b""
1010            )
1011            res = struct.unpack(
1012                "<IBBBBBBBBI", res[:16]
1013            )  # 4b flags, 1b flash_crypt_cnt, 7*1b key_purposes, 4b chip_id
1014            self.cache["chip_id"] = res[9]  # 2/4 status bytes invariant
1015        return self.cache["chip_id"]
1016
1017    def get_uart_no(self):
1018        """
1019        Read the UARTDEV_BUF_NO register to get the number of the currently used console
1020        """
1021        if self.cache["uart_no"] is None:
1022            self.cache["uart_no"] = self.read_reg(self.UARTDEV_BUF_NO) & 0xFF
1023        return self.cache["uart_no"]
1024
1025    @classmethod
1026    def parse_flash_size_arg(cls, arg):
1027        try:
1028            return cls.FLASH_SIZES[arg]
1029        except KeyError:
1030            raise FatalError(
1031                "Flash size '%s' is not supported by this chip type. "
1032                "Supported sizes: %s" % (arg, ", ".join(cls.FLASH_SIZES.keys()))
1033            )
1034
1035    @classmethod
1036    def parse_flash_freq_arg(cls, arg):
1037        if arg is None:
1038            # The encoding of the default flash frequency in FLASH_FREQUENCY is always 0
1039            return 0
1040        try:
1041            return cls.FLASH_FREQUENCY[arg]
1042        except KeyError:
1043            raise FatalError(
1044                "Flash frequency '%s' is not supported by this chip type. "
1045                "Supported frequencies: %s"
1046                % (arg, ", ".join(cls.FLASH_FREQUENCY.keys()))
1047            )
1048
1049    def run_stub(self, stub=None):
1050        if stub is None:
1051            stub = StubFlasher(self.CHIP_NAME)
1052
1053        if self.sync_stub_detected:
1054            print("Stub is already running. No upload is necessary.")
1055            return self.STUB_CLASS(self)
1056
1057        # Upload
1058        print("Uploading stub...")
1059        for field in [stub.text, stub.data]:
1060            if field is not None:
1061                offs = stub.text_start if field == stub.text else stub.data_start
1062                length = len(field)
1063                blocks = (length + self.ESP_RAM_BLOCK - 1) // self.ESP_RAM_BLOCK
1064                self.mem_begin(length, blocks, self.ESP_RAM_BLOCK, offs)
1065                for seq in range(blocks):
1066                    from_offs = seq * self.ESP_RAM_BLOCK
1067                    to_offs = from_offs + self.ESP_RAM_BLOCK
1068                    self.mem_block(field[from_offs:to_offs], seq)
1069        print("Running stub...")
1070        self.mem_finish(stub.entry)
1071        try:
1072            p = self.read()
1073        except StopIteration:
1074            raise FatalError(
1075                "Failed to start stub. There was no response."
1076                "\nTry increasing timeouts, for more information see: "
1077                "https://docs.espressif.com/projects/esptool/en/latest/esptool/configuration-file.html"  # noqa E501
1078            )
1079
1080        if p != b"OHAI":
1081            raise FatalError(f"Failed to start stub. Unexpected response: {p}")
1082        print("Stub running...")
1083        return self.STUB_CLASS(self)
1084
1085    @stub_and_esp32_function_only
1086    def flash_defl_begin(self, size, compsize, offset):
1087        """
1088        Start downloading compressed data to Flash (performs an erase)
1089
1090        Returns number of blocks (size self.FLASH_WRITE_SIZE) to write.
1091        """
1092        num_blocks = (compsize + self.FLASH_WRITE_SIZE - 1) // self.FLASH_WRITE_SIZE
1093        erase_blocks = (size + self.FLASH_WRITE_SIZE - 1) // self.FLASH_WRITE_SIZE
1094
1095        t = time.time()
1096        if self.IS_STUB:
1097            write_size = (
1098                size  # stub expects number of bytes here, manages erasing internally
1099            )
1100            timeout = DEFAULT_TIMEOUT
1101        else:
1102            write_size = (
1103                erase_blocks * self.FLASH_WRITE_SIZE
1104            )  # ROM expects rounded up to erase block size
1105            timeout = timeout_per_mb(
1106                ERASE_REGION_TIMEOUT_PER_MB, write_size
1107            )  # ROM performs the erase up front
1108        print("Compressed %d bytes to %d..." % (size, compsize))
1109        params = struct.pack(
1110            "<IIII", write_size, num_blocks, self.FLASH_WRITE_SIZE, offset
1111        )
1112        if self.SUPPORTS_ENCRYPTED_FLASH and not self.IS_STUB:
1113            # extra param is to enter encrypted flash mode via ROM
1114            # (not supported currently)
1115            params += struct.pack("<I", 0)
1116        self.check_command(
1117            "enter compressed flash mode",
1118            self.ESP_FLASH_DEFL_BEGIN,
1119            params,
1120            timeout=timeout,
1121        )
1122        if size != 0 and not self.IS_STUB:
1123            # (stub erases as it writes, but ROM loaders erase on begin)
1124            print("Took %.2fs to erase flash block" % (time.time() - t))
1125        return num_blocks
1126
1127    @stub_and_esp32_function_only
1128    def flash_defl_block(self, data, seq, timeout=DEFAULT_TIMEOUT):
1129        """Write block to flash, send compressed, retry if fail"""
1130        for attempts_left in range(WRITE_BLOCK_ATTEMPTS - 1, -1, -1):
1131            try:
1132                self.check_command(
1133                    "write compressed data to flash after seq %d" % seq,
1134                    self.ESP_FLASH_DEFL_DATA,
1135                    struct.pack("<IIII", len(data), seq, 0, 0) + data,
1136                    self.checksum(data),
1137                    timeout=timeout,
1138                )
1139                break
1140            except FatalError:
1141                if attempts_left:
1142                    self.trace(
1143                        "Compressed block write failed, "
1144                        f"retrying with {attempts_left} attempts left"
1145                    )
1146                else:
1147                    raise
1148
1149    @stub_and_esp32_function_only
1150    def flash_defl_finish(self, reboot=False):
1151        """Leave compressed flash mode and run/reboot"""
1152        if not reboot and not self.IS_STUB:
1153            # skip sending flash_finish to ROM loader, as this
1154            # exits the bootloader. Stub doesn't do this.
1155            return
1156        pkt = struct.pack("<I", int(not reboot))
1157        self.check_command("leave compressed flash mode", self.ESP_FLASH_DEFL_END, pkt)
1158        self.in_bootloader = False
1159
1160    @stub_and_esp32_function_only
1161    def flash_md5sum(self, addr, size):
1162        # the MD5 command returns additional bytes in the standard
1163        # command reply slot
1164        timeout = timeout_per_mb(MD5_TIMEOUT_PER_MB, size)
1165        res = self.check_command(
1166            "calculate md5sum",
1167            self.ESP_SPI_FLASH_MD5,
1168            struct.pack("<IIII", addr, size, 0, 0),
1169            timeout=timeout,
1170        )
1171
1172        if len(res) == 32:
1173            return res.decode("utf-8")  # already hex formatted
1174        elif len(res) == 16:
1175            return hexify(res).lower()
1176        else:
1177            raise FatalError("MD5Sum command returned unexpected result: %r" % res)
1178
1179    @stub_and_esp32_function_only
1180    def change_baud(self, baud):
1181        print("Changing baud rate to %d" % baud)
1182        # stub takes the new baud rate and the old one
1183        second_arg = self._port.baudrate if self.IS_STUB else 0
1184        self.command(self.ESP_CHANGE_BAUDRATE, struct.pack("<II", baud, second_arg))
1185        print("Changed.")
1186        self._set_port_baudrate(baud)
1187        time.sleep(0.05)  # get rid of crap sent during baud rate change
1188        self.flush_input()
1189
1190    @stub_function_only
1191    def erase_flash(self):
1192        # depending on flash chip model the erase may take this long (maybe longer!)
1193        self.check_command(
1194            "erase flash", self.ESP_ERASE_FLASH, timeout=CHIP_ERASE_TIMEOUT
1195        )
1196
1197    @stub_function_only
1198    def erase_region(self, offset, size):
1199        if offset % self.FLASH_SECTOR_SIZE != 0:
1200            raise FatalError("Offset to erase from must be a multiple of 4096")
1201        if size % self.FLASH_SECTOR_SIZE != 0:
1202            raise FatalError("Size of data to erase must be a multiple of 4096")
1203        timeout = timeout_per_mb(ERASE_REGION_TIMEOUT_PER_MB, size)
1204        self.check_command(
1205            "erase region",
1206            self.ESP_ERASE_REGION,
1207            struct.pack("<II", offset, size),
1208            timeout=timeout,
1209        )
1210
1211    def read_flash_slow(self, offset, length, progress_fn):
1212        raise NotImplementedInROMError(self, self.read_flash_slow)
1213
1214    def read_flash(self, offset, length, progress_fn=None):
1215        if not self.IS_STUB:
1216            return self.read_flash_slow(offset, length, progress_fn)  # ROM-only routine
1217
1218        # issue a standard bootloader command to trigger the read
1219        self.check_command(
1220            "read flash",
1221            self.ESP_READ_FLASH,
1222            struct.pack("<IIII", offset, length, self.FLASH_SECTOR_SIZE, 64),
1223        )
1224        # now we expect (length // block_size) SLIP frames with the data
1225        data = b""
1226        while len(data) < length:
1227            p = self.read()
1228            data += p
1229            if len(data) < length and len(p) < self.FLASH_SECTOR_SIZE:
1230                raise FatalError(
1231                    "Corrupt data, expected 0x%x bytes but received 0x%x bytes"
1232                    % (self.FLASH_SECTOR_SIZE, len(p))
1233                )
1234            self.write(struct.pack("<I", len(data)))
1235            if progress_fn and (len(data) % 1024 == 0 or len(data) == length):
1236                progress_fn(len(data), length)
1237        if progress_fn:
1238            progress_fn(len(data), length)
1239        if len(data) > length:
1240            raise FatalError("Read more than expected")
1241
1242        digest_frame = self.read()
1243        if len(digest_frame) != 16:
1244            raise FatalError("Expected digest, got: %s" % hexify(digest_frame))
1245        expected_digest = hexify(digest_frame).upper()
1246        digest = hashlib.md5(data).hexdigest().upper()
1247        if digest != expected_digest:
1248            raise FatalError(
1249                "Digest mismatch: expected %s, got %s" % (expected_digest, digest)
1250            )
1251        return data
1252
1253    def flash_spi_attach(self, hspi_arg):
1254        """Send SPI attach command to enable the SPI flash pins
1255
1256        ESP8266 ROM does this when you send flash_begin, ESP32 ROM
1257        has it as a SPI command.
1258        """
1259        # last 3 bytes in ESP_SPI_ATTACH argument are reserved values
1260        arg = struct.pack("<I", hspi_arg)
1261        if not self.IS_STUB:
1262            # ESP32 ROM loader takes additional 'is legacy' arg, which is not
1263            # currently supported in the stub loader or esptool.py
1264            # (as it's not usually needed.)
1265            is_legacy = 0
1266            arg += struct.pack("BBBB", is_legacy, 0, 0, 0)
1267        self.check_command("configure SPI flash pins", self.ESP_SPI_ATTACH, arg)
1268
1269    def flash_set_parameters(self, size):
1270        """Tell the ESP bootloader the parameters of the chip
1271
1272        Corresponds to the "flashchip" data structure that the ROM
1273        has in RAM.
1274
1275        'size' is in bytes.
1276
1277        All other flash parameters are currently hardcoded (on ESP8266
1278        these are mostly ignored by ROM code, on ESP32 I'm not sure.)
1279        """
1280        fl_id = 0
1281        total_size = size
1282        block_size = 64 * 1024
1283        sector_size = 4 * 1024
1284        page_size = 256
1285        status_mask = 0xFFFF
1286        self.check_command(
1287            "set SPI params",
1288            self.ESP_SPI_SET_PARAMS,
1289            struct.pack(
1290                "<IIIIII",
1291                fl_id,
1292                total_size,
1293                block_size,
1294                sector_size,
1295                page_size,
1296                status_mask,
1297            ),
1298        )
1299
1300    def run_spiflash_command(
1301        self,
1302        spiflash_command,
1303        data=b"",
1304        read_bits=0,
1305        addr=None,
1306        addr_len=0,
1307        dummy_len=0,
1308    ):
1309        """Run an arbitrary SPI flash command.
1310
1311        This function uses the "USR_COMMAND" functionality in the ESP
1312        SPI hardware, rather than the precanned commands supported by
1313        hardware. So the value of spiflash_command is an actual command
1314        byte, sent over the wire.
1315
1316        After writing command byte, writes 'data' to MOSI and then
1317        reads back 'read_bits' of reply on MISO. Result is a number.
1318        """
1319
1320        # SPI_USR register flags
1321        SPI_USR_COMMAND = 1 << 31
1322        SPI_USR_ADDR = 1 << 30
1323        SPI_USR_DUMMY = 1 << 29
1324        SPI_USR_MISO = 1 << 28
1325        SPI_USR_MOSI = 1 << 27
1326
1327        # SPI registers, base address differs ESP32* vs 8266
1328        base = self.SPI_REG_BASE
1329        SPI_CMD_REG = base + 0x00
1330        SPI_ADDR_REG = base + 0x04
1331        SPI_USR_REG = base + self.SPI_USR_OFFS
1332        SPI_USR1_REG = base + self.SPI_USR1_OFFS
1333        SPI_USR2_REG = base + self.SPI_USR2_OFFS
1334        SPI_W0_REG = base + self.SPI_W0_OFFS
1335
1336        # following two registers are ESP32 and later chips only
1337        if self.SPI_MOSI_DLEN_OFFS is not None:
1338            # ESP32 and later chips have a more sophisticated way
1339            # to set up "user" commands
1340            def set_data_lengths(mosi_bits, miso_bits):
1341                SPI_MOSI_DLEN_REG = base + self.SPI_MOSI_DLEN_OFFS
1342                SPI_MISO_DLEN_REG = base + self.SPI_MISO_DLEN_OFFS
1343                if mosi_bits > 0:
1344                    self.write_reg(SPI_MOSI_DLEN_REG, mosi_bits - 1)
1345                if miso_bits > 0:
1346                    self.write_reg(SPI_MISO_DLEN_REG, miso_bits - 1)
1347                flags = 0
1348                if dummy_len > 0:
1349                    flags |= dummy_len - 1
1350                if addr_len > 0:
1351                    flags |= (addr_len - 1) << SPI_USR_ADDR_LEN_SHIFT
1352                if flags:
1353                    self.write_reg(SPI_USR1_REG, flags)
1354
1355        else:
1356
1357            def set_data_lengths(mosi_bits, miso_bits):
1358                SPI_DATA_LEN_REG = SPI_USR1_REG
1359                SPI_MOSI_BITLEN_S = 17
1360                SPI_MISO_BITLEN_S = 8
1361                mosi_mask = 0 if (mosi_bits == 0) else (mosi_bits - 1)
1362                miso_mask = 0 if (miso_bits == 0) else (miso_bits - 1)
1363                flags = (miso_mask << SPI_MISO_BITLEN_S) | (
1364                    mosi_mask << SPI_MOSI_BITLEN_S
1365                )
1366                if dummy_len > 0:
1367                    flags |= dummy_len - 1
1368                if addr_len > 0:
1369                    flags |= (addr_len - 1) << SPI_USR_ADDR_LEN_SHIFT
1370                self.write_reg(SPI_DATA_LEN_REG, flags)
1371
1372        # SPI peripheral "command" bitmasks for SPI_CMD_REG
1373        SPI_CMD_USR = 1 << 18
1374
1375        # shift values
1376        SPI_USR2_COMMAND_LEN_SHIFT = 28
1377        SPI_USR_ADDR_LEN_SHIFT = 26
1378
1379        if read_bits > 32:
1380            raise FatalError(
1381                "Reading more than 32 bits back from a SPI flash "
1382                "operation is unsupported"
1383            )
1384        if len(data) > 64:
1385            raise FatalError(
1386                "Writing more than 64 bytes of data with one SPI "
1387                "command is unsupported"
1388            )
1389
1390        data_bits = len(data) * 8
1391        old_spi_usr = self.read_reg(SPI_USR_REG)
1392        old_spi_usr2 = self.read_reg(SPI_USR2_REG)
1393        flags = SPI_USR_COMMAND
1394        if read_bits > 0:
1395            flags |= SPI_USR_MISO
1396        if data_bits > 0:
1397            flags |= SPI_USR_MOSI
1398        if addr_len > 0:
1399            flags |= SPI_USR_ADDR
1400        if dummy_len > 0:
1401            flags |= SPI_USR_DUMMY
1402        set_data_lengths(data_bits, read_bits)
1403        self.write_reg(SPI_USR_REG, flags)
1404        self.write_reg(
1405            SPI_USR2_REG, (7 << SPI_USR2_COMMAND_LEN_SHIFT) | spiflash_command
1406        )
1407        if addr_len > 0:
1408            if self.SPI_ADDR_REG_MSB:
1409                addr = addr << (32 - addr_len)
1410            self.write_reg(SPI_ADDR_REG, addr)
1411        if data_bits == 0:
1412            self.write_reg(SPI_W0_REG, 0)  # clear data register before we read it
1413        else:
1414            data = pad_to(data, 4, b"\00")  # pad to 32-bit multiple
1415            words = struct.unpack("I" * (len(data) // 4), data)
1416            next_reg = SPI_W0_REG
1417            for word in words:
1418                self.write_reg(next_reg, word)
1419                next_reg += 4
1420        self.write_reg(SPI_CMD_REG, SPI_CMD_USR)
1421
1422        def wait_done():
1423            for _ in range(10):
1424                if (self.read_reg(SPI_CMD_REG) & SPI_CMD_USR) == 0:
1425                    return
1426            raise FatalError("SPI command did not complete in time")
1427
1428        wait_done()
1429
1430        status = self.read_reg(SPI_W0_REG)
1431        # restore some SPI controller registers
1432        self.write_reg(SPI_USR_REG, old_spi_usr)
1433        self.write_reg(SPI_USR2_REG, old_spi_usr2)
1434        return status
1435
1436    def read_spiflash_sfdp(self, addr, read_bits):
1437        CMD_RDSFDP = 0x5A
1438        return self.run_spiflash_command(
1439            CMD_RDSFDP, read_bits=read_bits, addr=addr, addr_len=24, dummy_len=8
1440        )
1441
1442    def read_status(self, num_bytes=2):
1443        """Read up to 24 bits (num_bytes) of SPI flash status register contents
1444        via RDSR, RDSR2, RDSR3 commands
1445
1446        Not all SPI flash supports all three commands. The upper 1 or 2
1447        bytes may be 0xFF.
1448        """
1449        SPIFLASH_RDSR = 0x05
1450        SPIFLASH_RDSR2 = 0x35
1451        SPIFLASH_RDSR3 = 0x15
1452
1453        status = 0
1454        shift = 0
1455        for cmd in [SPIFLASH_RDSR, SPIFLASH_RDSR2, SPIFLASH_RDSR3][0:num_bytes]:
1456            status += self.run_spiflash_command(cmd, read_bits=8) << shift
1457            shift += 8
1458        return status
1459
1460    def write_status(self, new_status, num_bytes=2, set_non_volatile=False):
1461        """Write up to 24 bits (num_bytes) of new status register
1462
1463        num_bytes can be 1, 2 or 3.
1464
1465        Not all flash supports the additional commands to write the
1466        second and third byte of the status register. When writing 2
1467        bytes, esptool also sends a 16-byte WRSR command (as some
1468        flash types use this instead of WRSR2.)
1469
1470        If the set_non_volatile flag is set, non-volatile bits will
1471        be set as well as volatile ones (WREN used instead of WEVSR).
1472
1473        """
1474        SPIFLASH_WRSR = 0x01
1475        SPIFLASH_WRSR2 = 0x31
1476        SPIFLASH_WRSR3 = 0x11
1477        SPIFLASH_WEVSR = 0x50
1478        SPIFLASH_WREN = 0x06
1479        SPIFLASH_WRDI = 0x04
1480
1481        enable_cmd = SPIFLASH_WREN if set_non_volatile else SPIFLASH_WEVSR
1482
1483        # try using a 16-bit WRSR (not supported by all chips)
1484        # this may be redundant, but shouldn't hurt
1485        if num_bytes == 2:
1486            self.run_spiflash_command(enable_cmd)
1487            self.run_spiflash_command(SPIFLASH_WRSR, struct.pack("<H", new_status))
1488
1489        # also try using individual commands
1490        # (also not supported by all chips for num_bytes 2 & 3)
1491        for cmd in [SPIFLASH_WRSR, SPIFLASH_WRSR2, SPIFLASH_WRSR3][0:num_bytes]:
1492            self.run_spiflash_command(enable_cmd)
1493            self.run_spiflash_command(cmd, struct.pack("B", new_status & 0xFF))
1494            new_status >>= 8
1495
1496        self.run_spiflash_command(SPIFLASH_WRDI)
1497
1498    def get_crystal_freq(self):
1499        """
1500        Figure out the crystal frequency from the UART clock divider
1501
1502        Returns a normalized value in integer MHz (only values 40 or 26 are supported)
1503        """
1504        # The logic here is:
1505        # - We know that our baud rate and the ESP UART baud rate are roughly the same,
1506        #   or we couldn't communicate
1507        # - We can read the UART clock divider register to know how the ESP derives this
1508        #   from the APB bus frequency
1509        # - Multiplying these two together gives us the bus frequency which is either
1510        #   the crystal frequency (ESP32) or double the crystal frequency (ESP8266).
1511        #   See the self.XTAL_CLK_DIVIDER parameter for this factor.
1512        uart_div = self.read_reg(self.UART_CLKDIV_REG) & self.UART_CLKDIV_MASK
1513        est_xtal = (self._port.baudrate * uart_div) / 1e6 / self.XTAL_CLK_DIVIDER
1514        if est_xtal > 45:
1515            norm_xtal = 48
1516        elif est_xtal > 33:
1517            norm_xtal = 40
1518        else:
1519            norm_xtal = 26
1520        if abs(norm_xtal - est_xtal) > 1:
1521            print(
1522                "WARNING: Detected crystal freq %.2fMHz is quite different to "
1523                "normalized freq %dMHz. Unsupported crystal in use?"
1524                % (est_xtal, norm_xtal)
1525            )
1526        return norm_xtal
1527
1528    def hard_reset(self):
1529        print("Hard resetting via RTS pin...")
1530        HardReset(self._port)()
1531
1532    def soft_reset(self, stay_in_bootloader):
1533        if not self.IS_STUB:
1534            if stay_in_bootloader:
1535                return  # ROM bootloader is already in bootloader!
1536            else:
1537                # 'run user code' is as close to a soft reset as we can do
1538                self.flash_begin(0, 0)
1539                self.flash_finish(False)
1540        else:
1541            if stay_in_bootloader:
1542                # soft resetting from the stub loader
1543                # will re-load the ROM bootloader
1544                self.flash_begin(0, 0)
1545                self.flash_finish(True)
1546            elif self.CHIP_NAME != "ESP8266":
1547                raise FatalError(
1548                    "Soft resetting is currently only supported on ESP8266"
1549                )
1550            else:
1551                # running user code from stub loader requires some hacks
1552                # in the stub loader
1553                self.command(self.ESP_RUN_USER_CODE, wait_response=False)
1554
1555    def check_chip_id(self):
1556        try:
1557            chip_id = self.get_chip_id()
1558            if chip_id != self.IMAGE_CHIP_ID:
1559                print(
1560                    "WARNING: Chip ID {} ({}) doesn't match expected Chip ID {}. "
1561                    "esptool may not work correctly.".format(
1562                        chip_id,
1563                        self.UNSUPPORTED_CHIPS.get(chip_id, "Unknown"),
1564                        self.IMAGE_CHIP_ID,
1565                    )
1566                )
1567                # Try to flash anyways by disabling stub
1568                self.stub_is_disabled = True
1569        except NotImplementedInROMError:
1570            pass
1571
1572
def slip_reader(port, trace_function):
    """Generator to read SLIP packets from a serial port.
    Yields one full SLIP packet at a time, raises exception on timeout or invalid data.

    Designed to avoid too many calls to serial.read(1), which can bog
    down on slow systems: everything the port reports as waiting is read
    in a single call, and a one-byte read is only issued when nothing is
    waiting.

    port: object providing ``inWaiting()`` and ``read(size)``.
    trace_function: called as ``trace_function(format, *args)`` with
    diagnostic messages.

    Each yielded packet is a ``bytes`` object holding the packet content with
    the 0xc0 delimiters removed and the escape sequences (0xdb 0xdc -> 0xc0,
    0xdb 0xdd -> 0xdb) decoded.

    Raises FatalError when a read returns no data, when a byte other than
    0xc0 arrives while waiting for the start of a packet, or when 0xdb is
    followed by anything other than 0xdc or 0xdd. In the latter two cases the
    received data is first searched for a panic handler message, which is
    reported instead if present.
    """
    SLIP_END = 0xC0  # packet delimiter
    SLIP_ESC = 0xDB  # start of an escape sequence
    SLIP_ESC_END = 0xDC  # escaped 0xc0
    SLIP_ESC_ESC = 0xDD  # escaped 0xdb

    def detect_panic_handler(data):
        """
        Checks the input bytes for panic handler messages.
        Raises a FatalError if Guru Meditation or Fatal Exception is found, as both
        of these are used between different ROM versions.
        Tries to also parse the error cause (e.g. IllegalInstruction).

        The raised message always starts with "Guru Meditation Error detected",
        whichever of the two patterns matched.
        """

        guru_meditation = (
            rb"G?uru Meditation Error: (?:Core \d panic'ed \(([a-zA-Z ]*)\))?"
        )
        fatal_exception = rb"F?atal exception \(\d+\): (?:([a-zA-Z ]*)?.*epc)?"

        # Search either for Guru Meditation or Fatal Exception
        match = re.search(
            rb"".join([rb"(?:", guru_meditation, rb"|", fatal_exception, rb")"]),
            data,
            re.DOTALL,
        )
        if match is None:
            return

        # Group 1 is the Guru Meditation cause, group 2 the Fatal Exception cause
        causes = [
            "({})".format(group.decode("utf-8"))
            for group in (match.group(1), match.group(2))
            if group is not None
        ]
        cause = f" {causes[0]}" if causes else ""
        raise FatalError(f"Guru Meditation Error detected{cause}")

    def invalid_data_error(chunk, message):
        """
        Traces the offending chunk and whatever is still waiting on the port,
        lets detect_panic_handler() raise if a panic message is present, and
        otherwise returns the FatalError for the caller to raise.
        """
        trace_function("Read invalid data: %s", HexFormatter(chunk))
        remaining_data = port.read(port.inWaiting())
        trace_function(
            "Remaining data in serial buffer: %s",
            HexFormatter(remaining_data),
        )
        detect_panic_handler(chunk + remaining_data)
        return FatalError(message)

    # None while waiting for a packet header, otherwise the bytes collected so
    # far. A bytearray keeps per-byte appends cheap; repeatedly concatenating
    # immutable bytes would copy the whole packet for every received byte.
    partial_packet = None
    in_escape = False
    successful_slip = False
    while True:
        waiting = port.inWaiting()
        read_bytes = port.read(1 if waiting == 0 else waiting)
        if read_bytes == b"":
            if partial_packet is None:  # fail due to no data
                msg = (
                    "Serial data stream stopped: Possible serial noise or corruption."
                    if successful_slip
                    else "No serial data received."
                )
            else:  # fail during packet transfer
                msg = "Packet content transfer stopped (received {} bytes)".format(
                    len(partial_packet)
                )
            trace_function(msg)
            raise FatalError(msg)
        trace_function("Read %d bytes: %s", len(read_bytes), HexFormatter(read_bytes))
        for value in read_bytes:  # iterating bytes yields integers
            if partial_packet is None:  # waiting for packet header
                if value != SLIP_END:
                    raise invalid_data_error(
                        read_bytes,
                        "Invalid head of packet (0x%s): "
                        "Possible serial noise or corruption."
                        % hexify(bytes([value])),
                    )
                partial_packet = bytearray()
            elif in_escape:  # part-way through escape sequence
                in_escape = False
                if value == SLIP_ESC_END:
                    partial_packet.append(SLIP_END)
                elif value == SLIP_ESC_ESC:
                    partial_packet.append(SLIP_ESC)
                else:
                    raise invalid_data_error(
                        read_bytes,
                        "Invalid SLIP escape (0xdb, 0x%s)" % (hexify(bytes([value]))),
                    )
            elif value == SLIP_ESC:  # start of escape sequence
                in_escape = True
            elif value == SLIP_END:  # end of packet
                # Callers receive immutable bytes, as before
                full_packet = bytes(partial_packet)
                trace_function("Received full packet: %s", HexFormatter(full_packet))
                yield full_packet
                partial_packet = None
                successful_slip = True
            else:  # normal byte in packet
                partial_packet.append(value)
1671
1672
class HexFormatter:
    """
    Wrapper class which takes binary data in its constructor
    and produces a hex string from its __str__ method.

    This is intended for "lazy formatting" of trace() output
    in hex format: the hex string is only generated when the object is
    actually converted to a string, which avoids the overhead (significant
    on slow computers) of generating long hex strings even if tracing is
    disabled.

    Note that this doesn't save any overhead if passed as an
    argument to "%", only when passed to trace()

    If auto_split is set (default), any long input (> 16 bytes) will be
    printed as separately indented lines of 16 bytes each, with ASCII
    decoding at the end of each line.
    """

    def __init__(self, binary_string, auto_split=True):
        self._s = binary_string
        self._auto_split = auto_split

    def __str__(self):
        data = self._s
        if not self._auto_split or len(data) <= 16:
            return hexify(data, False)

        def displayable(char):
            # A plain space is shown as-is; any other whitespace is not
            if char == " ":
                return True
            return char in string.printable and char not in string.whitespace

        rows = []
        for start in range(0, len(data), 16):
            chunk = data[start : start + 16]
            # Bytes outside ASCII decode to a replacement character, which is
            # not displayable and therefore rendered as "."
            decoded = chunk.decode("ascii", "replace")
            ascii_column = "".join(
                char if displayable(char) else "." for char in decoded
            )
            first_half = hexify(chunk[:8], False)
            second_half = hexify(chunk[8:], False)
            rows.append(
                "\n    %-16s %-16s | %s" % (first_half, second_half, ascii_column)
            )
        return "".join(rows)
1720