1#
2# Copyright 2021 Espressif Systems (Shanghai) PTE LTD
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import base64
18import binascii
19import hashlib
20import logging
21import os
22import subprocess
23import sys
24import tempfile
25
26from construct import AlignedStruct, Bytes, GreedyRange, Int32ul, Padding, Struct, abs_, this
27
28from . import ESPCoreDumpLoaderError, _ArchMethodsBase, _TargetMethodsBase
29from .elf import (TASK_STATUS_CORRECT, TASK_STATUS_TCB_CORRUPTED, ElfFile, ElfSegment, ESPCoreDumpElfFile,
30                  EspTaskStatus, NoteSection)
31from .xtensa import _ArchMethodsXtensa, _TargetMethodsESP32
32
# Root of the ESP-IDF checkout. An empty string is used when IDF_PATH is not exported so that
# merely importing this module does not fail with a TypeError in ``os.path.join``; the tool
# paths below are then relative and only matter once a flash loader actually invokes them.
IDF_PATH = os.getenv('IDF_PATH', '')
PARTTOOL_PY = os.path.join(IDF_PATH, 'components', 'partition_table', 'parttool.py')
ESPTOOL_PY = os.path.join(IDF_PATH, 'components', 'esptool_py', 'esptool', 'esptool.py')
36
# The following structs are based on the C definitions in
# components/espcoredump/include_core_dump/esp_core_dump_priv.h
# All integer fields are unsigned 32-bit little-endian (Int32ul).

# Image header of the V1 layout. The loader also parses every image with this struct first,
# only to obtain ``ver`` and pick the real header/checksum layout.
EspCoreDumpV1Header = Struct(
    'tot_len' / Int32ul,  # used to size the payload: payload = tot_len - header size - checksum size
    'ver' / Int32ul,  # packed version word, decoded by EspCoreDumpVersion
    'task_num' / Int32ul,  # presumably the number of task records -- not read by this module, TODO confirm
    'tcbsz' / Int32ul,  # byte size of the TCB blob stored for each task
)

# Image header of the V2 layout: the V1 fields followed by a memory segment count.
EspCoreDumpV2Header = Struct(
    'tot_len' / Int32ul,
    'ver' / Int32ul,
    'task_num' / Int32ul,
    'tcbsz' / Int32ul,
    'segs_num' / Int32ul,  # number of MemSegmentHeader records in the payload
)

# Trailing checksum layouts: a 32-bit CRC or a raw 32-byte SHA256 digest.
CRC = Int32ul
SHA256 = Bytes(32)

# Fixed part of one task record in a binary core dump; the TCB and stack bytes follow it.
# The stack length is taken as abs(stack_top - stack_end) by the loader.
TaskHeader = Struct(
    'tcb_addr' / Int32ul,
    'stack_top' / Int32ul,
    'stack_end' / Int32ul,
)

# One memory segment record: start address, size and ``mem_sz`` bytes of content.
MemSegmentHeader = Struct(
    'mem_start' / Int32ul,
    'mem_sz' / Int32ul,
    'data' / Bytes(this.mem_sz),
)
69
70
class EspCoreDumpVersion(object):
    """Packed core dump version word and accessors for its parts

    The upper 16 bits hold the chip id and the lower 16 bits hold the dump
    version, which is itself split into a major byte and a minor byte.
    """
    # Chip ids as they appear in the upper half of the version word
    ESP32 = 0
    ESP32S2 = 2

    XTENSA_CHIPS = [ESP32, ESP32S2]

    ESP_COREDUMP_TARGETS = XTENSA_CHIPS

    def __init__(self, version=None):
        """Store ``version`` (a packed version word) or default to 0
        """
        super(EspCoreDumpVersion, self).__init__()
        if version is not None:
            self.set_version(version)
        else:
            self.version = 0

    @staticmethod
    def make_dump_ver(major, minor):
        """Pack ``major`` and ``minor`` (each truncated to 8 bits) into a 16-bit dump version
        """
        major_byte = major & 0xFF
        minor_byte = minor & 0xFF
        return (major_byte << 8) | minor_byte

    def set_version(self, version):
        """Replace the stored packed version word
        """
        self.version = version

    @property
    def chip_ver(self):
        """Chip id: bits 16-31 of the version word
        """
        return (self.version >> 16) & 0xFFFF

    @property
    def dump_ver(self):
        """Dump version (major and minor together): bits 0-15 of the version word
        """
        return self.version & 0xFFFF

    @property
    def major(self):
        """Major dump version: bits 8-15 of the version word
        """
        return (self.version >> 8) & 0xFF

    @property
    def minor(self):
        """Minor dump version: bits 0-7 of the version word
        """
        return self.version & 0xFF
113
114
115class EspCoreDumpLoader(EspCoreDumpVersion):
116    # "legacy" stands for core dumps v0.1 (before IDF v4.1)
117    BIN_V1 = EspCoreDumpVersion.make_dump_ver(0, 1)
118    BIN_V2 = EspCoreDumpVersion.make_dump_ver(0, 2)
119    ELF_CRC32 = EspCoreDumpVersion.make_dump_ver(1, 0)
120    ELF_SHA256 = EspCoreDumpVersion.make_dump_ver(1, 1)
121
122    def __init__(self):
123        super(EspCoreDumpLoader, self).__init__()
124        self.core_src_file = None
125        self.core_src_struct = None
126        self.core_src = None
127
128        self.core_elf_file = None
129
130        self.header = None
131        self.header_struct = EspCoreDumpV1Header
132        self.checksum_struct = CRC
133
134        # These two method classes will be assigned in ``reload_coredump``
135        self.target_method_cls = _TargetMethodsBase
136        self.arch_method_cls = _ArchMethodsBase
137
138        self._temp_files = []
139
140    def __del__(self):
141        if self.core_src_file:
142            self.core_src_file.close()
143        if self.core_elf_file:
144            self.core_elf_file.close()
145        for f in self._temp_files:
146            try:
147                os.remove(f)
148            except OSError:
149                pass
150
151    def _create_temp_file(self):
152        t = tempfile.NamedTemporaryFile('wb', delete=False)
153        self._temp_files.append(t.name)
154        return t
155
156    def _reload_coredump(self):
157        with open(self.core_src_file.name, 'rb') as fr:
158            coredump_bytes = fr.read()
159
160        _header = EspCoreDumpV1Header.parse(coredump_bytes)  # first we use V1 format to get version
161        self.set_version(_header.ver)
162        if self.dump_ver == self.ELF_CRC32:
163            self.checksum_struct = CRC
164            self.header_struct = EspCoreDumpV2Header
165        elif self.dump_ver == self.ELF_SHA256:
166            self.checksum_struct = SHA256
167            self.header_struct = EspCoreDumpV2Header
168        elif self.dump_ver == self.BIN_V1:
169            self.checksum_struct = CRC
170            self.header_struct = EspCoreDumpV1Header
171        elif self.dump_ver == self.BIN_V2:
172            self.checksum_struct = CRC
173            self.header_struct = EspCoreDumpV2Header
174        else:
175            raise ESPCoreDumpLoaderError('Core dump version "0x%x" is not supported!' % self.dump_ver)
176
177        self.core_src_struct = Struct(
178            'header' / self.header_struct,
179            'data' / Bytes(this.header.tot_len - self.header_struct.sizeof() - self.checksum_struct.sizeof()),
180            'checksum' / self.checksum_struct,
181        )
182        self.core_src = self.core_src_struct.parse(coredump_bytes)
183
184        # Reload header if header struct changes after parsing
185        if self.header_struct != EspCoreDumpV1Header:
186            self.header = EspCoreDumpV2Header.parse(coredump_bytes)
187
188        if self.chip_ver in self.ESP_COREDUMP_TARGETS:
189            if self.chip_ver == self.ESP32:
190                self.target_method_cls = _TargetMethodsESP32
191
192            if self.chip_ver in self.XTENSA_CHIPS:
193                self.arch_method_cls = _ArchMethodsXtensa
194        else:
195            raise ESPCoreDumpLoaderError('Core dump chip "0x%x" is not supported!' % self.chip_ver)
196
197    def _validate_dump_file(self):
198        if self.chip_ver not in self.ESP_COREDUMP_TARGETS:
199            raise ESPCoreDumpLoaderError('Invalid core dump chip version: "{}", should be <= "0x{:X}"'
200                                         .format(self.chip_ver, self.ESP32S2))
201
202        if self.checksum_struct == CRC:
203            self._crc_validate()
204        elif self.checksum_struct == SHA256:
205            self._sha256_validate()
206
207    def _crc_validate(self):
208        data_crc = binascii.crc32(EspCoreDumpV2Header.build(self.core_src.header) + self.core_src.data) & 0xffffffff
209        if data_crc != self.core_src.checksum:
210            raise ESPCoreDumpLoaderError('Invalid core dump CRC %x, should be %x' % (data_crc, self.core_src.crc))
211
212    def _sha256_validate(self):
213        data_sha256 = hashlib.sha256(EspCoreDumpV2Header.build(self.core_src.header) + self.core_src.data)
214        data_sha256_str = data_sha256.hexdigest()
215        sha256_str = binascii.hexlify(self.core_src.checksum).decode('ascii')
216        if data_sha256_str != sha256_str:
217            raise ESPCoreDumpLoaderError('Invalid core dump SHA256 "{}", should be "{}"'
218                                         .format(data_sha256_str, sha256_str))
219
220    def create_corefile(self, exe_name=None):  # type: (str) -> None
221        """
222        Creates core dump ELF file
223        """
224        self._validate_dump_file()
225        self.core_elf_file = self._create_temp_file()
226
227        if self.dump_ver in [self.ELF_CRC32,
228                             self.ELF_SHA256]:
229            self._extract_elf_corefile(exe_name)
230        elif self.dump_ver in [self.BIN_V1,
231                               self.BIN_V2]:
232            self._extract_bin_corefile()
233        else:
234            raise NotImplementedError
235
236    def _extract_elf_corefile(self, exe_name=None):
237        """
238        Reads the ELF formatted core dump image and parse it
239        """
240        self.core_elf_file.write(self.core_src.data)
241        # Need to be closed before read. Otherwise the result will be wrong
242        self.core_elf_file.close()
243
244        core_elf = ESPCoreDumpElfFile(self.core_elf_file.name)
245
246        # Read note segments from core file which are belong to tasks (TCB or stack)
247        for seg in core_elf.note_segments:
248            for note_sec in seg.note_secs:
249                # Check for version info note
250                if note_sec.name == 'ESP_CORE_DUMP_INFO' \
251                        and note_sec.type == ESPCoreDumpElfFile.PT_INFO \
252                        and exe_name:
253                    exe_elf = ElfFile(exe_name)
254                    app_sha256 = binascii.hexlify(exe_elf.sha256)
255                    coredump_sha256_struct = Struct(
256                        'ver' / Int32ul,
257                        'sha256' / Bytes(64)  # SHA256 as hex string
258                    )
259                    coredump_sha256 = coredump_sha256_struct.parse(note_sec.desc[:coredump_sha256_struct.sizeof()])
260                    if coredump_sha256.sha256 != app_sha256:
261                        raise ESPCoreDumpLoaderError(
262                            'Invalid application image for coredump: coredump SHA256({}) != app SHA256({}).'
263                            .format(coredump_sha256, app_sha256))
264                    if coredump_sha256.ver != self.version:
265                        raise ESPCoreDumpLoaderError(
266                            'Invalid application image for coredump: coredump SHA256 version({}) != app SHA256 version({}).'
267                            .format(coredump_sha256.ver, self.version))
268
269    @staticmethod
270    def _get_aligned_size(size, align_with=4):
271        if size % align_with:
272            return align_with * (size // align_with + 1)
273        return size
274
275    @staticmethod
276    def _build_note_section(name, sec_type, desc):
277        name = bytearray(name, encoding='ascii') + b'\0'
278        return NoteSection.build({
279            'namesz': len(name),
280            'descsz': len(desc),
281            'type': sec_type,
282            'name': name,
283            'desc': desc,
284        })
285
286    def _extract_bin_corefile(self):
287        """
288        Creates core dump ELF file
289        """
290        tcbsz_aligned = self._get_aligned_size(self.header.tcbsz)
291
292        coredump_data_struct = Struct(
293            'tasks' / GreedyRange(
294                AlignedStruct(
295                    4,
296                    'task_header' / TaskHeader,
297                    'tcb' / Bytes(self.header.tcbsz),
298                    'stack' / Bytes(abs_(this.task_header.stack_top - this.task_header.stack_end)),
299                )
300            ),
301            'mem_seg_headers' / MemSegmentHeader[self.core_src.header.segs_num]
302        )
303
304        core_elf = ESPCoreDumpElfFile()
305        notes = b''
306        core_dump_info_notes = b''
307        task_info_notes = b''
308
309        coredump_data = coredump_data_struct.parse(self.core_src.data)
310        for i, task in enumerate(coredump_data.tasks):
311            stack_len_aligned = self._get_aligned_size(abs(task.task_header.stack_top - task.task_header.stack_end))
312            task_status_kwargs = {
313                'task_index': i,
314                'task_flags': TASK_STATUS_CORRECT,
315                'task_tcb_addr': task.task_header.tcb_addr,
316                'task_stack_start': min(task.task_header.stack_top, task.task_header.stack_end),
317                'task_stack_len': stack_len_aligned,
318                'task_name': Padding(16).build({})  # currently we don't have task_name, keep it as padding
319            }
320
321            # Write TCB
322            try:
323                if self.target_method_cls.tcb_is_sane(task.task_header.tcb_addr, tcbsz_aligned):
324                    core_elf.add_segment(task.task_header.tcb_addr,
325                                         task.tcb,
326                                         ElfFile.PT_LOAD,
327                                         ElfSegment.PF_R | ElfSegment.PF_W)
328                elif task.task_header.tcb_addr and self.target_method_cls.addr_is_fake(task.task_header.tcb_addr):
329                    task_status_kwargs['task_flags'] |= TASK_STATUS_TCB_CORRUPTED
330            except ESPCoreDumpLoaderError as e:
331                logging.warning('Skip TCB {} bytes @ 0x{:x}. (Reason: {})'
332                                .format(tcbsz_aligned, task.task_header.tcb_addr, e))
333
334            # Write stack
335            try:
336                if self.target_method_cls.stack_is_sane(task_status_kwargs['task_stack_start']):
337                    core_elf.add_segment(task_status_kwargs['task_stack_start'],
338                                         task.stack,
339                                         ElfFile.PT_LOAD,
340                                         ElfSegment.PF_R | ElfSegment.PF_W)
341                elif task_status_kwargs['task_stack_start'] \
342                        and self.target_method_cls.addr_is_fake(task_status_kwargs['task_stack_start']):
343                    task_status_kwargs['task_flags'] |= TASK_STATUS_TCB_CORRUPTED
344                    core_elf.add_segment(task_status_kwargs['task_stack_start'],
345                                         task.stack,
346                                         ElfFile.PT_LOAD,
347                                         ElfSegment.PF_R | ElfSegment.PF_W)
348            except ESPCoreDumpLoaderError as e:
349                logging.warning('Skip task\'s ({:x}) stack {} bytes @ 0x{:x}. (Reason: {})'
350                                .format(task_status_kwargs['tcb_addr'],
351                                        task_status_kwargs['stack_len_aligned'],
352                                        task_status_kwargs['stack_base'],
353                                        e))
354
355            try:
356                logging.debug('Stack start_end: 0x{:x} @ 0x{:x}'
357                              .format(task.task_header.stack_top, task.task_header.stack_end))
358                task_regs, extra_regs = self.arch_method_cls.get_registers_from_stack(
359                    task.stack,
360                    task.task_header.stack_end > task.task_header.stack_top
361                )
362            except Exception as e:
363                raise ESPCoreDumpLoaderError(str(e))
364
365            task_info_notes += self._build_note_section('TASK_INFO',
366                                                        ESPCoreDumpElfFile.PT_TASK_INFO,
367                                                        EspTaskStatus.build(task_status_kwargs))
368            notes += self._build_note_section('CORE',
369                                              ElfFile.PT_LOAD,
370                                              self.arch_method_cls.build_prstatus_data(task.task_header.tcb_addr,
371                                                                                       task_regs))
372
373            if extra_regs and len(core_dump_info_notes) == 0:
374                # actually there will be only one such note - for crashed task
375                core_dump_info_notes += self._build_note_section('ESP_CORE_DUMP_INFO',
376                                                                 ESPCoreDumpElfFile.PT_INFO,
377                                                                 Int32ul.build(self.header.ver))
378
379                exc_regs = []
380                for reg_id in extra_regs:
381                    exc_regs.extend([reg_id, extra_regs[reg_id]])
382                _regs = [task.task_header.tcb_addr] + exc_regs
383                core_dump_info_notes += self._build_note_section(
384                    'EXTRA_INFO',
385                    ESPCoreDumpElfFile.PT_EXTRA_INFO,
386                    Int32ul[1 + len(exc_regs)].build(_regs)
387                )
388
389        if self.dump_ver == self.BIN_V2:
390            for header in coredump_data.mem_seg_headers:
391                logging.debug('Read memory segment {} bytes @ 0x{:x}'.format(header.mem_sz, header.mem_start))
392                core_elf.add_segment(header.mem_start, header.data, ElfFile.PT_LOAD, ElfSegment.PF_R | ElfSegment.PF_W)
393
394        # add notes
395        try:
396            core_elf.add_segment(0, notes, ElfFile.PT_NOTE, 0)
397        except ESPCoreDumpLoaderError as e:
398            logging.warning('Skip NOTES segment {:d} bytes @ 0x{:x}. (Reason: {})'.format(len(notes), 0, e))
399        # add core dump info notes
400        try:
401            core_elf.add_segment(0, core_dump_info_notes, ElfFile.PT_NOTE, 0)
402        except ESPCoreDumpLoaderError as e:
403            logging.warning('Skip core dump info NOTES segment {:d} bytes @ 0x{:x}. (Reason: {})'
404                            .format(len(core_dump_info_notes), 0, e))
405        try:
406            core_elf.add_segment(0, task_info_notes, ElfFile.PT_NOTE, 0)
407        except ESPCoreDumpLoaderError as e:
408            logging.warning('Skip failed tasks info NOTES segment {:d} bytes @ 0x{:x}. (Reason: {})'
409                            .format(len(task_info_notes), 0, e))
410        # dump core ELF
411        core_elf.e_type = ElfFile.ET_CORE
412        core_elf.e_machine = ESPCoreDumpElfFile.EM_XTENSA
413        core_elf.dump(self.core_elf_file.name)
414
415
416class ESPCoreDumpFlashLoader(EspCoreDumpLoader):
417    ESP_COREDUMP_PART_TABLE_OFF = 0x8000
418
419    def __init__(self, offset, target='esp32', port=None, baud=None):
420        super(ESPCoreDumpFlashLoader, self).__init__()
421        self.port = port
422        self.baud = baud
423        self.target = target
424
425        self._get_coredump(offset)
426        self._reload_coredump()
427
428    def _get_coredump(self, off):
429        """
430        Loads core dump from flash using parttool or elftool (if offset is set)
431        """
432        try:
433            if off:
434                logging.info('Invoke esptool to read image.')
435                self._invoke_esptool(off=off)
436            else:
437                logging.info('Invoke parttool to read image.')
438                self._invoke_parttool()
439        except subprocess.CalledProcessError as e:
440            if e.output:
441                logging.info(e.output)
442            logging.error('Error during the subprocess execution')
443        else:
444            # Need to be closed before read. Otherwise the result will be wrong
445            self.core_src_file.close()
446
447    def _invoke_esptool(self, off=None):
448        """
449        Loads core dump from flash using elftool
450        """
451        tool_args = [sys.executable, ESPTOOL_PY, '-c', self.target]
452        if self.port:
453            tool_args.extend(['-p', self.port])
454        if self.baud:
455            tool_args.extend(['-b', str(self.baud)])
456
457        self.core_src_file = self._create_temp_file()
458        try:
459            (part_offset, part_size) = self._get_core_dump_partition_info()
460            if not off:
461                off = part_offset  # set default offset if not specified
462                logging.warning('The core dump image offset is not specified. Use partition offset: %d.', part_offset)
463            if part_offset != off:
464                logging.warning('Predefined image offset: %d does not match core dump partition offset: %d', off,
465                                part_offset)
466
467            # Here we use V1 format to locate the size
468            tool_args.extend(['read_flash', str(off), str(EspCoreDumpV1Header.sizeof())])
469            tool_args.append(self.core_src_file.name)
470
471            # read core dump length
472            et_out = subprocess.check_output(tool_args)
473            if et_out:
474                logging.info(et_out.decode('utf-8'))
475
476            header = EspCoreDumpV1Header.parse(open(self.core_src_file.name, 'rb').read())
477            if not header or not 0 < header.tot_len <= part_size:
478                logging.error('Incorrect size of core dump image: {}, use partition size instead: {}'
479                              .format(header.tot_len, part_size))
480                coredump_len = part_size
481            else:
482                coredump_len = header.tot_len
483            # set actual size of core dump image and read it from flash
484            tool_args[-2] = str(coredump_len)
485            et_out = subprocess.check_output(tool_args)
486            if et_out:
487                logging.info(et_out.decode('utf-8'))
488        except subprocess.CalledProcessError as e:
489            logging.error('esptool script execution failed with err %d', e.returncode)
490            logging.debug("Command ran: '%s'", e.cmd)
491            logging.debug('Command out:')
492            logging.debug(e.output)
493            raise e
494
495    def _invoke_parttool(self):
496        """
497        Loads core dump from flash using parttool
498        """
499        tool_args = [sys.executable, PARTTOOL_PY]
500        if self.port:
501            tool_args.extend(['--port', self.port])
502        tool_args.extend(['read_partition', '--partition-type', 'data', '--partition-subtype', 'coredump', '--output'])
503
504        self.core_src_file = self._create_temp_file()
505        try:
506            tool_args.append(self.core_src_file.name)
507            # read core dump partition
508            et_out = subprocess.check_output(tool_args)
509            if et_out:
510                logging.info(et_out.decode('utf-8'))
511        except subprocess.CalledProcessError as e:
512            logging.error('parttool script execution failed with err %d', e.returncode)
513            logging.debug("Command ran: '%s'", e.cmd)
514            logging.debug('Command out:')
515            logging.debug(e.output)
516            raise e
517
518    def _get_core_dump_partition_info(self, part_off=None):
519        """
520        Get core dump partition info using parttool
521        """
522        logging.info('Retrieving core dump partition offset and size...')
523        if not part_off:
524            part_off = self.ESP_COREDUMP_PART_TABLE_OFF
525        try:
526            tool_args = [sys.executable, PARTTOOL_PY, '-q', '--partition-table-offset', str(part_off)]
527            if self.port:
528                tool_args.extend(['--port', self.port])
529            invoke_args = tool_args + ['get_partition_info', '--partition-type', 'data',
530                                       '--partition-subtype', 'coredump',
531                                       '--info', 'offset', 'size']
532            res = subprocess.check_output(invoke_args).strip()
533            (offset_str, size_str) = res.rsplit(b'\n')[-1].split(b' ')
534            size = int(size_str, 16)
535            offset = int(offset_str, 16)
536            logging.info('Core dump partition offset=%d, size=%d', offset, size)
537        except subprocess.CalledProcessError as e:
538            logging.error('parttool get partition info failed with err %d', e.returncode)
539            logging.debug("Command ran: '%s'", e.cmd)
540            logging.debug('Command out:')
541            logging.debug(e.output)
542            logging.error('Check if the coredump partition exists in partition table.')
543            raise e
544        return offset, size
545
546
547class ESPCoreDumpFileLoader(EspCoreDumpLoader):
548    def __init__(self, path, is_b64=False):
549        super(ESPCoreDumpFileLoader, self).__init__()
550        self.is_b64 = is_b64
551
552        self._get_coredump(path)
553        self._reload_coredump()
554
555    def _get_coredump(self, path):
556        """
557        Loads core dump from (raw binary or base64-encoded) file
558        """
559        logging.debug('Load core dump from "%s", %s format', path, 'b64' if self.is_b64 else 'raw')
560        if not self.is_b64:
561            self.core_src_file = open(path, mode='rb')
562        else:
563            self.core_src_file = self._create_temp_file()
564            with open(path, 'rb') as fb64:
565                while True:
566                    line = fb64.readline()
567                    if len(line) == 0:
568                        break
569                    data = base64.standard_b64decode(line.rstrip(b'\r\n'))
570                    self.core_src_file.write(data)
571                self.core_src_file.flush()
572                self.core_src_file.seek(0)
573