1# SPDX-FileCopyrightText: 2014-2023 Fredrik Ahlberg, Angus Gratton, 2# Espressif Systems (Shanghai) CO LTD, other contributors as noted. 3# 4# SPDX-License-Identifier: GPL-2.0-or-later 5 6import base64 7import hashlib 8import itertools 9import json 10import os 11import re 12import string 13import struct 14import sys 15import time 16from typing import Optional 17 18 19from .config import load_config_file 20from .reset import ( 21 ClassicReset, 22 CustomReset, 23 DEFAULT_RESET_DELAY, 24 HardReset, 25 USBJTAGSerialReset, 26 UnixTightReset, 27) 28from .util import FatalError, NotImplementedInROMError, UnsupportedCommandError 29from .util import byte, hexify, mask_to_shift, pad_to, strip_chip_name 30 31try: 32 import serial 33except ImportError: 34 print( 35 "Pyserial is not installed for %s. " 36 "Check the README for installation instructions." % (sys.executable) 37 ) 38 raise 39 40# check 'serial' is 'pyserial' and not 'serial' 41# ref. https://github.com/espressif/esptool/issues/269 42try: 43 if "serialization" in serial.__doc__ and "deserialization" in serial.__doc__: 44 raise ImportError( 45 "esptool.py depends on pyserial, but there is a conflict with a currently " 46 "installed package named 'serial'.\n" 47 "You may work around this by 'pip uninstall serial; pip install pyserial' " 48 "but this may break other installed Python software " 49 "that depends on 'serial'.\n" 50 "There is no good fix for this right now, " 51 "apart from configuring virtualenvs. " 52 "See https://github.com/espressif/esptool/issues/269#issuecomment-385298196" 53 " for discussion of the underlying issue(s)." 54 ) 55except TypeError: 56 pass # __doc__ returns None for pyserial 57 58try: 59 import serial.tools.list_ports as list_ports 60except ImportError: 61 print( 62 "The installed version (%s) of pyserial appears to be too old for esptool.py " 63 "(Python interpreter %s). Check the README for installation instructions." 64 % (serial.VERSION, sys.executable) 65 ) 66 raise 67except Exception: 68 if sys.platform == "darwin": 69 # swallow the exception, this is a known issue in pyserial+macOS Big Sur preview 70 # ref https://github.com/espressif/esptool/issues/540 71 list_ports = None 72 else: 73 raise 74 75 76cfg, _ = load_config_file() 77cfg = cfg["esptool"] 78 79# Timeout for most flash operations 80DEFAULT_TIMEOUT = cfg.getfloat("timeout", 3) 81# Timeout for full chip erase 82CHIP_ERASE_TIMEOUT = cfg.getfloat("chip_erase_timeout", 120) 83# Longest any command can run 84MAX_TIMEOUT = cfg.getfloat("max_timeout", CHIP_ERASE_TIMEOUT * 2) 85# Timeout for syncing with bootloader 86SYNC_TIMEOUT = cfg.getfloat("sync_timeout", 0.1) 87# Timeout (per megabyte) for calculating md5sum 88MD5_TIMEOUT_PER_MB = cfg.getfloat("md5_timeout_per_mb", 8) 89# Timeout (per megabyte) for erasing a region 90ERASE_REGION_TIMEOUT_PER_MB = cfg.getfloat("erase_region_timeout_per_mb", 30) 91# Timeout (per megabyte) for erasing and writing data 92ERASE_WRITE_TIMEOUT_PER_MB = cfg.getfloat("erase_write_timeout_per_mb", 40) 93# Short timeout for ESP_MEM_END, as it may never respond 94MEM_END_ROM_TIMEOUT = cfg.getfloat("mem_end_rom_timeout", 0.2) 95# Timeout for serial port write 96DEFAULT_SERIAL_WRITE_TIMEOUT = cfg.getfloat("serial_write_timeout", 10) 97# Default number of times to try connection 98DEFAULT_CONNECT_ATTEMPTS = cfg.getint("connect_attempts", 7) 99# Number of times to try writing a data block 100WRITE_BLOCK_ATTEMPTS = cfg.getint("write_block_attempts", 3) 101# Number of times to try opening the serial port 102DEFAULT_OPEN_PORT_ATTEMPTS = cfg.getint("open_port_attempts", 1) 103 104 105def timeout_per_mb(seconds_per_mb, size_bytes): 106 """Scales timeouts which are size-specific""" 107 result = seconds_per_mb * (size_bytes / 1e6) 108 if result < DEFAULT_TIMEOUT: 109 return DEFAULT_TIMEOUT 110 return result 111 112 113def check_supported_function(func, check_func): 114 """ 115 Decorator implementation that wraps a check around an ESPLoader 116 bootloader function to check if it's supported. 117 118 This is used to capture the multidimensional differences in 119 functionality between the ESP8266 & ESP32 (and later chips) ROM loaders, and the 120 software stub that runs on these. Not possible to do this cleanly 121 via inheritance alone. 122 """ 123 124 def inner(*args, **kwargs): 125 obj = args[0] 126 if check_func(obj): 127 return func(*args, **kwargs) 128 else: 129 raise NotImplementedInROMError(obj, func) 130 131 return inner 132 133 134def stub_function_only(func): 135 """Attribute for a function only supported in the software stub loader""" 136 return check_supported_function(func, lambda o: o.IS_STUB) 137 138 139def stub_and_esp32_function_only(func): 140 """Attribute for a function only supported by stubs or ESP32 and later chips ROM""" 141 return check_supported_function( 142 func, lambda o: o.IS_STUB or o.CHIP_NAME not in ["ESP8266"] 143 ) 144 145 146def esp32s3_or_newer_function_only(func): 147 """Attribute for a function only supported by ESP32S3 and later chips ROM""" 148 return check_supported_function( 149 func, lambda o: o.CHIP_NAME not in ["ESP8266", "ESP32", "ESP32-S2"] 150 ) 151 152 153class StubFlasher: 154 STUB_DIR = os.path.join(os.path.dirname(__file__), "targets", "stub_flasher") 155 # directories will be searched in the order of STUB_SUBDIRS 156 STUB_SUBDIRS = ["1", "2"] 157 158 def __init__(self, chip_name): 159 with open(self.get_json_path(chip_name)) as json_file: 160 stub = json.load(json_file) 161 162 self.text = base64.b64decode(stub["text"]) 163 self.text_start = stub["text_start"] 164 self.entry = stub["entry"] 165 166 try: 167 self.data = base64.b64decode(stub["data"]) 168 self.data_start = stub["data_start"] 169 except KeyError: 170 self.data = None 171 self.data_start = None 172 173 self.bss_start = stub.get("bss_start") 174 175 def get_json_path(self, chip_name): 176 chip_name = strip_chip_name(chip_name) 177 for i, subdir in enumerate(self.STUB_SUBDIRS): 178 json_path = os.path.join(self.STUB_DIR, subdir, f"{chip_name}.json") 179 if os.path.exists(json_path): 180 if i: 181 print( 182 f"Warning: Stub version {self.STUB_SUBDIRS[0]} doesn't exist, using {subdir} instead" 183 ) 184 185 return json_path 186 else: 187 raise FileNotFoundError(f"Stub flasher JSON file for {chip_name} not found") 188 189 @classmethod 190 def set_preferred_stub_subdir(cls, subdir): 191 if subdir in cls.STUB_SUBDIRS: 192 cls.STUB_SUBDIRS.remove(subdir) 193 cls.STUB_SUBDIRS.insert(0, subdir) 194 195 196class ESPLoader(object): 197 """Base class providing access to ESP ROM & software stub bootloaders. 198 Subclasses provide ESP8266 & ESP32 Family specific functionality. 199 200 Don't instantiate this base class directly, either instantiate a subclass or 201 call cmds.detect_chip() which will interrogate the chip and return the 202 appropriate subclass instance. You can also use a context manager as 203 "with detect_chip() as esp:" to ensure the serial port is closed when done. 204 205 """ 206 207 CHIP_NAME = "Espressif device" 208 IS_STUB = False 209 STUB_CLASS: Optional[object] = None 210 BOOTLOADER_IMAGE: Optional[object] = None 211 212 DEFAULT_PORT = "/dev/ttyUSB0" 213 214 USES_RFC2217 = False 215 216 # Commands supported by ESP8266 ROM bootloader 217 ESP_FLASH_BEGIN = 0x02 218 ESP_FLASH_DATA = 0x03 219 ESP_FLASH_END = 0x04 220 ESP_MEM_BEGIN = 0x05 221 ESP_MEM_END = 0x06 222 ESP_MEM_DATA = 0x07 223 ESP_SYNC = 0x08 224 ESP_WRITE_REG = 0x09 225 ESP_READ_REG = 0x0A 226 227 # Some commands supported by ESP32 and later chips ROM bootloader (or -8266 w/ stub) 228 ESP_SPI_SET_PARAMS = 0x0B 229 ESP_SPI_ATTACH = 0x0D 230 ESP_READ_FLASH_SLOW = 0x0E # ROM only, much slower than the stub flash read 231 ESP_CHANGE_BAUDRATE = 0x0F 232 ESP_FLASH_DEFL_BEGIN = 0x10 233 ESP_FLASH_DEFL_DATA = 0x11 234 ESP_FLASH_DEFL_END = 0x12 235 ESP_SPI_FLASH_MD5 = 0x13 236 237 # Commands supported by ESP32-S2 and later chips ROM bootloader only 238 ESP_GET_SECURITY_INFO = 0x14 239 240 # Some commands supported by stub only 241 ESP_ERASE_FLASH = 0xD0 242 ESP_ERASE_REGION = 0xD1 243 ESP_READ_FLASH = 0xD2 244 ESP_RUN_USER_CODE = 0xD3 245 246 # Flash encryption encrypted data command 247 ESP_FLASH_ENCRYPT_DATA = 0xD4 248 249 # Response code(s) sent by ROM 250 ROM_INVALID_RECV_MSG = 0x05 # response if an invalid message is received 251 252 # Maximum block sized for RAM and Flash writes, respectively. 253 ESP_RAM_BLOCK = 0x1800 254 255 FLASH_WRITE_SIZE = 0x400 256 257 # Default baudrate. The ROM auto-bauds, so we can use more or less whatever we want. 258 ESP_ROM_BAUD = 115200 259 260 # First byte of the application image 261 ESP_IMAGE_MAGIC = 0xE9 262 263 # Initial state for the checksum routine 264 ESP_CHECKSUM_MAGIC = 0xEF 265 266 # Flash sector size, minimum unit of erase. 267 FLASH_SECTOR_SIZE = 0x1000 268 269 UART_DATE_REG_ADDR = 0x60000078 270 271 # Whether the SPI peripheral sends from MSB of 32-bit register, or the MSB of valid LSB bits. 272 SPI_ADDR_REG_MSB = True 273 274 # This ROM address has a different value on each chip model 275 CHIP_DETECT_MAGIC_REG_ADDR = 0x40001000 276 277 UART_CLKDIV_MASK = 0xFFFFF 278 279 # Memory addresses 280 IROM_MAP_START = 0x40200000 281 IROM_MAP_END = 0x40300000 282 283 # The number of bytes in the UART response that signify command status 284 STATUS_BYTES_LENGTH = 2 285 286 # Bootloader flashing offset 287 BOOTLOADER_FLASH_OFFSET = 0x0 288 289 # ROM supports an encrypted flashing mode 290 SUPPORTS_ENCRYPTED_FLASH = False 291 292 # Response to ESP_SYNC might indicate that flasher stub is running 293 # instead of the ROM bootloader 294 sync_stub_detected = False 295 296 # Device PIDs 297 USB_JTAG_SERIAL_PID = 0x1001 298 299 # Chip IDs that are no longer supported by esptool 300 UNSUPPORTED_CHIPS = {6: "ESP32-S3(beta 3)"} 301 302 # Number of attempts to write flash data 303 WRITE_FLASH_ATTEMPTS = 2 304 305 def __init__(self, port=DEFAULT_PORT, baud=ESP_ROM_BAUD, trace_enabled=False): 306 """Base constructor for ESPLoader bootloader interaction 307 308 Don't call this constructor, either instantiate a specific 309 ROM class directly, or use cmds.detect_chip(). You can use the with 310 statement to ensure the serial port is closed when done. 311 312 This base class has all of the instance methods for bootloader 313 functionality supported across various chips & stub 314 loaders. Subclasses replace the functions they don't support 315 with ones which throw NotImplementedInROMError(). 316 317 """ 318 # True if esptool detects the ROM is in Secure Download Mode 319 self.secure_download_mode = False 320 # True if esptool detects conditions which require the stub to be disabled 321 self.stub_is_disabled = False 322 323 # Device-and-runtime-specific cache 324 self.cache = { 325 "flash_id": None, 326 "chip_id": None, 327 "uart_no": None, 328 "usb_pid": None, 329 } 330 331 if isinstance(port, str): 332 try: 333 self._port = serial.serial_for_url( 334 port, exclusive=True, do_not_open=True 335 ) 336 if sys.platform == "win32": 337 # When opening a port on Windows, 338 # the RTS/DTR (active low) lines 339 # need to be set to False (pulled high) 340 # to avoid unwanted chip reset 341 self._port.rts = False 342 self._port.dtr = False 343 self._port.open() 344 except serial.serialutil.SerialException as e: 345 port_issues = [ 346 [ # does not exist error 347 re.compile(r"Errno 2|FileNotFoundError", re.IGNORECASE), 348 "Check if the port is correct and ESP connected", 349 ], 350 [ # busy port error 351 re.compile(r"Access is denied", re.IGNORECASE), 352 "Check if the port is not used by another task", 353 ], 354 ] 355 if sys.platform.startswith("linux"): 356 port_issues.append( 357 [ # permission denied error 358 re.compile(r"Permission denied", re.IGNORECASE), 359 ("Try to add user into dialout or uucp group."), 360 ], 361 ) 362 363 hint_msg = "" 364 for port_issue in port_issues: 365 if port_issue[0].search(str(e)): 366 hint_msg = f"\nHint: {port_issue[1]}\n" 367 break 368 369 raise FatalError( 370 f"Could not open {port}, the port is busy or doesn't exist." 371 f"\n({e})\n" 372 f"{hint_msg}" 373 ) 374 else: 375 self._port = port 376 self._slip_reader = slip_reader(self._port, self.trace) 377 # setting baud rate in a separate step is a workaround for 378 # CH341 driver on some Linux versions (this opens at 9600 then 379 # sets), shouldn't matter for other platforms/drivers. See 380 # https://github.com/espressif/esptool/issues/44#issuecomment-107094446 381 self._set_port_baudrate(baud) 382 self._trace_enabled = trace_enabled 383 # set write timeout, to prevent esptool blocked at write forever. 384 try: 385 self._port.write_timeout = DEFAULT_SERIAL_WRITE_TIMEOUT 386 except NotImplementedError: 387 # no write timeout for RFC2217 ports 388 # need to set the property back to None or it will continue to fail 389 self._port.write_timeout = None 390 391 def __enter__(self): 392 return self 393 394 def __exit__(self, exc_type, exc_value, traceback): 395 self._port.close() 396 397 @property 398 def serial_port(self): 399 return self._port.port 400 401 def _set_port_baudrate(self, baud): 402 try: 403 self._port.baudrate = baud 404 except IOError: 405 raise FatalError( 406 "Failed to set baud rate %d. The driver may not support this rate." 407 % baud 408 ) 409 410 def read(self): 411 """Read a SLIP packet from the serial port""" 412 return next(self._slip_reader) 413 414 def write(self, packet): 415 """Write bytes to the serial port while performing SLIP escaping""" 416 buf = ( 417 b"\xc0" 418 + (packet.replace(b"\xdb", b"\xdb\xdd").replace(b"\xc0", b"\xdb\xdc")) 419 + b"\xc0" 420 ) 421 self.trace("Write %d bytes: %s", len(buf), HexFormatter(buf)) 422 self._port.write(buf) 423 424 def trace(self, message, *format_args): 425 if self._trace_enabled: 426 now = time.time() 427 try: 428 delta = now - self._last_trace 429 except AttributeError: 430 delta = 0.0 431 self._last_trace = now 432 prefix = "TRACE +%.3f " % delta 433 print(prefix + (message % format_args)) 434 435 @staticmethod 436 def checksum(data, state=ESP_CHECKSUM_MAGIC): 437 """Calculate checksum of a blob, as it is defined by the ROM""" 438 for b in data: 439 state ^= b 440 441 return state 442 443 def command( 444 self, 445 op=None, 446 data=b"", 447 chk=0, 448 wait_response=True, 449 timeout=DEFAULT_TIMEOUT, 450 ): 451 """Send a request and read the response""" 452 saved_timeout = self._port.timeout 453 new_timeout = min(timeout, MAX_TIMEOUT) 454 if new_timeout != saved_timeout: 455 self._port.timeout = new_timeout 456 457 try: 458 if op is not None: 459 self.trace( 460 "command op=0x%02x data len=%s wait_response=%d " 461 "timeout=%.3f data=%s", 462 op, 463 len(data), 464 1 if wait_response else 0, 465 timeout, 466 HexFormatter(data), 467 ) 468 pkt = struct.pack(b"<BBHI", 0x00, op, len(data), chk) + data 469 self.write(pkt) 470 471 if not wait_response: 472 return 473 474 # tries to get a response until that response has the 475 # same operation as the request or a retries limit has 476 # exceeded. This is needed for some esp8266s that 477 # reply with more sync responses than expected. 478 for retry in range(100): 479 p = self.read() 480 if len(p) < 8: 481 continue 482 (resp, op_ret, len_ret, val) = struct.unpack("<BBHI", p[:8]) 483 if resp != 1: 484 continue 485 data = p[8:] 486 487 if op is None or op_ret == op: 488 return val, data 489 if byte(data, 0) != 0 and byte(data, 1) == self.ROM_INVALID_RECV_MSG: 490 # Unsupported read_reg can result in 491 # more than one error response for some reason 492 self.flush_input() 493 raise UnsupportedCommandError(self, op) 494 495 finally: 496 if new_timeout != saved_timeout: 497 self._port.timeout = saved_timeout 498 499 raise FatalError("Response doesn't match request") 500 501 def check_command( 502 self, op_description, op=None, data=b"", chk=0, timeout=DEFAULT_TIMEOUT 503 ): 504 """ 505 Execute a command with 'command', check the result code and throw an appropriate 506 FatalError if it fails. 507 508 Returns the "result" of a successful command. 509 """ 510 val, data = self.command(op, data, chk, timeout=timeout) 511 512 # things are a bit weird here, bear with us 513 514 # the status bytes are the last 2/4 bytes in the data (depending on chip) 515 if len(data) < self.STATUS_BYTES_LENGTH: 516 raise FatalError( 517 "Failed to %s. Only got %d byte status response." 518 % (op_description, len(data)) 519 ) 520 status_bytes = data[-self.STATUS_BYTES_LENGTH :] 521 # only care if the first one is non-zero. If it is, the second byte is a reason. 522 if byte(status_bytes, 0) != 0: 523 raise FatalError.WithResult("Failed to %s" % op_description, status_bytes) 524 525 # if we had more data than just the status bytes, return it as the result 526 # (this is used by the md5sum command, maybe other commands?) 527 if len(data) > self.STATUS_BYTES_LENGTH: 528 return data[: -self.STATUS_BYTES_LENGTH] 529 else: 530 # otherwise, just return the 'val' field which comes from the reply header 531 # (this is used by read_reg) 532 return val 533 534 def flush_input(self): 535 self._port.flushInput() 536 self._slip_reader = slip_reader(self._port, self.trace) 537 538 def sync(self): 539 val, _ = self.command( 540 self.ESP_SYNC, b"\x07\x07\x12\x20" + 32 * b"\x55", timeout=SYNC_TIMEOUT 541 ) 542 543 # ROM bootloaders send some non-zero "val" response. The flasher stub sends 0. 544 # If we receive 0 then it probably indicates that the chip wasn't or couldn't be 545 # reset properly and esptool is talking to the flasher stub. 546 self.sync_stub_detected = val == 0 547 548 for _ in range(7): 549 val, _ = self.command() 550 self.sync_stub_detected &= val == 0 551 552 def _get_pid(self): 553 if self.cache["usb_pid"] is not None: 554 return self.cache["usb_pid"] 555 556 if list_ports is None: 557 print( 558 "\nListing all serial ports is currently not available. " 559 "Can't get device PID." 560 ) 561 return 562 active_port = self._port.port 563 564 # Pyserial only identifies regular ports, URL handlers are not supported 565 if not active_port.lower().startswith(("com", "/dev/")): 566 print( 567 "\nDevice PID identification is only supported on " 568 "COM and /dev/ serial ports." 569 ) 570 return 571 # Return the real path if the active port is a symlink 572 if active_port.startswith("/dev/") and os.path.islink(active_port): 573 active_port = os.path.realpath(active_port) 574 575 active_ports = [active_port] 576 577 # The "cu" (call-up) device has to be used for outgoing communication on MacOS 578 if sys.platform == "darwin" and "tty" in active_port: 579 active_ports.append(active_port.replace("tty", "cu")) 580 ports = list_ports.comports() 581 for p in ports: 582 if p.device in active_ports: 583 self.cache["usb_pid"] = p.pid 584 return p.pid 585 print( 586 f"\nFailed to get PID of a device on {active_port}, " 587 "using standard reset sequence." 588 ) 589 590 def _connect_attempt(self, reset_strategy, mode="default_reset"): 591 """A single connection attempt""" 592 last_error = None 593 boot_log_detected = False 594 download_mode = False 595 596 # If we're doing no_sync, we're likely communicating as a pass through 597 # with an intermediate device to the ESP32 598 if mode == "no_reset_no_sync": 599 return last_error 600 601 if mode != "no_reset": 602 if not self.USES_RFC2217: # Might block on rfc2217 ports 603 # Empty serial buffer to isolate boot log 604 self._port.reset_input_buffer() 605 606 reset_strategy() # Reset the chip to bootloader (download mode) 607 608 # Detect the ROM boot log and check actual boot mode (ESP32 and later only) 609 waiting = self._port.inWaiting() 610 read_bytes = self._port.read(waiting) 611 data = re.search( 612 b"boot:(0x[0-9a-fA-F]+)(.*waiting for download)?", read_bytes, re.DOTALL 613 ) 614 if data is not None: 615 boot_log_detected = True 616 boot_mode = data.group(1) 617 download_mode = data.group(2) is not None 618 619 for _ in range(5): 620 try: 621 self.flush_input() 622 self._port.flushOutput() 623 self.sync() 624 return None 625 except FatalError as e: 626 print(".", end="") 627 sys.stdout.flush() 628 time.sleep(0.05) 629 last_error = e 630 631 if boot_log_detected: 632 last_error = FatalError( 633 "Wrong boot mode detected ({})! " 634 "The chip needs to be in download mode.".format( 635 boot_mode.decode("utf-8") 636 ) 637 ) 638 if download_mode: 639 last_error = FatalError( 640 "Download mode successfully detected, but getting no sync reply: " 641 "The serial TX path seems to be down." 642 ) 643 return last_error 644 645 def get_memory_region(self, name): 646 """ 647 Returns a tuple of (start, end) for the memory map entry with the given name, 648 or None if it doesn't exist 649 """ 650 try: 651 return [(start, end) for (start, end, n) in self.MEMORY_MAP if n == name][0] 652 except IndexError: 653 return None 654 655 def _construct_reset_strategy_sequence(self, mode): 656 """ 657 Constructs a sequence of reset strategies based on the OS, 658 used ESP chip, external settings, and environment variables. 659 Returns a tuple of one or more reset strategies to be tried sequentially. 660 """ 661 cfg_custom_reset_sequence = cfg.get("custom_reset_sequence") 662 if cfg_custom_reset_sequence is not None: 663 return (CustomReset(self._port, cfg_custom_reset_sequence),) 664 665 cfg_reset_delay = cfg.getfloat("reset_delay") 666 if cfg_reset_delay is not None: 667 delay = extra_delay = cfg_reset_delay 668 else: 669 delay = DEFAULT_RESET_DELAY 670 extra_delay = DEFAULT_RESET_DELAY + 0.5 671 672 # This FPGA delay is for Espressif internal use 673 if ( 674 self.CHIP_NAME == "ESP32" 675 and os.environ.get("ESPTOOL_ENV_FPGA", "").strip() == "1" 676 ): 677 delay = extra_delay = 7 678 679 # USB-JTAG/Serial mode 680 if mode == "usb_reset" or self._get_pid() == self.USB_JTAG_SERIAL_PID: 681 return (USBJTAGSerialReset(self._port),) 682 683 # USB-to-Serial bridge 684 if os.name != "nt" and not self._port.name.startswith("rfc2217:"): 685 return ( 686 UnixTightReset(self._port, delay), 687 UnixTightReset(self._port, extra_delay), 688 ClassicReset(self._port, delay), 689 ClassicReset(self._port, extra_delay), 690 ) 691 692 return ( 693 ClassicReset(self._port, delay), 694 ClassicReset(self._port, extra_delay), 695 ) 696 697 def connect( 698 self, 699 mode="default_reset", 700 attempts=DEFAULT_CONNECT_ATTEMPTS, 701 detecting=False, 702 warnings=True, 703 ): 704 """Try connecting repeatedly until successful, or giving up""" 705 if warnings and mode in ["no_reset", "no_reset_no_sync"]: 706 print( 707 'WARNING: Pre-connection option "{}" was selected.'.format(mode), 708 "Connection may fail if the chip is not in bootloader " 709 "or flasher stub mode.", 710 ) 711 712 if self._port.name.startswith("socket:"): 713 mode = "no_reset" # not possible to toggle DTR/RTS over a TCP socket 714 print( 715 "Note: It's not possible to reset the chip over a TCP socket. " 716 "Automatic resetting to bootloader has been disabled, " 717 "reset the chip manually." 718 ) 719 720 print("Connecting...", end="") 721 sys.stdout.flush() 722 last_error = None 723 724 reset_sequence = self._construct_reset_strategy_sequence(mode) 725 try: 726 for _, reset_strategy in zip( 727 range(attempts) if attempts > 0 else itertools.count(), 728 itertools.cycle(reset_sequence), 729 ): 730 last_error = self._connect_attempt(reset_strategy, mode) 731 if last_error is None: 732 break 733 finally: 734 print("") # end 'Connecting...' line 735 736 if last_error is not None: 737 additional_msg = "" 738 if self.CHIP_NAME == "ESP32-C2" and self._port.baudrate < 115200: 739 additional_msg = ( 740 "\nNote: Please set a higher baud rate (--baud)" 741 " if ESP32-C2 doesn't connect" 742 " (at least 115200 Bd is recommended)." 743 ) 744 745 raise FatalError( 746 "Failed to connect to {}: {}" 747 f"{additional_msg}" 748 "\nFor troubleshooting steps visit: " 749 "https://docs.espressif.com/projects/esptool/en/latest/troubleshooting.html".format( # noqa E501 750 self.CHIP_NAME, last_error 751 ) 752 ) 753 754 if not detecting: 755 try: 756 from .targets import ROM_LIST 757 758 # check the date code registers match what we expect to see 759 chip_magic_value = self.read_reg(ESPLoader.CHIP_DETECT_MAGIC_REG_ADDR) 760 if chip_magic_value not in self.CHIP_DETECT_MAGIC_VALUE: 761 actually = None 762 for cls in ROM_LIST: 763 if chip_magic_value in cls.CHIP_DETECT_MAGIC_VALUE: 764 actually = cls 765 break 766 if warnings and actually is None: 767 print( 768 "WARNING: This chip doesn't appear to be a %s " 769 "(chip magic value 0x%08x). " 770 "Probably it is unsupported by this version of esptool." 771 % (self.CHIP_NAME, chip_magic_value) 772 ) 773 else: 774 raise FatalError( 775 "This chip is %s not %s. Wrong --chip argument?" 776 % (actually.CHIP_NAME, self.CHIP_NAME) 777 ) 778 except UnsupportedCommandError: 779 self.secure_download_mode = True 780 781 try: 782 self.check_chip_id() 783 except UnsupportedCommandError: 784 # Fix for ROM not responding in SDM, reconnect and try again 785 if self.secure_download_mode: 786 self._connect_attempt(mode, reset_sequence[0]) 787 self.check_chip_id() 788 else: 789 raise 790 self._post_connect() 791 792 def _post_connect(self): 793 """ 794 Additional initialization hook, may be overridden by the chip-specific class. 795 Gets called after connect, and after auto-detection. 796 """ 797 pass 798 799 def read_reg(self, addr, timeout=DEFAULT_TIMEOUT): 800 """Read memory address in target""" 801 # we don't call check_command here because read_reg() function is called 802 # when detecting chip type, and the way we check for success 803 # (STATUS_BYTES_LENGTH) is different for different chip types (!) 804 val, data = self.command( 805 self.ESP_READ_REG, struct.pack("<I", addr), timeout=timeout 806 ) 807 if byte(data, 0) != 0: 808 raise FatalError.WithResult( 809 "Failed to read register address %08x" % addr, data 810 ) 811 return val 812 813 def write_reg(self, addr, value, mask=0xFFFFFFFF, delay_us=0, delay_after_us=0): 814 """Write to memory address in target""" 815 command = struct.pack("<IIII", addr, value, mask, delay_us) 816 if delay_after_us > 0: 817 # add a dummy write to a date register as an excuse to have a delay 818 command += struct.pack( 819 "<IIII", self.UART_DATE_REG_ADDR, 0, 0, delay_after_us 820 ) 821 822 return self.check_command("write target memory", self.ESP_WRITE_REG, command) 823 824 def update_reg(self, addr, mask, new_val): 825 """ 826 Update register at 'addr', replace the bits masked out by 'mask' 827 with new_val. new_val is shifted left to match the LSB of 'mask' 828 829 Returns just-written value of register. 830 """ 831 shift = mask_to_shift(mask) 832 val = self.read_reg(addr) 833 val &= ~mask 834 val |= (new_val << shift) & mask 835 self.write_reg(addr, val) 836 837 return val 838 839 def mem_begin(self, size, blocks, blocksize, offset): 840 """Start downloading an application image to RAM""" 841 # check we're not going to overwrite a running stub with this data 842 if self.IS_STUB: 843 stub = StubFlasher(self.CHIP_NAME) 844 load_start = offset 845 load_end = offset + size 846 for stub_start, stub_end in [ 847 ( 848 stub.bss_start or stub.data_start, 849 stub.data_start + len(stub.data), 850 ), # DRAM = bss+data 851 (stub.text_start, stub.text_start + len(stub.text)), # IRAM 852 ]: 853 if load_start < stub_end and load_end > stub_start: 854 raise FatalError( 855 "Software loader is resident at 0x%08x-0x%08x. " 856 "Can't load binary at overlapping address range 0x%08x-0x%08x. " 857 "Either change binary loading address, or use the --no-stub " 858 "option to disable the software loader." 859 % (stub_start, stub_end, load_start, load_end) 860 ) 861 862 return self.check_command( 863 "enter RAM download mode", 864 self.ESP_MEM_BEGIN, 865 struct.pack("<IIII", size, blocks, blocksize, offset), 866 ) 867 868 def mem_block(self, data, seq): 869 """Send a block of an image to RAM""" 870 return self.check_command( 871 "write to target RAM", 872 self.ESP_MEM_DATA, 873 struct.pack("<IIII", len(data), seq, 0, 0) + data, 874 self.checksum(data), 875 ) 876 877 def mem_finish(self, entrypoint=0): 878 """Leave download mode and run the application""" 879 # Sending ESP_MEM_END usually sends a correct response back, however sometimes 880 # (with ROM loader) the executed code may reset the UART or change the baud rate 881 # before the transmit FIFO is empty. So in these cases we set a short timeout 882 # and ignore errors. 883 timeout = DEFAULT_TIMEOUT if self.IS_STUB else MEM_END_ROM_TIMEOUT 884 data = struct.pack("<II", int(entrypoint == 0), entrypoint) 885 try: 886 return self.check_command( 887 "leave RAM download mode", self.ESP_MEM_END, data=data, timeout=timeout 888 ) 889 except FatalError: 890 if self.IS_STUB: 891 raise 892 pass 893 894 def flash_begin(self, size, offset, begin_rom_encrypted=False): 895 """ 896 Start downloading to Flash (performs an erase) 897 898 Returns number of blocks (of size self.FLASH_WRITE_SIZE) to write. 899 """ 900 num_blocks = (size + self.FLASH_WRITE_SIZE - 1) // self.FLASH_WRITE_SIZE 901 erase_size = self.get_erase_size(offset, size) 902 903 t = time.time() 904 if self.IS_STUB: 905 timeout = DEFAULT_TIMEOUT 906 else: 907 timeout = timeout_per_mb( 908 ERASE_REGION_TIMEOUT_PER_MB, size 909 ) # ROM performs the erase up front 910 911 params = struct.pack( 912 "<IIII", erase_size, num_blocks, self.FLASH_WRITE_SIZE, offset 913 ) 914 if self.SUPPORTS_ENCRYPTED_FLASH and not self.IS_STUB: 915 params += struct.pack("<I", 1 if begin_rom_encrypted else 0) 916 self.check_command( 917 "enter Flash download mode", self.ESP_FLASH_BEGIN, params, timeout=timeout 918 ) 919 if size != 0 and not self.IS_STUB: 920 print("Took %.2fs to erase flash block" % (time.time() - t)) 921 return num_blocks 922 923 def flash_block(self, data, seq, timeout=DEFAULT_TIMEOUT): 924 """Write block to flash, retry if fail""" 925 for attempts_left in range(WRITE_BLOCK_ATTEMPTS - 1, -1, -1): 926 try: 927 self.check_command( 928 "write to target Flash after seq %d" % seq, 929 self.ESP_FLASH_DATA, 930 struct.pack("<IIII", len(data), seq, 0, 0) + data, 931 self.checksum(data), 932 timeout=timeout, 933 ) 934 break 935 except FatalError: 936 if attempts_left: 937 self.trace( 938 "Block write failed, " 939 f"retrying with {attempts_left} attempts left" 940 ) 941 else: 942 raise 943 944 def flash_encrypt_block(self, data, seq, timeout=DEFAULT_TIMEOUT): 945 """Encrypt, write block to flash, retry if fail""" 946 if self.SUPPORTS_ENCRYPTED_FLASH and not self.IS_STUB: 947 # ROM support performs the encrypted writes via the normal write command, 948 # triggered by flash_begin(begin_rom_encrypted=True) 949 return self.flash_block(data, seq, timeout) 950 951 for attempts_left in range(WRITE_BLOCK_ATTEMPTS - 1, -1, -1): 952 try: 953 self.check_command( 954 "Write encrypted to target Flash after seq %d" % seq, 955 self.ESP_FLASH_ENCRYPT_DATA, 956 struct.pack("<IIII", len(data), seq, 0, 0) + data, 957 self.checksum(data), 958 timeout=timeout, 959 ) 960 break 961 except FatalError: 962 if attempts_left: 963 self.trace( 964 "Encrypted block write failed, " 965 f"retrying with {attempts_left} attempts left" 966 ) 967 else: 968 raise 969 970 def flash_finish(self, reboot=False): 971 """Leave flash mode and run/reboot""" 972 pkt = struct.pack("<I", int(not reboot)) 973 # stub sends a reply to this command 974 self.check_command("leave Flash mode", self.ESP_FLASH_END, pkt) 975 976 def run(self, reboot=False): 977 """Run application code in flash""" 978 # Fake flash begin immediately followed by flash end 979 self.flash_begin(0, 0) 980 self.flash_finish(reboot) 981 982 def flash_id(self): 983 """Read SPI flash manufacturer and device id""" 984 if self.cache["flash_id"] is None: 985 SPIFLASH_RDID = 0x9F 986 self.cache["flash_id"] = self.run_spiflash_command(SPIFLASH_RDID, b"", 24) 987 return self.cache["flash_id"] 988 989 def flash_type(self): 990 """Read flash type bit field from eFuse. Returns 0, 1, None (not present)""" 991 return None # not implemented for all chip targets 992 993 def get_security_info(self): 994 res = self.check_command("get security info", self.ESP_GET_SECURITY_INFO, b"") 995 esp32s2 = True if len(res) == 12 else False 996 res = struct.unpack("<IBBBBBBBB" if esp32s2 else "<IBBBBBBBBII", res) 997 return { 998 "flags": res[0], 999 "flash_crypt_cnt": res[1], 1000 "key_purposes": res[2:9], 1001 "chip_id": None if esp32s2 else res[9], 1002 "api_version": None if esp32s2 else res[10], 1003 } 1004 1005 @esp32s3_or_newer_function_only 1006 def get_chip_id(self): 1007 if self.cache["chip_id"] is None: 1008 res = self.check_command( 1009 "get security info", self.ESP_GET_SECURITY_INFO, b"" 1010 ) 1011 res = struct.unpack( 1012 "<IBBBBBBBBI", res[:16] 1013 ) # 4b flags, 1b flash_crypt_cnt, 7*1b key_purposes, 4b chip_id 1014 self.cache["chip_id"] = res[9] # 2/4 status bytes invariant 1015 return self.cache["chip_id"] 1016 1017 def get_uart_no(self): 1018 """ 1019 Read the UARTDEV_BUF_NO register to get the number of the currently used console 1020 """ 1021 if self.cache["uart_no"] is None: 1022 self.cache["uart_no"] = self.read_reg(self.UARTDEV_BUF_NO) & 0xFF 1023 return self.cache["uart_no"] 1024 1025 @classmethod 1026 def parse_flash_size_arg(cls, arg): 1027 try: 1028 return cls.FLASH_SIZES[arg] 1029 except KeyError: 1030 raise FatalError( 1031 "Flash size '%s' is not supported by this chip type. " 1032 "Supported sizes: %s" % (arg, ", ".join(cls.FLASH_SIZES.keys())) 1033 ) 1034 1035 @classmethod 1036 def parse_flash_freq_arg(cls, arg): 1037 if arg is None: 1038 # The encoding of the default flash frequency in FLASH_FREQUENCY is always 0 1039 return 0 1040 try: 1041 return cls.FLASH_FREQUENCY[arg] 1042 except KeyError: 1043 raise FatalError( 1044 "Flash frequency '%s' is not supported by this chip type. " 1045 "Supported frequencies: %s" 1046 % (arg, ", ".join(cls.FLASH_FREQUENCY.keys())) 1047 ) 1048 1049 def run_stub(self, stub=None): 1050 if stub is None: 1051 stub = StubFlasher(self.CHIP_NAME) 1052 1053 if self.sync_stub_detected: 1054 print("Stub is already running. No upload is necessary.") 1055 return self.STUB_CLASS(self) 1056 1057 # Upload 1058 print("Uploading stub...") 1059 for field in [stub.text, stub.data]: 1060 if field is not None: 1061 offs = stub.text_start if field == stub.text else stub.data_start 1062 length = len(field) 1063 blocks = (length + self.ESP_RAM_BLOCK - 1) // self.ESP_RAM_BLOCK 1064 self.mem_begin(length, blocks, self.ESP_RAM_BLOCK, offs) 1065 for seq in range(blocks): 1066 from_offs = seq * self.ESP_RAM_BLOCK 1067 to_offs = from_offs + self.ESP_RAM_BLOCK 1068 self.mem_block(field[from_offs:to_offs], seq) 1069 print("Running stub...") 1070 self.mem_finish(stub.entry) 1071 try: 1072 p = self.read() 1073 except StopIteration: 1074 raise FatalError( 1075 "Failed to start stub. There was no response." 1076 "\nTry increasing timeouts, for more information see: " 1077 "https://docs.espressif.com/projects/esptool/en/latest/esptool/configuration-file.html" # noqa E501 1078 ) 1079 1080 if p != b"OHAI": 1081 raise FatalError(f"Failed to start stub. Unexpected response: {p}") 1082 print("Stub running...") 1083 return self.STUB_CLASS(self) 1084 1085 @stub_and_esp32_function_only 1086 def flash_defl_begin(self, size, compsize, offset): 1087 """ 1088 Start downloading compressed data to Flash (performs an erase) 1089 1090 Returns number of blocks (size self.FLASH_WRITE_SIZE) to write. 1091 """ 1092 num_blocks = (compsize + self.FLASH_WRITE_SIZE - 1) // self.FLASH_WRITE_SIZE 1093 erase_blocks = (size + self.FLASH_WRITE_SIZE - 1) // self.FLASH_WRITE_SIZE 1094 1095 t = time.time() 1096 if self.IS_STUB: 1097 write_size = ( 1098 size # stub expects number of bytes here, manages erasing internally 1099 ) 1100 timeout = DEFAULT_TIMEOUT 1101 else: 1102 write_size = ( 1103 erase_blocks * self.FLASH_WRITE_SIZE 1104 ) # ROM expects rounded up to erase block size 1105 timeout = timeout_per_mb( 1106 ERASE_REGION_TIMEOUT_PER_MB, write_size 1107 ) # ROM performs the erase up front 1108 print("Compressed %d bytes to %d..." % (size, compsize)) 1109 params = struct.pack( 1110 "<IIII", write_size, num_blocks, self.FLASH_WRITE_SIZE, offset 1111 ) 1112 if self.SUPPORTS_ENCRYPTED_FLASH and not self.IS_STUB: 1113 # extra param is to enter encrypted flash mode via ROM 1114 # (not supported currently) 1115 params += struct.pack("<I", 0) 1116 self.check_command( 1117 "enter compressed flash mode", 1118 self.ESP_FLASH_DEFL_BEGIN, 1119 params, 1120 timeout=timeout, 1121 ) 1122 if size != 0 and not self.IS_STUB: 1123 # (stub erases as it writes, but ROM loaders erase on begin) 1124 print("Took %.2fs to erase flash block" % (time.time() - t)) 1125 return num_blocks 1126 1127 @stub_and_esp32_function_only 1128 def flash_defl_block(self, data, seq, timeout=DEFAULT_TIMEOUT): 1129 """Write block to flash, send compressed, retry if fail""" 1130 for attempts_left in range(WRITE_BLOCK_ATTEMPTS - 1, -1, -1): 1131 try: 1132 self.check_command( 1133 "write compressed data to flash after seq %d" % seq, 1134 self.ESP_FLASH_DEFL_DATA, 1135 struct.pack("<IIII", len(data), seq, 0, 0) + data, 1136 self.checksum(data), 1137 timeout=timeout, 1138 ) 1139 break 1140 except FatalError: 1141 if attempts_left: 1142 self.trace( 1143 "Compressed block write failed, " 1144 f"retrying with {attempts_left} attempts left" 1145 ) 1146 else: 1147 raise 1148 1149 @stub_and_esp32_function_only 1150 def flash_defl_finish(self, reboot=False): 1151 """Leave compressed flash mode and run/reboot""" 1152 if not reboot and not self.IS_STUB: 1153 # skip sending flash_finish to ROM loader, as this 1154 # exits the bootloader. Stub doesn't do this. 1155 return 1156 pkt = struct.pack("<I", int(not reboot)) 1157 self.check_command("leave compressed flash mode", self.ESP_FLASH_DEFL_END, pkt) 1158 self.in_bootloader = False 1159 1160 @stub_and_esp32_function_only 1161 def flash_md5sum(self, addr, size): 1162 # the MD5 command returns additional bytes in the standard 1163 # command reply slot 1164 timeout = timeout_per_mb(MD5_TIMEOUT_PER_MB, size) 1165 res = self.check_command( 1166 "calculate md5sum", 1167 self.ESP_SPI_FLASH_MD5, 1168 struct.pack("<IIII", addr, size, 0, 0), 1169 timeout=timeout, 1170 ) 1171 1172 if len(res) == 32: 1173 return res.decode("utf-8") # already hex formatted 1174 elif len(res) == 16: 1175 return hexify(res).lower() 1176 else: 1177 raise FatalError("MD5Sum command returned unexpected result: %r" % res) 1178 1179 @stub_and_esp32_function_only 1180 def change_baud(self, baud): 1181 print("Changing baud rate to %d" % baud) 1182 # stub takes the new baud rate and the old one 1183 second_arg = self._port.baudrate if self.IS_STUB else 0 1184 self.command(self.ESP_CHANGE_BAUDRATE, struct.pack("<II", baud, second_arg)) 1185 print("Changed.") 1186 self._set_port_baudrate(baud) 1187 time.sleep(0.05) # get rid of crap sent during baud rate change 1188 self.flush_input() 1189 1190 @stub_function_only 1191 def erase_flash(self): 1192 # depending on flash chip model the erase may take this long (maybe longer!) 1193 self.check_command( 1194 "erase flash", self.ESP_ERASE_FLASH, timeout=CHIP_ERASE_TIMEOUT 1195 ) 1196 1197 @stub_function_only 1198 def erase_region(self, offset, size): 1199 if offset % self.FLASH_SECTOR_SIZE != 0: 1200 raise FatalError("Offset to erase from must be a multiple of 4096") 1201 if size % self.FLASH_SECTOR_SIZE != 0: 1202 raise FatalError("Size of data to erase must be a multiple of 4096") 1203 timeout = timeout_per_mb(ERASE_REGION_TIMEOUT_PER_MB, size) 1204 self.check_command( 1205 "erase region", 1206 self.ESP_ERASE_REGION, 1207 struct.pack("<II", offset, size), 1208 timeout=timeout, 1209 ) 1210 1211 def read_flash_slow(self, offset, length, progress_fn): 1212 raise NotImplementedInROMError(self, self.read_flash_slow) 1213 1214 def read_flash(self, offset, length, progress_fn=None): 1215 if not self.IS_STUB: 1216 return self.read_flash_slow(offset, length, progress_fn) # ROM-only routine 1217 1218 # issue a standard bootloader command to trigger the read 1219 self.check_command( 1220 "read flash", 1221 self.ESP_READ_FLASH, 1222 struct.pack("<IIII", offset, length, self.FLASH_SECTOR_SIZE, 64), 1223 ) 1224 # now we expect (length // block_size) SLIP frames with the data 1225 data = b"" 1226 while len(data) < length: 1227 p = self.read() 1228 data += p 1229 if len(data) < length and len(p) < self.FLASH_SECTOR_SIZE: 1230 raise FatalError( 1231 "Corrupt data, expected 0x%x bytes but received 0x%x bytes" 1232 % (self.FLASH_SECTOR_SIZE, len(p)) 1233 ) 1234 self.write(struct.pack("<I", len(data))) 1235 if progress_fn and (len(data) % 1024 == 0 or len(data) == length): 1236 progress_fn(len(data), length) 1237 if progress_fn: 1238 progress_fn(len(data), length) 1239 if len(data) > length: 1240 raise FatalError("Read more than expected") 1241 1242 digest_frame = self.read() 1243 if len(digest_frame) != 16: 1244 raise FatalError("Expected digest, got: %s" % hexify(digest_frame)) 1245 expected_digest = hexify(digest_frame).upper() 1246 digest = hashlib.md5(data).hexdigest().upper() 1247 if digest != expected_digest: 1248 raise FatalError( 1249 "Digest mismatch: expected %s, got %s" % (expected_digest, digest) 1250 ) 1251 return data 1252 1253 def flash_spi_attach(self, hspi_arg): 1254 """Send SPI attach command to enable the SPI flash pins 1255 1256 ESP8266 ROM does this when you send flash_begin, ESP32 ROM 1257 has it as a SPI command. 1258 """ 1259 # last 3 bytes in ESP_SPI_ATTACH argument are reserved values 1260 arg = struct.pack("<I", hspi_arg) 1261 if not self.IS_STUB: 1262 # ESP32 ROM loader takes additional 'is legacy' arg, which is not 1263 # currently supported in the stub loader or esptool.py 1264 # (as it's not usually needed.) 1265 is_legacy = 0 1266 arg += struct.pack("BBBB", is_legacy, 0, 0, 0) 1267 self.check_command("configure SPI flash pins", self.ESP_SPI_ATTACH, arg) 1268 1269 def flash_set_parameters(self, size): 1270 """Tell the ESP bootloader the parameters of the chip 1271 1272 Corresponds to the "flashchip" data structure that the ROM 1273 has in RAM. 1274 1275 'size' is in bytes. 1276 1277 All other flash parameters are currently hardcoded (on ESP8266 1278 these are mostly ignored by ROM code, on ESP32 I'm not sure.) 1279 """ 1280 fl_id = 0 1281 total_size = size 1282 block_size = 64 * 1024 1283 sector_size = 4 * 1024 1284 page_size = 256 1285 status_mask = 0xFFFF 1286 self.check_command( 1287 "set SPI params", 1288 self.ESP_SPI_SET_PARAMS, 1289 struct.pack( 1290 "<IIIIII", 1291 fl_id, 1292 total_size, 1293 block_size, 1294 sector_size, 1295 page_size, 1296 status_mask, 1297 ), 1298 ) 1299 1300 def run_spiflash_command( 1301 self, 1302 spiflash_command, 1303 data=b"", 1304 read_bits=0, 1305 addr=None, 1306 addr_len=0, 1307 dummy_len=0, 1308 ): 1309 """Run an arbitrary SPI flash command. 1310 1311 This function uses the "USR_COMMAND" functionality in the ESP 1312 SPI hardware, rather than the precanned commands supported by 1313 hardware. So the value of spiflash_command is an actual command 1314 byte, sent over the wire. 1315 1316 After writing command byte, writes 'data' to MOSI and then 1317 reads back 'read_bits' of reply on MISO. Result is a number. 1318 """ 1319 1320 # SPI_USR register flags 1321 SPI_USR_COMMAND = 1 << 31 1322 SPI_USR_ADDR = 1 << 30 1323 SPI_USR_DUMMY = 1 << 29 1324 SPI_USR_MISO = 1 << 28 1325 SPI_USR_MOSI = 1 << 27 1326 1327 # SPI registers, base address differs ESP32* vs 8266 1328 base = self.SPI_REG_BASE 1329 SPI_CMD_REG = base + 0x00 1330 SPI_ADDR_REG = base + 0x04 1331 SPI_USR_REG = base + self.SPI_USR_OFFS 1332 SPI_USR1_REG = base + self.SPI_USR1_OFFS 1333 SPI_USR2_REG = base + self.SPI_USR2_OFFS 1334 SPI_W0_REG = base + self.SPI_W0_OFFS 1335 1336 # following two registers are ESP32 and later chips only 1337 if self.SPI_MOSI_DLEN_OFFS is not None: 1338 # ESP32 and later chips have a more sophisticated way 1339 # to set up "user" commands 1340 def set_data_lengths(mosi_bits, miso_bits): 1341 SPI_MOSI_DLEN_REG = base + self.SPI_MOSI_DLEN_OFFS 1342 SPI_MISO_DLEN_REG = base + self.SPI_MISO_DLEN_OFFS 1343 if mosi_bits > 0: 1344 self.write_reg(SPI_MOSI_DLEN_REG, mosi_bits - 1) 1345 if miso_bits > 0: 1346 self.write_reg(SPI_MISO_DLEN_REG, miso_bits - 1) 1347 flags = 0 1348 if dummy_len > 0: 1349 flags |= dummy_len - 1 1350 if addr_len > 0: 1351 flags |= (addr_len - 1) << SPI_USR_ADDR_LEN_SHIFT 1352 if flags: 1353 self.write_reg(SPI_USR1_REG, flags) 1354 1355 else: 1356 1357 def set_data_lengths(mosi_bits, miso_bits): 1358 SPI_DATA_LEN_REG = SPI_USR1_REG 1359 SPI_MOSI_BITLEN_S = 17 1360 SPI_MISO_BITLEN_S = 8 1361 mosi_mask = 0 if (mosi_bits == 0) else (mosi_bits - 1) 1362 miso_mask = 0 if (miso_bits == 0) else (miso_bits - 1) 1363 flags = (miso_mask << SPI_MISO_BITLEN_S) | ( 1364 mosi_mask << SPI_MOSI_BITLEN_S 1365 ) 1366 if dummy_len > 0: 1367 flags |= dummy_len - 1 1368 if addr_len > 0: 1369 flags |= (addr_len - 1) << SPI_USR_ADDR_LEN_SHIFT 1370 self.write_reg(SPI_DATA_LEN_REG, flags) 1371 1372 # SPI peripheral "command" bitmasks for SPI_CMD_REG 1373 SPI_CMD_USR = 1 << 18 1374 1375 # shift values 1376 SPI_USR2_COMMAND_LEN_SHIFT = 28 1377 SPI_USR_ADDR_LEN_SHIFT = 26 1378 1379 if read_bits > 32: 1380 raise FatalError( 1381 "Reading more than 32 bits back from a SPI flash " 1382 "operation is unsupported" 1383 ) 1384 if len(data) > 64: 1385 raise FatalError( 1386 "Writing more than 64 bytes of data with one SPI " 1387 "command is unsupported" 1388 ) 1389 1390 data_bits = len(data) * 8 1391 old_spi_usr = self.read_reg(SPI_USR_REG) 1392 old_spi_usr2 = self.read_reg(SPI_USR2_REG) 1393 flags = SPI_USR_COMMAND 1394 if read_bits > 0: 1395 flags |= SPI_USR_MISO 1396 if data_bits > 0: 1397 flags |= SPI_USR_MOSI 1398 if addr_len > 0: 1399 flags |= SPI_USR_ADDR 1400 if dummy_len > 0: 1401 flags |= SPI_USR_DUMMY 1402 set_data_lengths(data_bits, read_bits) 1403 self.write_reg(SPI_USR_REG, flags) 1404 self.write_reg( 1405 SPI_USR2_REG, (7 << SPI_USR2_COMMAND_LEN_SHIFT) | spiflash_command 1406 ) 1407 if addr_len > 0: 1408 if self.SPI_ADDR_REG_MSB: 1409 addr = addr << (32 - addr_len) 1410 self.write_reg(SPI_ADDR_REG, addr) 1411 if data_bits == 0: 1412 self.write_reg(SPI_W0_REG, 0) # clear data register before we read it 1413 else: 1414 data = pad_to(data, 4, b"\00") # pad to 32-bit multiple 1415 words = struct.unpack("I" * (len(data) // 4), data) 1416 next_reg = SPI_W0_REG 1417 for word in words: 1418 self.write_reg(next_reg, word) 1419 next_reg += 4 1420 self.write_reg(SPI_CMD_REG, SPI_CMD_USR) 1421 1422 def wait_done(): 1423 for _ in range(10): 1424 if (self.read_reg(SPI_CMD_REG) & SPI_CMD_USR) == 0: 1425 return 1426 raise FatalError("SPI command did not complete in time") 1427 1428 wait_done() 1429 1430 status = self.read_reg(SPI_W0_REG) 1431 # restore some SPI controller registers 1432 self.write_reg(SPI_USR_REG, old_spi_usr) 1433 self.write_reg(SPI_USR2_REG, old_spi_usr2) 1434 return status 1435 1436 def read_spiflash_sfdp(self, addr, read_bits): 1437 CMD_RDSFDP = 0x5A 1438 return self.run_spiflash_command( 1439 CMD_RDSFDP, read_bits=read_bits, addr=addr, addr_len=24, dummy_len=8 1440 ) 1441 1442 def read_status(self, num_bytes=2): 1443 """Read up to 24 bits (num_bytes) of SPI flash status register contents 1444 via RDSR, RDSR2, RDSR3 commands 1445 1446 Not all SPI flash supports all three commands. The upper 1 or 2 1447 bytes may be 0xFF. 1448 """ 1449 SPIFLASH_RDSR = 0x05 1450 SPIFLASH_RDSR2 = 0x35 1451 SPIFLASH_RDSR3 = 0x15 1452 1453 status = 0 1454 shift = 0 1455 for cmd in [SPIFLASH_RDSR, SPIFLASH_RDSR2, SPIFLASH_RDSR3][0:num_bytes]: 1456 status += self.run_spiflash_command(cmd, read_bits=8) << shift 1457 shift += 8 1458 return status 1459 1460 def write_status(self, new_status, num_bytes=2, set_non_volatile=False): 1461 """Write up to 24 bits (num_bytes) of new status register 1462 1463 num_bytes can be 1, 2 or 3. 1464 1465 Not all flash supports the additional commands to write the 1466 second and third byte of the status register. When writing 2 1467 bytes, esptool also sends a 16-byte WRSR command (as some 1468 flash types use this instead of WRSR2.) 1469 1470 If the set_non_volatile flag is set, non-volatile bits will 1471 be set as well as volatile ones (WREN used instead of WEVSR). 1472 1473 """ 1474 SPIFLASH_WRSR = 0x01 1475 SPIFLASH_WRSR2 = 0x31 1476 SPIFLASH_WRSR3 = 0x11 1477 SPIFLASH_WEVSR = 0x50 1478 SPIFLASH_WREN = 0x06 1479 SPIFLASH_WRDI = 0x04 1480 1481 enable_cmd = SPIFLASH_WREN if set_non_volatile else SPIFLASH_WEVSR 1482 1483 # try using a 16-bit WRSR (not supported by all chips) 1484 # this may be redundant, but shouldn't hurt 1485 if num_bytes == 2: 1486 self.run_spiflash_command(enable_cmd) 1487 self.run_spiflash_command(SPIFLASH_WRSR, struct.pack("<H", new_status)) 1488 1489 # also try using individual commands 1490 # (also not supported by all chips for num_bytes 2 & 3) 1491 for cmd in [SPIFLASH_WRSR, SPIFLASH_WRSR2, SPIFLASH_WRSR3][0:num_bytes]: 1492 self.run_spiflash_command(enable_cmd) 1493 self.run_spiflash_command(cmd, struct.pack("B", new_status & 0xFF)) 1494 new_status >>= 8 1495 1496 self.run_spiflash_command(SPIFLASH_WRDI) 1497 1498 def get_crystal_freq(self): 1499 """ 1500 Figure out the crystal frequency from the UART clock divider 1501 1502 Returns a normalized value in integer MHz (only values 40 or 26 are supported) 1503 """ 1504 # The logic here is: 1505 # - We know that our baud rate and the ESP UART baud rate are roughly the same, 1506 # or we couldn't communicate 1507 # - We can read the UART clock divider register to know how the ESP derives this 1508 # from the APB bus frequency 1509 # - Multiplying these two together gives us the bus frequency which is either 1510 # the crystal frequency (ESP32) or double the crystal frequency (ESP8266). 1511 # See the self.XTAL_CLK_DIVIDER parameter for this factor. 1512 uart_div = self.read_reg(self.UART_CLKDIV_REG) & self.UART_CLKDIV_MASK 1513 est_xtal = (self._port.baudrate * uart_div) / 1e6 / self.XTAL_CLK_DIVIDER 1514 if est_xtal > 45: 1515 norm_xtal = 48 1516 elif est_xtal > 33: 1517 norm_xtal = 40 1518 else: 1519 norm_xtal = 26 1520 if abs(norm_xtal - est_xtal) > 1: 1521 print( 1522 "WARNING: Detected crystal freq %.2fMHz is quite different to " 1523 "normalized freq %dMHz. Unsupported crystal in use?" 1524 % (est_xtal, norm_xtal) 1525 ) 1526 return norm_xtal 1527 1528 def hard_reset(self): 1529 print("Hard resetting via RTS pin...") 1530 HardReset(self._port)() 1531 1532 def soft_reset(self, stay_in_bootloader): 1533 if not self.IS_STUB: 1534 if stay_in_bootloader: 1535 return # ROM bootloader is already in bootloader! 1536 else: 1537 # 'run user code' is as close to a soft reset as we can do 1538 self.flash_begin(0, 0) 1539 self.flash_finish(False) 1540 else: 1541 if stay_in_bootloader: 1542 # soft resetting from the stub loader 1543 # will re-load the ROM bootloader 1544 self.flash_begin(0, 0) 1545 self.flash_finish(True) 1546 elif self.CHIP_NAME != "ESP8266": 1547 raise FatalError( 1548 "Soft resetting is currently only supported on ESP8266" 1549 ) 1550 else: 1551 # running user code from stub loader requires some hacks 1552 # in the stub loader 1553 self.command(self.ESP_RUN_USER_CODE, wait_response=False) 1554 1555 def check_chip_id(self): 1556 try: 1557 chip_id = self.get_chip_id() 1558 if chip_id != self.IMAGE_CHIP_ID: 1559 print( 1560 "WARNING: Chip ID {} ({}) doesn't match expected Chip ID {}. " 1561 "esptool may not work correctly.".format( 1562 chip_id, 1563 self.UNSUPPORTED_CHIPS.get(chip_id, "Unknown"), 1564 self.IMAGE_CHIP_ID, 1565 ) 1566 ) 1567 # Try to flash anyways by disabling stub 1568 self.stub_is_disabled = True 1569 except NotImplementedInROMError: 1570 pass 1571 1572 1573def slip_reader(port, trace_function): 1574 """Generator to read SLIP packets from a serial port. 1575 Yields one full SLIP packet at a time, raises exception on timeout or invalid data. 1576 1577 Designed to avoid too many calls to serial.read(1), which can bog 1578 down on slow systems. 1579 """ 1580 1581 def detect_panic_handler(input): 1582 """ 1583 Checks the input bytes for panic handler messages. 1584 Raises a FatalError if Guru Meditation or Fatal Exception is found, as both 1585 of these are used between different ROM versions. 1586 Tries to also parse the error cause (e.g. IllegalInstruction). 1587 """ 1588 1589 guru_meditation = ( 1590 rb"G?uru Meditation Error: (?:Core \d panic'ed \(([a-zA-Z ]*)\))?" 1591 ) 1592 fatal_exception = rb"F?atal exception \(\d+\): (?:([a-zA-Z ]*)?.*epc)?" 1593 1594 # Search either for Guru Meditation or Fatal Exception 1595 data = re.search( 1596 rb"".join([rb"(?:", guru_meditation, rb"|", fatal_exception, rb")"]), 1597 input, 1598 re.DOTALL, 1599 ) 1600 if data is not None: 1601 cause = [ 1602 "({})".format(i.decode("utf-8")) 1603 for i in [data.group(1), data.group(2)] 1604 if i is not None 1605 ] 1606 cause = f" {cause[0]}" if len(cause) else "" 1607 msg = f"Guru Meditation Error detected{cause}" 1608 raise FatalError(msg) 1609 1610 partial_packet = None 1611 in_escape = False 1612 successful_slip = False 1613 while True: 1614 waiting = port.inWaiting() 1615 read_bytes = port.read(1 if waiting == 0 else waiting) 1616 if read_bytes == b"": 1617 if partial_packet is None: # fail due to no data 1618 msg = ( 1619 "Serial data stream stopped: Possible serial noise or corruption." 1620 if successful_slip 1621 else "No serial data received." 1622 ) 1623 else: # fail during packet transfer 1624 msg = "Packet content transfer stopped (received {} bytes)".format( 1625 len(partial_packet) 1626 ) 1627 trace_function(msg) 1628 raise FatalError(msg) 1629 trace_function("Read %d bytes: %s", len(read_bytes), HexFormatter(read_bytes)) 1630 for b in read_bytes: 1631 b = bytes([b]) 1632 if partial_packet is None: # waiting for packet header 1633 if b == b"\xc0": 1634 partial_packet = b"" 1635 else: 1636 trace_function("Read invalid data: %s", HexFormatter(read_bytes)) 1637 remaining_data = port.read(port.inWaiting()) 1638 trace_function( 1639 "Remaining data in serial buffer: %s", 1640 HexFormatter(remaining_data), 1641 ) 1642 detect_panic_handler(read_bytes + remaining_data) 1643 raise FatalError( 1644 "Invalid head of packet (0x%s): " 1645 "Possible serial noise or corruption." % hexify(b) 1646 ) 1647 elif in_escape: # part-way through escape sequence 1648 in_escape = False 1649 if b == b"\xdc": 1650 partial_packet += b"\xc0" 1651 elif b == b"\xdd": 1652 partial_packet += b"\xdb" 1653 else: 1654 trace_function("Read invalid data: %s", HexFormatter(read_bytes)) 1655 remaining_data = port.read(port.inWaiting()) 1656 trace_function( 1657 "Remaining data in serial buffer: %s", 1658 HexFormatter(remaining_data), 1659 ) 1660 detect_panic_handler(read_bytes + remaining_data) 1661 raise FatalError("Invalid SLIP escape (0xdb, 0x%s)" % (hexify(b))) 1662 elif b == b"\xdb": # start of escape sequence 1663 in_escape = True 1664 elif b == b"\xc0": # end of packet 1665 trace_function("Received full packet: %s", HexFormatter(partial_packet)) 1666 yield partial_packet 1667 partial_packet = None 1668 successful_slip = True 1669 else: # normal byte in packet 1670 partial_packet += b 1671 1672 1673class HexFormatter(object): 1674 """ 1675 Wrapper class which takes binary data in its constructor 1676 and returns a hex string as it's __str__ method. 1677 1678 This is intended for "lazy formatting" of trace() output 1679 in hex format. Avoids overhead (significant on slow computers) 1680 of generating long hex strings even if tracing is disabled. 1681 1682 Note that this doesn't save any overhead if passed as an 1683 argument to "%", only when passed to trace() 1684 1685 If auto_split is set (default), any long line (> 16 bytes) will be 1686 printed as separately indented lines, with ASCII decoding at the end 1687 of each line. 1688 """ 1689 1690 def __init__(self, binary_string, auto_split=True): 1691 self._s = binary_string 1692 self._auto_split = auto_split 1693 1694 def __str__(self): 1695 if self._auto_split and len(self._s) > 16: 1696 result = "" 1697 s = self._s 1698 while len(s) > 0: 1699 line = s[:16] 1700 ascii_line = "".join( 1701 ( 1702 c 1703 if ( 1704 c == " " 1705 or (c in string.printable and c not in string.whitespace) 1706 ) 1707 else "." 1708 ) 1709 for c in line.decode("ascii", "replace") 1710 ) 1711 s = s[16:] 1712 result += "\n %-16s %-16s | %s" % ( 1713 hexify(line[:8], False), 1714 hexify(line[8:], False), 1715 ascii_line, 1716 ) 1717 return result 1718 else: 1719 return hexify(self._s, False) 1720