1# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http:#www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15""" DUT for IDF applications """ 16import collections 17import functools 18import io 19import os 20import os.path 21import re 22import subprocess 23import sys 24import tempfile 25import time 26 27import pexpect 28import serial 29 30# python2 and python3 queue package name is different 31try: 32 import Queue as _queue 33except ImportError: 34 import queue as _queue # type: ignore 35 36from serial.tools import list_ports 37from tiny_test_fw import DUT, Utility 38 39try: 40 import esptool 41except ImportError: # cheat and use IDF's copy of esptool if available 42 idf_path = os.getenv('IDF_PATH') 43 if not idf_path or not os.path.exists(idf_path): 44 raise 45 sys.path.insert(0, os.path.join(idf_path, 'components', 'esptool_py', 'esptool')) 46 import esptool 47 48import espefuse 49import espsecure 50 51 52class IDFToolError(OSError): 53 pass 54 55 56class IDFDUTException(RuntimeError): 57 pass 58 59 60class IDFRecvThread(DUT.RecvThread): 61 62 PERFORMANCE_PATTERN = re.compile(r'\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n') 63 EXCEPTION_PATTERNS = [ 64 re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))"), 65 re.compile(r'(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)'), 66 re.compile(r'(rst 0x\d+ \(TG\dWDT_SYS_RESET|TGWDT_CPU_RESET\))') 67 ] 68 BACKTRACE_PATTERN = re.compile(r'Backtrace:((\s(0x[0-9a-f]{8}):0x[0-9a-f]{8})+)') 69 BACKTRACE_ADDRESS_PATTERN = re.compile(r'(0x[0-9a-f]{8}):0x[0-9a-f]{8}') 70 71 def __init__(self, read, dut): 72 super(IDFRecvThread, self).__init__(read, dut) 73 self.exceptions = _queue.Queue() 74 self.performance_items = _queue.Queue() 75 76 def collect_performance(self, comp_data): 77 matches = self.PERFORMANCE_PATTERN.findall(comp_data) 78 for match in matches: 79 Utility.console_log('[Performance][{}]: {}'.format(match[0], match[1]), color='orange') 80 self.performance_items.put((match[0], match[1])) 81 82 def detect_exception(self, comp_data): 83 for pattern in self.EXCEPTION_PATTERNS: 84 start = 0 85 while True: 86 match = pattern.search(comp_data, pos=start) 87 if match: 88 start = match.end() 89 self.exceptions.put(match.group(0)) 90 Utility.console_log('[Exception]: {}'.format(match.group(0)), color='red') 91 else: 92 break 93 94 def detect_backtrace(self, comp_data): 95 start = 0 96 while True: 97 match = self.BACKTRACE_PATTERN.search(comp_data, pos=start) 98 if match: 99 start = match.end() 100 Utility.console_log('[Backtrace]:{}'.format(match.group(1)), color='red') 101 # translate backtrace 102 addresses = self.BACKTRACE_ADDRESS_PATTERN.findall(match.group(1)) 103 translated_backtrace = '' 104 for addr in addresses: 105 ret = self.dut.lookup_pc_address(addr) 106 if ret: 107 translated_backtrace += ret + '\n' 108 if translated_backtrace: 109 Utility.console_log('Translated backtrace\n:' + translated_backtrace, color='yellow') 110 else: 111 Utility.console_log('Failed to translate backtrace', color='yellow') 112 else: 113 break 114 115 CHECK_FUNCTIONS = [collect_performance, detect_exception, detect_backtrace] 116 117 118def _uses_esptool(func): 119 """ Suspend listener thread, connect with esptool, 120 call target function with esptool instance, 121 then resume listening for output 122 """ 123 @functools.wraps(func) 124 def handler(self, *args, **kwargs): 125 self.stop_receive() 126 127 settings = self.port_inst.get_settings() 128 129 try: 130 if not self.rom_inst: 131 if not self.secure_boot_en: 132 self.rom_inst = esptool.ESPLoader.detect_chip(self.port_inst) 133 else: 134 self.rom_inst = self.get_rom()(self.port_inst) 135 self.rom_inst.connect('hard_reset') 136 137 if (self.secure_boot_en): 138 esp = self.rom_inst 139 esp.flash_spi_attach(0) 140 else: 141 esp = self.rom_inst.run_stub() 142 143 ret = func(self, esp, *args, **kwargs) 144 # do hard reset after use esptool 145 esp.hard_reset() 146 finally: 147 # always need to restore port settings 148 self.port_inst.apply_settings(settings) 149 150 self.start_receive() 151 152 return ret 153 return handler 154 155 156class IDFDUT(DUT.SerialDUT): 157 """ IDF DUT, extends serial with esptool methods 158 159 (Becomes aware of IDFApp instance which holds app-specific data) 160 """ 161 162 # /dev/ttyAMA0 port is listed in Raspberry Pi 163 # /dev/tty.Bluetooth-Incoming-Port port is listed in Mac 164 INVALID_PORT_PATTERN = re.compile(r'AMA|Bluetooth') 165 # if need to erase NVS partition in start app 166 ERASE_NVS = True 167 RECV_THREAD_CLS = IDFRecvThread 168 169 def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs): 170 super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs) 171 self.allow_dut_exception = allow_dut_exception 172 self.exceptions = _queue.Queue() 173 self.performance_items = _queue.Queue() 174 self.rom_inst = None 175 self.secure_boot_en = self.app.get_sdkconfig_config_value('CONFIG_SECURE_BOOT') and \ 176 not self.app.get_sdkconfig_config_value('CONFIG_EFUSE_VIRTUAL') 177 178 @classmethod 179 def get_rom(cls): 180 raise NotImplementedError('This is an abstraction class, method not defined.') 181 182 @classmethod 183 def get_mac(cls, app, port): 184 """ 185 get MAC address via esptool 186 187 :param app: application instance (to get tool) 188 :param port: serial port as string 189 :return: MAC address or None 190 """ 191 esp = None 192 try: 193 esp = cls.get_rom()(port) 194 esp.connect() 195 return esp.read_mac() 196 except RuntimeError: 197 return None 198 finally: 199 if esp: 200 # do hard reset after use esptool 201 esp.hard_reset() 202 esp._port.close() 203 204 @classmethod 205 def confirm_dut(cls, port, **kwargs): 206 inst = None 207 try: 208 expected_rom_class = cls.get_rom() 209 except NotImplementedError: 210 expected_rom_class = None 211 212 try: 213 # TODO: check whether 8266 works with this logic 214 # Otherwise overwrite it in ESP8266DUT 215 inst = esptool.ESPLoader.detect_chip(port) 216 if expected_rom_class and type(inst) != expected_rom_class: 217 raise RuntimeError('Target not expected') 218 return inst.read_mac() is not None, get_target_by_rom_class(type(inst)) 219 except(esptool.FatalError, RuntimeError): 220 return False, None 221 finally: 222 if inst is not None: 223 inst._port.close() 224 225 def _try_flash(self, erase_nvs): 226 """ 227 Called by start_app() 228 229 :return: None 230 """ 231 flash_files = [] 232 encrypt_files = [] 233 try: 234 # Open the files here to prevents us from having to seek back to 0 235 # each time. Before opening them, we have to organize the lists the 236 # way esptool.write_flash needs: 237 # If encrypt is provided, flash_files contains all the files to 238 # flash. 239 # Else, flash_files contains the files to be flashed as plain text 240 # and encrypt_files contains the ones to flash encrypted. 241 flash_files = self.app.flash_files 242 encrypt_files = self.app.encrypt_files 243 encrypt = self.app.flash_settings.get('encrypt', False) 244 if encrypt: 245 flash_files = encrypt_files 246 encrypt_files = [] 247 else: 248 flash_files = [entry 249 for entry in flash_files 250 if entry not in encrypt_files] 251 252 flash_files = [(offs, open(path, 'rb')) for (offs, path) in flash_files] 253 encrypt_files = [(offs, open(path, 'rb')) for (offs, path) in encrypt_files] 254 255 if erase_nvs: 256 address = self.app.partition_table['nvs']['offset'] 257 size = self.app.partition_table['nvs']['size'] 258 nvs_file = tempfile.TemporaryFile() 259 nvs_file.write(b'\xff' * size) 260 nvs_file.seek(0) 261 if not isinstance(address, int): 262 address = int(address, 0) 263 # We have to check whether this file needs to be added to 264 # flash_files list or encrypt_files. 265 # Get the CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT macro 266 # value. If it is set to True, then NVS is always encrypted. 267 sdkconfig_dict = self.app.get_sdkconfig() 268 macro_encryption = 'CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT' in sdkconfig_dict 269 # If the macro is not enabled (plain text flash) or all files 270 # must be encrypted, add NVS to flash_files. 271 if not macro_encryption or encrypt: 272 flash_files.append((address, nvs_file)) 273 else: 274 encrypt_files.append((address, nvs_file)) 275 276 self.write_flash_data(flash_files, encrypt_files, False, encrypt) 277 finally: 278 for (_, f) in flash_files: 279 f.close() 280 for (_, f) in encrypt_files: 281 f.close() 282 283 @_uses_esptool 284 def write_flash_data(self, esp, flash_files=None, encrypt_files=None, ignore_flash_encryption_efuse_setting=True, encrypt=False): 285 """ 286 Try flashing at a particular baud rate. 287 288 Structured this way so @_uses_esptool will reconnect each time 289 :return: None 290 """ 291 last_error = None 292 for baud_rate in [921600, 115200]: 293 try: 294 # fake flasher args object, this is a hack until 295 # esptool Python API is improved 296 class FlashArgs(object): 297 def __init__(self, attributes): 298 for key, value in attributes.items(): 299 self.__setattr__(key, value) 300 301 # write_flash expects the parameter encrypt_files to be None and not 302 # an empty list, so perform the check here 303 flash_args = FlashArgs({ 304 'flash_size': self.app.flash_settings['flash_size'], 305 'flash_mode': self.app.flash_settings['flash_mode'], 306 'flash_freq': self.app.flash_settings['flash_freq'], 307 'addr_filename': flash_files or None, 308 'encrypt_files': encrypt_files or None, 309 'no_stub': self.secure_boot_en, 310 'compress': not self.secure_boot_en, 311 'verify': False, 312 'encrypt': encrypt, 313 'ignore_flash_encryption_efuse_setting': ignore_flash_encryption_efuse_setting, 314 'erase_all': False, 315 'after': 'no_reset', 316 }) 317 318 esp.change_baud(baud_rate) 319 esptool.detect_flash_size(esp, flash_args) 320 esptool.write_flash(esp, flash_args) 321 break 322 except RuntimeError as e: 323 last_error = e 324 else: 325 raise last_error 326 327 def image_info(self, path_to_file): 328 """ 329 get hash256 of app 330 331 :param: path: path to file 332 :return: sha256 appended to app 333 """ 334 335 old_stdout = sys.stdout 336 new_stdout = io.StringIO() 337 sys.stdout = new_stdout 338 339 class Args(object): 340 def __init__(self, attributes): 341 for key, value in attributes.items(): 342 self.__setattr__(key, value) 343 344 args = Args({ 345 'chip': self.TARGET, 346 'filename': path_to_file, 347 }) 348 esptool.image_info(args) 349 output = new_stdout.getvalue() 350 sys.stdout = old_stdout 351 return output 352 353 def start_app(self, erase_nvs=ERASE_NVS): 354 """ 355 download and start app. 356 357 :param: erase_nvs: whether erase NVS partition during flash 358 :return: None 359 """ 360 self._try_flash(erase_nvs) 361 362 def start_app_no_enc(self): 363 """ 364 download and start app. 365 366 :param: erase_nvs: whether erase NVS partition during flash 367 :return: None 368 """ 369 flash_files = self.app.flash_files + self.app.encrypt_files 370 self.write_flash(flash_files) 371 372 def write_flash(self, flash_files=None, encrypt_files=None, ignore_flash_encryption_efuse_setting=True, encrypt=False): 373 """ 374 Flash files 375 376 :return: None 377 """ 378 flash_offs_files = [] 379 encrypt_offs_files = [] 380 try: 381 if flash_files: 382 flash_offs_files = [(offs, open(path, 'rb')) for (offs, path) in flash_files] 383 384 if encrypt_files: 385 encrypt_offs_files = [(offs, open(path, 'rb')) for (offs, path) in encrypt_files] 386 387 self.write_flash_data(flash_offs_files, encrypt_offs_files, ignore_flash_encryption_efuse_setting, encrypt) 388 finally: 389 for (_, f) in flash_offs_files: 390 f.close() 391 for (_, f) in encrypt_offs_files: 392 f.close() 393 394 def bootloader_flash(self): 395 """ 396 download bootloader. 397 398 :return: None 399 """ 400 bootloader_path = os.path.join(self.app.binary_path, 'bootloader', 'bootloader.bin') 401 offs = int(self.app.get_sdkconfig()['CONFIG_BOOTLOADER_OFFSET_IN_FLASH'], 0) 402 flash_files = [(offs, bootloader_path)] 403 self.write_flash(flash_files) 404 405 @_uses_esptool 406 def reset(self, esp): 407 """ 408 hard reset DUT 409 410 :return: None 411 """ 412 # decorator `_use_esptool` will do reset 413 # so we don't need to do anything in this method 414 pass 415 416 @_uses_esptool 417 def erase_partition(self, esp, partition): 418 """ 419 :param partition: partition name to erase 420 :return: None 421 """ 422 address = self.app.partition_table[partition]['offset'] 423 size = self.app.partition_table[partition]['size'] 424 esp.erase_region(address, size) 425 426 @_uses_esptool 427 def erase_flash(self, esp): 428 """ 429 erase the flash completely 430 431 :return: None 432 """ 433 esp.erase_flash() 434 435 @_uses_esptool 436 def dump_flash(self, esp, output_file, **kwargs): 437 """ 438 dump flash 439 440 :param output_file: output file name, if relative path, will use sdk path as base path. 441 :keyword partition: partition name, dump the partition. 442 ``partition`` is preferred than using ``address`` and ``size``. 443 :keyword address: dump from address (need to be used with size) 444 :keyword size: dump size (need to be used with address) 445 :return: None 446 """ 447 if os.path.isabs(output_file) is False: 448 output_file = os.path.relpath(output_file, self.app.get_log_folder()) 449 if 'partition' in kwargs: 450 partition = self.app.partition_table[kwargs['partition']] 451 _address = partition['offset'] 452 _size = partition['size'] 453 elif 'address' in kwargs and 'size' in kwargs: 454 _address = kwargs['address'] 455 _size = kwargs['size'] 456 else: 457 raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash") 458 459 content = esp.read_flash(_address, _size) 460 with open(output_file, 'wb') as f: 461 f.write(content) 462 463 @staticmethod 464 def _sort_usb_ports(ports): 465 """ 466 Move the usb ports to the very beginning 467 :param ports: list of ports 468 :return: list of ports with usb ports at beginning 469 """ 470 usb_ports = [] 471 rest_ports = [] 472 for port in ports: 473 if 'usb' in port.lower(): 474 usb_ports.append(port) 475 else: 476 rest_ports.append(port) 477 return usb_ports + rest_ports 478 479 @classmethod 480 def list_available_ports(cls): 481 # It will return other kinds of ports as well, such as ttyS* ports. 482 # Give the usb ports higher priority 483 ports = cls._sort_usb_ports([x.device for x in list_ports.comports()]) 484 espport = os.getenv('ESPPORT') 485 if not espport: 486 # It's a little hard filter out invalid port with `serial.tools.list_ports.grep()`: 487 # The check condition in `grep` is: `if r.search(port) or r.search(desc) or r.search(hwid)`. 488 # This means we need to make all 3 conditions fail, to filter out the port. 489 # So some part of the filters will not be straight forward to users. 490 # And negative regular expression (`^((?!aa|bb|cc).)*$`) is not easy to understand. 491 # Filter out invalid port by our own will be much simpler. 492 return [x for x in ports if not cls.INVALID_PORT_PATTERN.search(x)] 493 494 # On MacOs with python3.6: type of espport is already utf8 495 if isinstance(espport, type(u'')): 496 port_hint = espport 497 else: 498 port_hint = espport.decode('utf8') 499 500 # If $ESPPORT is a valid port, make it appear first in the list 501 if port_hint in ports: 502 ports.remove(port_hint) 503 return [port_hint] + ports 504 505 # On macOS, user may set ESPPORT to /dev/tty.xxx while 506 # pySerial lists only the corresponding /dev/cu.xxx port 507 if sys.platform == 'darwin' and 'tty.' in port_hint: 508 port_hint = port_hint.replace('tty.', 'cu.') 509 if port_hint in ports: 510 ports.remove(port_hint) 511 return [port_hint] + ports 512 513 return ports 514 515 def lookup_pc_address(self, pc_addr): 516 cmd = ['%saddr2line' % self.TOOLCHAIN_PREFIX, 517 '-pfiaC', '-e', self.app.elf_file, pc_addr] 518 ret = '' 519 try: 520 translation = subprocess.check_output(cmd) 521 ret = translation.decode() 522 except OSError: 523 pass 524 return ret 525 526 @staticmethod 527 def _queue_read_all(source_queue): 528 output = [] 529 while True: 530 try: 531 output.append(source_queue.get(timeout=0)) 532 except _queue.Empty: 533 break 534 return output 535 536 def _queue_copy(self, source_queue, dest_queue): 537 data = self._queue_read_all(source_queue) 538 for d in data: 539 dest_queue.put(d) 540 541 def _get_from_queue(self, queue_name): 542 self_queue = getattr(self, queue_name) 543 if self.receive_thread: 544 recv_thread_queue = getattr(self.receive_thread, queue_name) 545 self._queue_copy(recv_thread_queue, self_queue) 546 return self._queue_read_all(self_queue) 547 548 def stop_receive(self): 549 if self.receive_thread: 550 for name in ['performance_items', 'exceptions']: 551 source_queue = getattr(self.receive_thread, name) 552 dest_queue = getattr(self, name) 553 self._queue_copy(source_queue, dest_queue) 554 super(IDFDUT, self).stop_receive() 555 556 def get_exceptions(self): 557 """ Get exceptions detected by DUT receive thread. """ 558 return self._get_from_queue('exceptions') 559 560 def get_performance_items(self): 561 """ 562 DUT receive thread will automatic collect performance results with pattern ``[Performance][name]: value\n``. 563 This method is used to get all performance results. 564 565 :return: a list of performance items. 566 """ 567 return self._get_from_queue('performance_items') 568 569 def close(self): 570 super(IDFDUT, self).close() 571 if not self.allow_dut_exception and self.get_exceptions(): 572 raise IDFDUTException('DUT exception detected on {}'.format(self)) 573 574 575class ESP32DUT(IDFDUT): 576 TARGET = 'esp32' 577 TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-' 578 579 @classmethod 580 def get_rom(cls): 581 return esptool.ESP32ROM 582 583 584class ESP32S2DUT(IDFDUT): 585 TARGET = 'esp32s2' 586 TOOLCHAIN_PREFIX = 'xtensa-esp32s2-elf-' 587 588 @classmethod 589 def get_rom(cls): 590 return esptool.ESP32S2ROM 591 592 593class ESP32S3DUT(IDFDUT): 594 TARGET = 'esp32s3' 595 TOOLCHAIN_PREFIX = 'xtensa-esp32s3-elf-' 596 597 @classmethod 598 def get_rom(cls): 599 return esptool.ESP32S3ROM 600 601 def erase_partition(self, esp, partition): 602 raise NotImplementedError() 603 604 605class ESP32C3DUT(IDFDUT): 606 TARGET = 'esp32c3' 607 TOOLCHAIN_PREFIX = 'riscv32-esp-elf-' 608 609 @classmethod 610 def get_rom(cls): 611 return esptool.ESP32C3ROM 612 613 614class ESP8266DUT(IDFDUT): 615 TARGET = 'esp8266' 616 TOOLCHAIN_PREFIX = 'xtensa-lx106-elf-' 617 618 @classmethod 619 def get_rom(cls): 620 return esptool.ESP8266ROM 621 622 623def get_target_by_rom_class(cls): 624 for c in [ESP32DUT, ESP32S2DUT, ESP32S3DUT, ESP32C3DUT, ESP8266DUT, IDFQEMUDUT]: 625 if c.get_rom() == cls: 626 return c.TARGET 627 return None 628 629 630class IDFQEMUDUT(IDFDUT): 631 TARGET = None 632 TOOLCHAIN_PREFIX = None 633 ERASE_NVS = True 634 DEFAULT_EXPECT_TIMEOUT = 30 # longer timeout, since app startup takes more time in QEMU (due to slow SHA emulation) 635 QEMU_SERIAL_PORT = 3334 636 637 def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs): 638 self.flash_image = tempfile.NamedTemporaryFile('rb+', suffix='.bin', prefix='qemu_flash_img') 639 self.app = app 640 self.flash_size = 4 * 1024 * 1024 641 self._write_flash_img() 642 643 args = [ 644 'qemu-system-xtensa', 645 '-nographic', 646 '-machine', self.TARGET, 647 '-drive', 'file={},if=mtd,format=raw'.format(self.flash_image.name), 648 '-nic', 'user,model=open_eth', 649 '-serial', 'tcp::{},server,nowait'.format(self.QEMU_SERIAL_PORT), 650 '-S', 651 '-global driver=timer.esp32.timg,property=wdt_disable,value=true'] 652 # TODO(IDF-1242): generate a temporary efuse binary, pass it to QEMU 653 654 if 'QEMU_BIOS_PATH' in os.environ: 655 args += ['-L', os.environ['QEMU_BIOS_PATH']] 656 657 self.qemu = pexpect.spawn(' '.join(args), timeout=self.DEFAULT_EXPECT_TIMEOUT) 658 self.qemu.expect_exact(b'(qemu)') 659 super(IDFQEMUDUT, self).__init__(name, port, log_file, app, allow_dut_exception=allow_dut_exception, **kwargs) 660 661 def _write_flash_img(self): 662 self.flash_image.seek(0) 663 self.flash_image.write(b'\x00' * self.flash_size) 664 for offs, path in self.app.flash_files: 665 with open(path, 'rb') as flash_file: 666 contents = flash_file.read() 667 self.flash_image.seek(offs) 668 self.flash_image.write(contents) 669 self.flash_image.flush() 670 671 @classmethod 672 def get_rom(cls): 673 return esptool.ESP32ROM 674 675 @classmethod 676 def get_mac(cls, app, port): 677 # TODO(IDF-1242): get this from QEMU/efuse binary 678 return '11:22:33:44:55:66' 679 680 @classmethod 681 def confirm_dut(cls, port, **kwargs): 682 return True, cls.TARGET 683 684 def start_app(self, erase_nvs=ERASE_NVS): 685 # TODO: implement erase_nvs 686 # since the flash image is generated every time in the constructor, maybe this isn't needed... 687 self.qemu.sendline(b'cont\n') 688 self.qemu.expect_exact(b'(qemu)') 689 690 def reset(self): 691 self.qemu.sendline(b'system_reset\n') 692 self.qemu.expect_exact(b'(qemu)') 693 694 def erase_partition(self, partition): 695 raise NotImplementedError('method erase_partition not implemented') 696 697 def erase_flash(self): 698 raise NotImplementedError('method erase_flash not implemented') 699 700 def dump_flash(self, output_file, **kwargs): 701 raise NotImplementedError('method dump_flash not implemented') 702 703 @classmethod 704 def list_available_ports(cls): 705 return ['socket://localhost:{}'.format(cls.QEMU_SERIAL_PORT)] 706 707 def close(self): 708 super(IDFQEMUDUT, self).close() 709 self.qemu.sendline(b'q\n') 710 self.qemu.expect_exact(b'(qemu)') 711 for _ in range(self.DEFAULT_EXPECT_TIMEOUT): 712 if not self.qemu.isalive(): 713 break 714 time.sleep(1) 715 else: 716 self.qemu.terminate(force=True) 717 718 719class ESP32QEMUDUT(IDFQEMUDUT): 720 TARGET = 'esp32' # type: ignore 721 TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-' # type: ignore 722 723 724class IDFFPGADUT(IDFDUT): 725 TARGET = None # type: str 726 TOOLCHAIN_PREFIX = None # type: str 727 ERASE_NVS = True 728 FLASH_ENCRYPT_SCHEME = None # type: str 729 FLASH_ENCRYPT_CNT_KEY = None # type: str 730 FLASH_ENCRYPT_CNT_VAL = 0 731 FLASH_ENCRYPT_PURPOSE = None # type: str 732 SECURE_BOOT_EN_KEY = None # type: str 733 SECURE_BOOT_EN_VAL = 0 734 FLASH_SECTOR_SIZE = 4096 735 736 def __init__(self, name, port, log_file, app, allow_dut_exception=False, efuse_reset_port=None, **kwargs): 737 super(IDFFPGADUT, self).__init__(name, port, log_file, app, allow_dut_exception=allow_dut_exception, **kwargs) 738 self.esp = self.get_rom()(port) 739 self.efuses = None 740 self.efuse_operations = None 741 self.efuse_reset_port = efuse_reset_port 742 743 @classmethod 744 def get_rom(cls): 745 raise NotImplementedError('This is an abstraction class, method not defined.') 746 747 def erase_partition(self, esp, partition): 748 raise NotImplementedError() 749 750 def enable_efuses(self): 751 # We use an extra COM port to reset the efuses on FPGA. 752 # Connect DTR pin of the COM port to the efuse reset pin on daughter board 753 # Set EFUSEPORT env variable to the extra COM port 754 if not self.efuse_reset_port: 755 raise RuntimeError('EFUSEPORT not specified') 756 757 # Stop any previous serial port operation 758 self.stop_receive() 759 if self.secure_boot_en: 760 self.esp.connect() 761 self.efuses, self.efuse_operations = espefuse.get_efuses(self.esp, False, False, True) 762 763 def burn_efuse(self, field, val): 764 if not self.efuse_operations: 765 self.enable_efuses() 766 BurnEfuseArgs = collections.namedtuple('burn_efuse_args', ['name_value_pairs', 'only_burn_at_end']) 767 args = BurnEfuseArgs({field: val}, False) 768 self.efuse_operations.burn_efuse(self.esp, self.efuses, args) 769 770 def burn_efuse_key(self, key, purpose, block): 771 if not self.efuse_operations: 772 self.enable_efuses() 773 BurnKeyArgs = collections.namedtuple('burn_key_args', 774 ['keyfile', 'keypurpose', 'block', 775 'force_write_always', 'no_write_protect', 'no_read_protect', 'only_burn_at_end']) 776 args = BurnKeyArgs([key], 777 [purpose], 778 [block], 779 False, False, False, False) 780 self.efuse_operations.burn_key(self.esp, self.efuses, args) 781 782 def burn_efuse_key_digest(self, key, purpose, block): 783 if not self.efuse_operations: 784 self.enable_efuses() 785 BurnDigestArgs = collections.namedtuple('burn_key_digest_args', 786 ['keyfile', 'keypurpose', 'block', 787 'force_write_always', 'no_write_protect', 'no_read_protect', 'only_burn_at_end']) 788 args = BurnDigestArgs([open(key, 'rb')], 789 [purpose], 790 [block], 791 False, False, True, False) 792 self.efuse_operations.burn_key_digest(self.esp, self.efuses, args) 793 794 def reset_efuses(self): 795 if not self.efuse_reset_port: 796 raise RuntimeError('EFUSEPORT not specified') 797 with serial.Serial(self.efuse_reset_port) as efuseport: 798 print('Resetting efuses') 799 efuseport.dtr = 0 800 self.port_inst.setRTS(1) 801 self.port_inst.setRTS(0) 802 time.sleep(1) 803 efuseport.dtr = 1 804 self.efuse_operations = None 805 self.efuses = None 806 807 def sign_data(self, data_file, key_files, version, append_signature=0): 808 SignDataArgs = collections.namedtuple('sign_data_args', 809 ['datafile','keyfile','output', 'version', 'append_signatures']) 810 outfile = tempfile.NamedTemporaryFile() 811 args = SignDataArgs(data_file, key_files, outfile.name, str(version), append_signature) 812 espsecure.sign_data(args) 813 outfile.seek(0) 814 return outfile.read() 815 816 817class ESP32C3FPGADUT(IDFFPGADUT): 818 TARGET = 'esp32c3' 819 TOOLCHAIN_PREFIX = 'riscv32-esp-elf-' 820 FLASH_ENCRYPT_SCHEME = 'AES-XTS' 821 FLASH_ENCRYPT_CNT_KEY = 'SPI_BOOT_CRYPT_CNT' 822 FLASH_ENCRYPT_CNT_VAL = 1 823 FLASH_ENCRYPT_PURPOSE = 'XTS_AES_128_KEY' 824 SECURE_BOOT_EN_KEY = 'SECURE_BOOT_EN' 825 SECURE_BOOT_EN_VAL = 1 826 827 @classmethod 828 def get_rom(cls): 829 return esptool.ESP32C3ROM 830 831 def erase_partition(self, esp, partition): 832 raise NotImplementedError() 833 834 def flash_encrypt_burn_cnt(self): 835 self.burn_efuse(self.FLASH_ENCRYPT_CNT_KEY, self.FLASH_ENCRYPT_CNT_VAL) 836 837 def flash_encrypt_burn_key(self, key, block=0): 838 self.burn_efuse_key(key, self.FLASH_ENCRYPT_PURPOSE, 'BLOCK_KEY%d' % block) 839 840 def flash_encrypt_get_scheme(self): 841 return self.FLASH_ENCRYPT_SCHEME 842 843 def secure_boot_burn_en_bit(self): 844 self.burn_efuse(self.SECURE_BOOT_EN_KEY, self.SECURE_BOOT_EN_VAL) 845 846 def secure_boot_burn_digest(self, digest, key_index=0, block=0): 847 self.burn_efuse_key_digest(digest, 'SECURE_BOOT_DIGEST%d' % key_index, 'BLOCK_KEY%d' % block) 848 849 @classmethod 850 def confirm_dut(cls, port, **kwargs): 851 return True, cls.TARGET 852 853 854class ESP32S3FPGADUT(IDFFPGADUT): 855 TARGET = 'esp32s3' 856 TOOLCHAIN_PREFIX = 'xtensa-esp32s3-elf-' 857 FLASH_ENCRYPT_SCHEME = 'AES-XTS' 858 FLASH_ENCRYPT_CNT_KEY = 'SPI_BOOT_CRYPT_CNT' 859 FLASH_ENCRYPT_CNT_VAL = 1 860 FLASH_ENCRYPT_PURPOSE = 'XTS_AES_128_KEY' 861 SECURE_BOOT_EN_KEY = 'SECURE_BOOT_EN' 862 SECURE_BOOT_EN_VAL = 1 863 864 @classmethod 865 def get_rom(cls): 866 return esptool.ESP32S3ROM 867 868 def erase_partition(self, esp, partition): 869 raise NotImplementedError() 870 871 def flash_encrypt_burn_cnt(self): 872 self.burn_efuse(self.FLASH_ENCRYPT_CNT_KEY, self.FLASH_ENCRYPT_CNT_VAL) 873 874 def flash_encrypt_burn_key(self, key, block=0): 875 self.burn_efuse_key(key, self.FLASH_ENCRYPT_PURPOSE, 'BLOCK_KEY%d' % block) 876 877 def flash_encrypt_get_scheme(self): 878 return self.FLASH_ENCRYPT_SCHEME 879 880 def secure_boot_burn_en_bit(self): 881 self.burn_efuse(self.SECURE_BOOT_EN_KEY, self.SECURE_BOOT_EN_VAL) 882 883 def secure_boot_burn_digest(self, digest, key_index=0, block=0): 884 self.burn_efuse_key_digest(digest, 'SECURE_BOOT_DIGEST%d' % key_index, 'BLOCK_KEY%d' % block) 885 886 @classmethod 887 def confirm_dut(cls, port, **kwargs): 888 return True, cls.TARGET 889