1# Copyright 2015-2017 Espressif Systems (Shanghai) PTE LTD
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http:#www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15""" DUT for IDF applications """
16import collections
17import functools
18import io
19import os
20import os.path
21import re
22import subprocess
23import sys
24import tempfile
25import time
26
27import pexpect
28import serial
29
30# python2 and python3 queue package name is different
31try:
32    import Queue as _queue
33except ImportError:
34    import queue as _queue  # type: ignore
35
36from serial.tools import list_ports
37from tiny_test_fw import DUT, Utility
38
39try:
40    import esptool
41except ImportError:  # cheat and use IDF's copy of esptool if available
42    idf_path = os.getenv('IDF_PATH')
43    if not idf_path or not os.path.exists(idf_path):
44        raise
45    sys.path.insert(0, os.path.join(idf_path, 'components', 'esptool_py', 'esptool'))
46    import esptool
47
48import espefuse
49import espsecure
50
51
52class IDFToolError(OSError):
53    pass
54
55
56class IDFDUTException(RuntimeError):
57    pass
58
59
60class IDFRecvThread(DUT.RecvThread):
61
62    PERFORMANCE_PATTERN = re.compile(r'\[Performance]\[(\w+)]: ([^\r\n]+)\r?\n')
63    EXCEPTION_PATTERNS = [
64        re.compile(r"(Guru Meditation Error: Core\s+\d panic'ed \([\w].*?\))"),
65        re.compile(r'(abort\(\) was called at PC 0x[a-fA-F\d]{8} on core \d)'),
66        re.compile(r'(rst 0x\d+ \(TG\dWDT_SYS_RESET|TGWDT_CPU_RESET\))')
67    ]
68    BACKTRACE_PATTERN = re.compile(r'Backtrace:((\s(0x[0-9a-f]{8}):0x[0-9a-f]{8})+)')
69    BACKTRACE_ADDRESS_PATTERN = re.compile(r'(0x[0-9a-f]{8}):0x[0-9a-f]{8}')
70
71    def __init__(self, read, dut):
72        super(IDFRecvThread, self).__init__(read, dut)
73        self.exceptions = _queue.Queue()
74        self.performance_items = _queue.Queue()
75
76    def collect_performance(self, comp_data):
77        matches = self.PERFORMANCE_PATTERN.findall(comp_data)
78        for match in matches:
79            Utility.console_log('[Performance][{}]: {}'.format(match[0], match[1]), color='orange')
80            self.performance_items.put((match[0], match[1]))
81
82    def detect_exception(self, comp_data):
83        for pattern in self.EXCEPTION_PATTERNS:
84            start = 0
85            while True:
86                match = pattern.search(comp_data, pos=start)
87                if match:
88                    start = match.end()
89                    self.exceptions.put(match.group(0))
90                    Utility.console_log('[Exception]: {}'.format(match.group(0)), color='red')
91                else:
92                    break
93
94    def detect_backtrace(self, comp_data):
95        start = 0
96        while True:
97            match = self.BACKTRACE_PATTERN.search(comp_data, pos=start)
98            if match:
99                start = match.end()
100                Utility.console_log('[Backtrace]:{}'.format(match.group(1)), color='red')
101                # translate backtrace
102                addresses = self.BACKTRACE_ADDRESS_PATTERN.findall(match.group(1))
103                translated_backtrace = ''
104                for addr in addresses:
105                    ret = self.dut.lookup_pc_address(addr)
106                    if ret:
107                        translated_backtrace += ret + '\n'
108                if translated_backtrace:
109                    Utility.console_log('Translated backtrace\n:' + translated_backtrace, color='yellow')
110                else:
111                    Utility.console_log('Failed to translate backtrace', color='yellow')
112            else:
113                break
114
115    CHECK_FUNCTIONS = [collect_performance, detect_exception, detect_backtrace]
116
117
118def _uses_esptool(func):
119    """ Suspend listener thread, connect with esptool,
120    call target function with esptool instance,
121    then resume listening for output
122    """
123    @functools.wraps(func)
124    def handler(self, *args, **kwargs):
125        self.stop_receive()
126
127        settings = self.port_inst.get_settings()
128
129        try:
130            if not self.rom_inst:
131                if not self.secure_boot_en:
132                    self.rom_inst = esptool.ESPLoader.detect_chip(self.port_inst)
133                else:
134                    self.rom_inst = self.get_rom()(self.port_inst)
135            self.rom_inst.connect('hard_reset')
136
137            if (self.secure_boot_en):
138                esp = self.rom_inst
139                esp.flash_spi_attach(0)
140            else:
141                esp = self.rom_inst.run_stub()
142
143            ret = func(self, esp, *args, **kwargs)
144            # do hard reset after use esptool
145            esp.hard_reset()
146        finally:
147            # always need to restore port settings
148            self.port_inst.apply_settings(settings)
149
150        self.start_receive()
151
152        return ret
153    return handler
154
155
156class IDFDUT(DUT.SerialDUT):
157    """ IDF DUT, extends serial with esptool methods
158
159    (Becomes aware of IDFApp instance which holds app-specific data)
160    """
161
162    # /dev/ttyAMA0 port is listed in Raspberry Pi
163    # /dev/tty.Bluetooth-Incoming-Port port is listed in Mac
164    INVALID_PORT_PATTERN = re.compile(r'AMA|Bluetooth')
165    # if need to erase NVS partition in start app
166    ERASE_NVS = True
167    RECV_THREAD_CLS = IDFRecvThread
168
169    def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
170        super(IDFDUT, self).__init__(name, port, log_file, app, **kwargs)
171        self.allow_dut_exception = allow_dut_exception
172        self.exceptions = _queue.Queue()
173        self.performance_items = _queue.Queue()
174        self.rom_inst = None
175        self.secure_boot_en = self.app.get_sdkconfig_config_value('CONFIG_SECURE_BOOT') and \
176            not self.app.get_sdkconfig_config_value('CONFIG_EFUSE_VIRTUAL')
177
178    @classmethod
179    def get_rom(cls):
180        raise NotImplementedError('This is an abstraction class, method not defined.')
181
182    @classmethod
183    def get_mac(cls, app, port):
184        """
185        get MAC address via esptool
186
187        :param app: application instance (to get tool)
188        :param port: serial port as string
189        :return: MAC address or None
190        """
191        esp = None
192        try:
193            esp = cls.get_rom()(port)
194            esp.connect()
195            return esp.read_mac()
196        except RuntimeError:
197            return None
198        finally:
199            if esp:
200                # do hard reset after use esptool
201                esp.hard_reset()
202                esp._port.close()
203
204    @classmethod
205    def confirm_dut(cls, port, **kwargs):
206        inst = None
207        try:
208            expected_rom_class = cls.get_rom()
209        except NotImplementedError:
210            expected_rom_class = None
211
212        try:
213            # TODO: check whether 8266 works with this logic
214            # Otherwise overwrite it in ESP8266DUT
215            inst = esptool.ESPLoader.detect_chip(port)
216            if expected_rom_class and type(inst) != expected_rom_class:
217                raise RuntimeError('Target not expected')
218            return inst.read_mac() is not None, get_target_by_rom_class(type(inst))
219        except(esptool.FatalError, RuntimeError):
220            return False, None
221        finally:
222            if inst is not None:
223                inst._port.close()
224
225    def _try_flash(self, erase_nvs):
226        """
227        Called by start_app()
228
229        :return: None
230        """
231        flash_files = []
232        encrypt_files = []
233        try:
234            # Open the files here to prevents us from having to seek back to 0
235            # each time. Before opening them, we have to organize the lists the
236            # way esptool.write_flash needs:
237            # If encrypt is provided, flash_files contains all the files to
238            # flash.
239            # Else, flash_files contains the files to be flashed as plain text
240            # and encrypt_files contains the ones to flash encrypted.
241            flash_files = self.app.flash_files
242            encrypt_files = self.app.encrypt_files
243            encrypt = self.app.flash_settings.get('encrypt', False)
244            if encrypt:
245                flash_files = encrypt_files
246                encrypt_files = []
247            else:
248                flash_files = [entry
249                               for entry in flash_files
250                               if entry not in encrypt_files]
251
252            flash_files = [(offs, open(path, 'rb')) for (offs, path) in flash_files]
253            encrypt_files = [(offs, open(path, 'rb')) for (offs, path) in encrypt_files]
254
255            if erase_nvs:
256                address = self.app.partition_table['nvs']['offset']
257                size = self.app.partition_table['nvs']['size']
258                nvs_file = tempfile.TemporaryFile()
259                nvs_file.write(b'\xff' * size)
260                nvs_file.seek(0)
261                if not isinstance(address, int):
262                    address = int(address, 0)
263                # We have to check whether this file needs to be added to
264                # flash_files list or encrypt_files.
265                # Get the CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT macro
266                # value. If it is set to True, then NVS is always encrypted.
267                sdkconfig_dict = self.app.get_sdkconfig()
268                macro_encryption = 'CONFIG_SECURE_FLASH_ENCRYPTION_MODE_DEVELOPMENT' in sdkconfig_dict
269                # If the macro is not enabled (plain text flash) or all files
270                # must be encrypted, add NVS to flash_files.
271                if not macro_encryption or encrypt:
272                    flash_files.append((address, nvs_file))
273                else:
274                    encrypt_files.append((address, nvs_file))
275
276            self.write_flash_data(flash_files, encrypt_files, False, encrypt)
277        finally:
278            for (_, f) in flash_files:
279                f.close()
280            for (_, f) in encrypt_files:
281                f.close()
282
283    @_uses_esptool
284    def write_flash_data(self, esp, flash_files=None, encrypt_files=None, ignore_flash_encryption_efuse_setting=True, encrypt=False):
285        """
286        Try flashing at a particular baud rate.
287
288        Structured this way so @_uses_esptool will reconnect each time
289        :return: None
290        """
291        last_error = None
292        for baud_rate in [921600, 115200]:
293            try:
294                # fake flasher args object, this is a hack until
295                # esptool Python API is improved
296                class FlashArgs(object):
297                    def __init__(self, attributes):
298                        for key, value in attributes.items():
299                            self.__setattr__(key, value)
300
301                # write_flash expects the parameter encrypt_files to be None and not
302                # an empty list, so perform the check here
303                flash_args = FlashArgs({
304                    'flash_size': self.app.flash_settings['flash_size'],
305                    'flash_mode': self.app.flash_settings['flash_mode'],
306                    'flash_freq': self.app.flash_settings['flash_freq'],
307                    'addr_filename': flash_files or None,
308                    'encrypt_files': encrypt_files or None,
309                    'no_stub': self.secure_boot_en,
310                    'compress': not self.secure_boot_en,
311                    'verify': False,
312                    'encrypt': encrypt,
313                    'ignore_flash_encryption_efuse_setting': ignore_flash_encryption_efuse_setting,
314                    'erase_all': False,
315                    'after': 'no_reset',
316                })
317
318                esp.change_baud(baud_rate)
319                esptool.detect_flash_size(esp, flash_args)
320                esptool.write_flash(esp, flash_args)
321                break
322            except RuntimeError as e:
323                last_error = e
324        else:
325            raise last_error
326
327    def image_info(self, path_to_file):
328        """
329        get hash256 of app
330
331        :param: path: path to file
332        :return: sha256 appended to app
333        """
334
335        old_stdout = sys.stdout
336        new_stdout = io.StringIO()
337        sys.stdout = new_stdout
338
339        class Args(object):
340            def __init__(self, attributes):
341                for key, value in attributes.items():
342                    self.__setattr__(key, value)
343
344        args = Args({
345            'chip': self.TARGET,
346            'filename': path_to_file,
347        })
348        esptool.image_info(args)
349        output = new_stdout.getvalue()
350        sys.stdout = old_stdout
351        return output
352
353    def start_app(self, erase_nvs=ERASE_NVS):
354        """
355        download and start app.
356
357        :param: erase_nvs: whether erase NVS partition during flash
358        :return: None
359        """
360        self._try_flash(erase_nvs)
361
362    def start_app_no_enc(self):
363        """
364        download and start app.
365
366        :param: erase_nvs: whether erase NVS partition during flash
367        :return: None
368        """
369        flash_files = self.app.flash_files + self.app.encrypt_files
370        self.write_flash(flash_files)
371
372    def write_flash(self, flash_files=None, encrypt_files=None, ignore_flash_encryption_efuse_setting=True, encrypt=False):
373        """
374        Flash files
375
376        :return: None
377        """
378        flash_offs_files = []
379        encrypt_offs_files = []
380        try:
381            if flash_files:
382                flash_offs_files = [(offs, open(path, 'rb')) for (offs, path) in flash_files]
383
384            if encrypt_files:
385                encrypt_offs_files = [(offs, open(path, 'rb')) for (offs, path) in encrypt_files]
386
387            self.write_flash_data(flash_offs_files, encrypt_offs_files, ignore_flash_encryption_efuse_setting, encrypt)
388        finally:
389            for (_, f) in flash_offs_files:
390                f.close()
391            for (_, f) in encrypt_offs_files:
392                f.close()
393
394    def bootloader_flash(self):
395        """
396        download bootloader.
397
398        :return: None
399        """
400        bootloader_path = os.path.join(self.app.binary_path, 'bootloader', 'bootloader.bin')
401        offs = int(self.app.get_sdkconfig()['CONFIG_BOOTLOADER_OFFSET_IN_FLASH'], 0)
402        flash_files = [(offs, bootloader_path)]
403        self.write_flash(flash_files)
404
405    @_uses_esptool
406    def reset(self, esp):
407        """
408        hard reset DUT
409
410        :return: None
411        """
412        # decorator `_use_esptool` will do reset
413        # so we don't need to do anything in this method
414        pass
415
416    @_uses_esptool
417    def erase_partition(self, esp, partition):
418        """
419        :param partition: partition name to erase
420        :return: None
421        """
422        address = self.app.partition_table[partition]['offset']
423        size = self.app.partition_table[partition]['size']
424        esp.erase_region(address, size)
425
426    @_uses_esptool
427    def erase_flash(self, esp):
428        """
429        erase the flash completely
430
431        :return: None
432        """
433        esp.erase_flash()
434
435    @_uses_esptool
436    def dump_flash(self, esp, output_file, **kwargs):
437        """
438        dump flash
439
440        :param output_file: output file name, if relative path, will use sdk path as base path.
441        :keyword partition: partition name, dump the partition.
442                            ``partition`` is preferred than using ``address`` and ``size``.
443        :keyword address: dump from address (need to be used with size)
444        :keyword size: dump size (need to be used with address)
445        :return: None
446        """
447        if os.path.isabs(output_file) is False:
448            output_file = os.path.relpath(output_file, self.app.get_log_folder())
449        if 'partition' in kwargs:
450            partition = self.app.partition_table[kwargs['partition']]
451            _address = partition['offset']
452            _size = partition['size']
453        elif 'address' in kwargs and 'size' in kwargs:
454            _address = kwargs['address']
455            _size = kwargs['size']
456        else:
457            raise IDFToolError("You must specify 'partition' or ('address' and 'size') to dump flash")
458
459        content = esp.read_flash(_address, _size)
460        with open(output_file, 'wb') as f:
461            f.write(content)
462
463    @staticmethod
464    def _sort_usb_ports(ports):
465        """
466        Move the usb ports to the very beginning
467        :param ports: list of ports
468        :return: list of ports with usb ports at beginning
469        """
470        usb_ports = []
471        rest_ports = []
472        for port in ports:
473            if 'usb' in port.lower():
474                usb_ports.append(port)
475            else:
476                rest_ports.append(port)
477        return usb_ports + rest_ports
478
479    @classmethod
480    def list_available_ports(cls):
481        # It will return other kinds of ports as well, such as ttyS* ports.
482        # Give the usb ports higher priority
483        ports = cls._sort_usb_ports([x.device for x in list_ports.comports()])
484        espport = os.getenv('ESPPORT')
485        if not espport:
486            # It's a little hard filter out invalid port with `serial.tools.list_ports.grep()`:
487            # The check condition in `grep` is: `if r.search(port) or r.search(desc) or r.search(hwid)`.
488            # This means we need to make all 3 conditions fail, to filter out the port.
489            # So some part of the filters will not be straight forward to users.
490            # And negative regular expression (`^((?!aa|bb|cc).)*$`) is not easy to understand.
491            # Filter out invalid port by our own will be much simpler.
492            return [x for x in ports if not cls.INVALID_PORT_PATTERN.search(x)]
493
494        # On MacOs with python3.6: type of espport is already utf8
495        if isinstance(espport, type(u'')):
496            port_hint = espport
497        else:
498            port_hint = espport.decode('utf8')
499
500        # If $ESPPORT is a valid port, make it appear first in the list
501        if port_hint in ports:
502            ports.remove(port_hint)
503            return [port_hint] + ports
504
505        # On macOS, user may set ESPPORT to /dev/tty.xxx while
506        # pySerial lists only the corresponding /dev/cu.xxx port
507        if sys.platform == 'darwin' and 'tty.' in port_hint:
508            port_hint = port_hint.replace('tty.', 'cu.')
509            if port_hint in ports:
510                ports.remove(port_hint)
511                return [port_hint] + ports
512
513        return ports
514
515    def lookup_pc_address(self, pc_addr):
516        cmd = ['%saddr2line' % self.TOOLCHAIN_PREFIX,
517               '-pfiaC', '-e', self.app.elf_file, pc_addr]
518        ret = ''
519        try:
520            translation = subprocess.check_output(cmd)
521            ret = translation.decode()
522        except OSError:
523            pass
524        return ret
525
526    @staticmethod
527    def _queue_read_all(source_queue):
528        output = []
529        while True:
530            try:
531                output.append(source_queue.get(timeout=0))
532            except _queue.Empty:
533                break
534        return output
535
536    def _queue_copy(self, source_queue, dest_queue):
537        data = self._queue_read_all(source_queue)
538        for d in data:
539            dest_queue.put(d)
540
541    def _get_from_queue(self, queue_name):
542        self_queue = getattr(self, queue_name)
543        if self.receive_thread:
544            recv_thread_queue = getattr(self.receive_thread, queue_name)
545            self._queue_copy(recv_thread_queue, self_queue)
546        return self._queue_read_all(self_queue)
547
548    def stop_receive(self):
549        if self.receive_thread:
550            for name in ['performance_items', 'exceptions']:
551                source_queue = getattr(self.receive_thread, name)
552                dest_queue = getattr(self, name)
553                self._queue_copy(source_queue, dest_queue)
554        super(IDFDUT, self).stop_receive()
555
556    def get_exceptions(self):
557        """ Get exceptions detected by DUT receive thread. """
558        return self._get_from_queue('exceptions')
559
560    def get_performance_items(self):
561        """
562        DUT receive thread will automatic collect performance results with pattern ``[Performance][name]: value\n``.
563        This method is used to get all performance results.
564
565        :return: a list of performance items.
566        """
567        return self._get_from_queue('performance_items')
568
569    def close(self):
570        super(IDFDUT, self).close()
571        if not self.allow_dut_exception and self.get_exceptions():
572            raise IDFDUTException('DUT exception detected on {}'.format(self))
573
574
575class ESP32DUT(IDFDUT):
576    TARGET = 'esp32'
577    TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-'
578
579    @classmethod
580    def get_rom(cls):
581        return esptool.ESP32ROM
582
583
584class ESP32S2DUT(IDFDUT):
585    TARGET = 'esp32s2'
586    TOOLCHAIN_PREFIX = 'xtensa-esp32s2-elf-'
587
588    @classmethod
589    def get_rom(cls):
590        return esptool.ESP32S2ROM
591
592
593class ESP32S3DUT(IDFDUT):
594    TARGET = 'esp32s3'
595    TOOLCHAIN_PREFIX = 'xtensa-esp32s3-elf-'
596
597    @classmethod
598    def get_rom(cls):
599        return esptool.ESP32S3ROM
600
601    def erase_partition(self, esp, partition):
602        raise NotImplementedError()
603
604
605class ESP32C3DUT(IDFDUT):
606    TARGET = 'esp32c3'
607    TOOLCHAIN_PREFIX = 'riscv32-esp-elf-'
608
609    @classmethod
610    def get_rom(cls):
611        return esptool.ESP32C3ROM
612
613
614class ESP8266DUT(IDFDUT):
615    TARGET = 'esp8266'
616    TOOLCHAIN_PREFIX = 'xtensa-lx106-elf-'
617
618    @classmethod
619    def get_rom(cls):
620        return esptool.ESP8266ROM
621
622
623def get_target_by_rom_class(cls):
624    for c in [ESP32DUT, ESP32S2DUT, ESP32S3DUT, ESP32C3DUT, ESP8266DUT, IDFQEMUDUT]:
625        if c.get_rom() == cls:
626            return c.TARGET
627    return None
628
629
630class IDFQEMUDUT(IDFDUT):
631    TARGET = None
632    TOOLCHAIN_PREFIX = None
633    ERASE_NVS = True
634    DEFAULT_EXPECT_TIMEOUT = 30  # longer timeout, since app startup takes more time in QEMU (due to slow SHA emulation)
635    QEMU_SERIAL_PORT = 3334
636
637    def __init__(self, name, port, log_file, app, allow_dut_exception=False, **kwargs):
638        self.flash_image = tempfile.NamedTemporaryFile('rb+', suffix='.bin', prefix='qemu_flash_img')
639        self.app = app
640        self.flash_size = 4 * 1024 * 1024
641        self._write_flash_img()
642
643        args = [
644            'qemu-system-xtensa',
645            '-nographic',
646            '-machine', self.TARGET,
647            '-drive', 'file={},if=mtd,format=raw'.format(self.flash_image.name),
648            '-nic', 'user,model=open_eth',
649            '-serial', 'tcp::{},server,nowait'.format(self.QEMU_SERIAL_PORT),
650            '-S',
651            '-global driver=timer.esp32.timg,property=wdt_disable,value=true']
652        # TODO(IDF-1242): generate a temporary efuse binary, pass it to QEMU
653
654        if 'QEMU_BIOS_PATH' in os.environ:
655            args += ['-L', os.environ['QEMU_BIOS_PATH']]
656
657        self.qemu = pexpect.spawn(' '.join(args), timeout=self.DEFAULT_EXPECT_TIMEOUT)
658        self.qemu.expect_exact(b'(qemu)')
659        super(IDFQEMUDUT, self).__init__(name, port, log_file, app, allow_dut_exception=allow_dut_exception, **kwargs)
660
661    def _write_flash_img(self):
662        self.flash_image.seek(0)
663        self.flash_image.write(b'\x00' * self.flash_size)
664        for offs, path in self.app.flash_files:
665            with open(path, 'rb') as flash_file:
666                contents = flash_file.read()
667                self.flash_image.seek(offs)
668                self.flash_image.write(contents)
669        self.flash_image.flush()
670
671    @classmethod
672    def get_rom(cls):
673        return esptool.ESP32ROM
674
675    @classmethod
676    def get_mac(cls, app, port):
677        # TODO(IDF-1242): get this from QEMU/efuse binary
678        return '11:22:33:44:55:66'
679
680    @classmethod
681    def confirm_dut(cls, port, **kwargs):
682        return True, cls.TARGET
683
684    def start_app(self, erase_nvs=ERASE_NVS):
685        # TODO: implement erase_nvs
686        # since the flash image is generated every time in the constructor, maybe this isn't needed...
687        self.qemu.sendline(b'cont\n')
688        self.qemu.expect_exact(b'(qemu)')
689
690    def reset(self):
691        self.qemu.sendline(b'system_reset\n')
692        self.qemu.expect_exact(b'(qemu)')
693
694    def erase_partition(self, partition):
695        raise NotImplementedError('method erase_partition not implemented')
696
697    def erase_flash(self):
698        raise NotImplementedError('method erase_flash not implemented')
699
700    def dump_flash(self, output_file, **kwargs):
701        raise NotImplementedError('method dump_flash not implemented')
702
703    @classmethod
704    def list_available_ports(cls):
705        return ['socket://localhost:{}'.format(cls.QEMU_SERIAL_PORT)]
706
707    def close(self):
708        super(IDFQEMUDUT, self).close()
709        self.qemu.sendline(b'q\n')
710        self.qemu.expect_exact(b'(qemu)')
711        for _ in range(self.DEFAULT_EXPECT_TIMEOUT):
712            if not self.qemu.isalive():
713                break
714            time.sleep(1)
715        else:
716            self.qemu.terminate(force=True)
717
718
719class ESP32QEMUDUT(IDFQEMUDUT):
720    TARGET = 'esp32'  # type: ignore
721    TOOLCHAIN_PREFIX = 'xtensa-esp32-elf-'  # type: ignore
722
723
724class IDFFPGADUT(IDFDUT):
725    TARGET = None                           # type: str
726    TOOLCHAIN_PREFIX = None                 # type: str
727    ERASE_NVS = True
728    FLASH_ENCRYPT_SCHEME = None             # type: str
729    FLASH_ENCRYPT_CNT_KEY = None            # type: str
730    FLASH_ENCRYPT_CNT_VAL = 0
731    FLASH_ENCRYPT_PURPOSE = None            # type: str
732    SECURE_BOOT_EN_KEY = None               # type: str
733    SECURE_BOOT_EN_VAL = 0
734    FLASH_SECTOR_SIZE = 4096
735
736    def __init__(self, name, port, log_file, app, allow_dut_exception=False, efuse_reset_port=None, **kwargs):
737        super(IDFFPGADUT, self).__init__(name, port, log_file, app, allow_dut_exception=allow_dut_exception, **kwargs)
738        self.esp = self.get_rom()(port)
739        self.efuses = None
740        self.efuse_operations = None
741        self.efuse_reset_port = efuse_reset_port
742
743    @classmethod
744    def get_rom(cls):
745        raise NotImplementedError('This is an abstraction class, method not defined.')
746
747    def erase_partition(self, esp, partition):
748        raise NotImplementedError()
749
750    def enable_efuses(self):
751        # We use an extra COM port to reset the efuses on FPGA.
752        # Connect DTR pin of the COM port to the efuse reset pin on daughter board
753        # Set EFUSEPORT env variable to the extra COM port
754        if not self.efuse_reset_port:
755            raise RuntimeError('EFUSEPORT not specified')
756
757        # Stop any previous serial port operation
758        self.stop_receive()
759        if self.secure_boot_en:
760            self.esp.connect()
761        self.efuses, self.efuse_operations = espefuse.get_efuses(self.esp, False, False, True)
762
763    def burn_efuse(self, field, val):
764        if not self.efuse_operations:
765            self.enable_efuses()
766        BurnEfuseArgs = collections.namedtuple('burn_efuse_args', ['name_value_pairs', 'only_burn_at_end'])
767        args = BurnEfuseArgs({field: val}, False)
768        self.efuse_operations.burn_efuse(self.esp, self.efuses, args)
769
770    def burn_efuse_key(self, key, purpose, block):
771        if not self.efuse_operations:
772            self.enable_efuses()
773        BurnKeyArgs = collections.namedtuple('burn_key_args',
774                                             ['keyfile', 'keypurpose', 'block',
775                                              'force_write_always', 'no_write_protect', 'no_read_protect', 'only_burn_at_end'])
776        args = BurnKeyArgs([key],
777                           [purpose],
778                           [block],
779                           False, False, False, False)
780        self.efuse_operations.burn_key(self.esp, self.efuses, args)
781
782    def burn_efuse_key_digest(self, key, purpose, block):
783        if not self.efuse_operations:
784            self.enable_efuses()
785        BurnDigestArgs = collections.namedtuple('burn_key_digest_args',
786                                                ['keyfile', 'keypurpose', 'block',
787                                                 'force_write_always', 'no_write_protect', 'no_read_protect', 'only_burn_at_end'])
788        args = BurnDigestArgs([open(key, 'rb')],
789                              [purpose],
790                              [block],
791                              False, False, True, False)
792        self.efuse_operations.burn_key_digest(self.esp, self.efuses, args)
793
794    def reset_efuses(self):
795        if not self.efuse_reset_port:
796            raise RuntimeError('EFUSEPORT not specified')
797        with serial.Serial(self.efuse_reset_port) as efuseport:
798            print('Resetting efuses')
799            efuseport.dtr = 0
800            self.port_inst.setRTS(1)
801            self.port_inst.setRTS(0)
802            time.sleep(1)
803            efuseport.dtr = 1
804            self.efuse_operations = None
805            self.efuses = None
806
807    def sign_data(self, data_file, key_files, version, append_signature=0):
808        SignDataArgs = collections.namedtuple('sign_data_args',
809                                              ['datafile','keyfile','output', 'version', 'append_signatures'])
810        outfile = tempfile.NamedTemporaryFile()
811        args = SignDataArgs(data_file, key_files, outfile.name, str(version), append_signature)
812        espsecure.sign_data(args)
813        outfile.seek(0)
814        return outfile.read()
815
816
817class ESP32C3FPGADUT(IDFFPGADUT):
818    TARGET = 'esp32c3'
819    TOOLCHAIN_PREFIX = 'riscv32-esp-elf-'
820    FLASH_ENCRYPT_SCHEME = 'AES-XTS'
821    FLASH_ENCRYPT_CNT_KEY = 'SPI_BOOT_CRYPT_CNT'
822    FLASH_ENCRYPT_CNT_VAL = 1
823    FLASH_ENCRYPT_PURPOSE = 'XTS_AES_128_KEY'
824    SECURE_BOOT_EN_KEY = 'SECURE_BOOT_EN'
825    SECURE_BOOT_EN_VAL = 1
826
827    @classmethod
828    def get_rom(cls):
829        return esptool.ESP32C3ROM
830
831    def erase_partition(self, esp, partition):
832        raise NotImplementedError()
833
834    def flash_encrypt_burn_cnt(self):
835        self.burn_efuse(self.FLASH_ENCRYPT_CNT_KEY, self.FLASH_ENCRYPT_CNT_VAL)
836
837    def flash_encrypt_burn_key(self, key, block=0):
838        self.burn_efuse_key(key, self.FLASH_ENCRYPT_PURPOSE, 'BLOCK_KEY%d' % block)
839
840    def flash_encrypt_get_scheme(self):
841        return self.FLASH_ENCRYPT_SCHEME
842
843    def secure_boot_burn_en_bit(self):
844        self.burn_efuse(self.SECURE_BOOT_EN_KEY, self.SECURE_BOOT_EN_VAL)
845
846    def secure_boot_burn_digest(self, digest, key_index=0, block=0):
847        self.burn_efuse_key_digest(digest, 'SECURE_BOOT_DIGEST%d' % key_index, 'BLOCK_KEY%d' % block)
848
849    @classmethod
850    def confirm_dut(cls, port, **kwargs):
851        return True, cls.TARGET
852
853
854class ESP32S3FPGADUT(IDFFPGADUT):
855    TARGET = 'esp32s3'
856    TOOLCHAIN_PREFIX = 'xtensa-esp32s3-elf-'
857    FLASH_ENCRYPT_SCHEME = 'AES-XTS'
858    FLASH_ENCRYPT_CNT_KEY = 'SPI_BOOT_CRYPT_CNT'
859    FLASH_ENCRYPT_CNT_VAL = 1
860    FLASH_ENCRYPT_PURPOSE = 'XTS_AES_128_KEY'
861    SECURE_BOOT_EN_KEY = 'SECURE_BOOT_EN'
862    SECURE_BOOT_EN_VAL = 1
863
864    @classmethod
865    def get_rom(cls):
866        return esptool.ESP32S3ROM
867
868    def erase_partition(self, esp, partition):
869        raise NotImplementedError()
870
871    def flash_encrypt_burn_cnt(self):
872        self.burn_efuse(self.FLASH_ENCRYPT_CNT_KEY, self.FLASH_ENCRYPT_CNT_VAL)
873
874    def flash_encrypt_burn_key(self, key, block=0):
875        self.burn_efuse_key(key, self.FLASH_ENCRYPT_PURPOSE, 'BLOCK_KEY%d' % block)
876
877    def flash_encrypt_get_scheme(self):
878        return self.FLASH_ENCRYPT_SCHEME
879
880    def secure_boot_burn_en_bit(self):
881        self.burn_efuse(self.SECURE_BOOT_EN_KEY, self.SECURE_BOOT_EN_VAL)
882
883    def secure_boot_burn_digest(self, digest, key_index=0, block=0):
884        self.burn_efuse_key_digest(digest, 'SECURE_BOOT_DIGEST%d' % key_index, 'BLOCK_KEY%d' % block)
885
886    @classmethod
887    def confirm_dut(cls, port, **kwargs):
888        return True, cls.TARGET
889