1# 2# Copyright 2021 Espressif Systems (Shanghai) PTE LTD 3# 4# Licensed under the Apache License, Version 2.0 (the "License"); 5# you may not use this file except in compliance with the License. 6# You may obtain a copy of the License at 7# 8# http://www.apache.org/licenses/LICENSE-2.0 9# 10# Unless required by applicable law or agreed to in writing, software 11# distributed under the License is distributed on an "AS IS" BASIS, 12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13# See the License for the specific language governing permissions and 14# limitations under the License. 15# 16 17import base64 18import binascii 19import hashlib 20import logging 21import os 22import subprocess 23import sys 24import tempfile 25 26from construct import AlignedStruct, Bytes, GreedyRange, Int32ul, Padding, Struct, abs_, this 27 28from . import ESPCoreDumpLoaderError, _ArchMethodsBase, _TargetMethodsBase 29from .elf import (TASK_STATUS_CORRECT, TASK_STATUS_TCB_CORRUPTED, ElfFile, ElfSegment, ESPCoreDumpElfFile, 30 EspTaskStatus, NoteSection) 31from .xtensa import _ArchMethodsXtensa, _TargetMethodsESP32 32 33IDF_PATH = os.getenv('IDF_PATH') 34PARTTOOL_PY = os.path.join(IDF_PATH, 'components', 'partition_table', 'parttool.py') 35ESPTOOL_PY = os.path.join(IDF_PATH, 'components', 'esptool_py', 'esptool', 'esptool.py') 36 37# Following structs are based on source code 38# components/espcoredump/include_core_dump/esp_core_dump_priv.h 39 40EspCoreDumpV1Header = Struct( 41 'tot_len' / Int32ul, 42 'ver' / Int32ul, 43 'task_num' / Int32ul, 44 'tcbsz' / Int32ul, 45) 46 47EspCoreDumpV2Header = Struct( 48 'tot_len' / Int32ul, 49 'ver' / Int32ul, 50 'task_num' / Int32ul, 51 'tcbsz' / Int32ul, 52 'segs_num' / Int32ul, 53) 54 55CRC = Int32ul 56SHA256 = Bytes(32) 57 58TaskHeader = Struct( 59 'tcb_addr' / Int32ul, 60 'stack_top' / Int32ul, 61 'stack_end' / Int32ul, 62) 63 64MemSegmentHeader = Struct( 65 'mem_start' / Int32ul, 66 'mem_sz' / Int32ul, 67 'data' / Bytes(this.mem_sz), 68) 69 70 71class EspCoreDumpVersion(object): 72 """Core dump version class 73 """ 74 # This class contains all version-dependent params 75 ESP32 = 0 76 ESP32S2 = 2 77 78 XTENSA_CHIPS = [ESP32, ESP32S2] 79 80 ESP_COREDUMP_TARGETS = XTENSA_CHIPS 81 82 def __init__(self, version=None): 83 """Constructor for core dump version 84 """ 85 super(EspCoreDumpVersion, self).__init__() 86 if version is None: 87 self.version = 0 88 else: 89 self.set_version(version) 90 91 @staticmethod 92 def make_dump_ver(major, minor): 93 return ((major & 0xFF) << 8) | ((minor & 0xFF) << 0) 94 95 def set_version(self, version): 96 self.version = version 97 98 @property 99 def chip_ver(self): 100 return (self.version & 0xFFFF0000) >> 16 101 102 @property 103 def dump_ver(self): 104 return self.version & 0x0000FFFF 105 106 @property 107 def major(self): 108 return (self.version & 0x0000FF00) >> 8 109 110 @property 111 def minor(self): 112 return self.version & 0x000000FF 113 114 115class EspCoreDumpLoader(EspCoreDumpVersion): 116 # "legacy" stands for core dumps v0.1 (before IDF v4.1) 117 BIN_V1 = EspCoreDumpVersion.make_dump_ver(0, 1) 118 BIN_V2 = EspCoreDumpVersion.make_dump_ver(0, 2) 119 ELF_CRC32 = EspCoreDumpVersion.make_dump_ver(1, 0) 120 ELF_SHA256 = EspCoreDumpVersion.make_dump_ver(1, 1) 121 122 def __init__(self): 123 super(EspCoreDumpLoader, self).__init__() 124 self.core_src_file = None 125 self.core_src_struct = None 126 self.core_src = None 127 128 self.core_elf_file = None 129 130 self.header = None 131 self.header_struct = EspCoreDumpV1Header 132 self.checksum_struct = CRC 133 134 # These two method classes will be assigned in ``reload_coredump`` 135 self.target_method_cls = _TargetMethodsBase 136 self.arch_method_cls = _ArchMethodsBase 137 138 self._temp_files = [] 139 140 def __del__(self): 141 if self.core_src_file: 142 self.core_src_file.close() 143 if self.core_elf_file: 144 self.core_elf_file.close() 145 for f in self._temp_files: 146 try: 147 os.remove(f) 148 except OSError: 149 pass 150 151 def _create_temp_file(self): 152 t = tempfile.NamedTemporaryFile('wb', delete=False) 153 self._temp_files.append(t.name) 154 return t 155 156 def _reload_coredump(self): 157 with open(self.core_src_file.name, 'rb') as fr: 158 coredump_bytes = fr.read() 159 160 _header = EspCoreDumpV1Header.parse(coredump_bytes) # first we use V1 format to get version 161 self.set_version(_header.ver) 162 if self.dump_ver == self.ELF_CRC32: 163 self.checksum_struct = CRC 164 self.header_struct = EspCoreDumpV2Header 165 elif self.dump_ver == self.ELF_SHA256: 166 self.checksum_struct = SHA256 167 self.header_struct = EspCoreDumpV2Header 168 elif self.dump_ver == self.BIN_V1: 169 self.checksum_struct = CRC 170 self.header_struct = EspCoreDumpV1Header 171 elif self.dump_ver == self.BIN_V2: 172 self.checksum_struct = CRC 173 self.header_struct = EspCoreDumpV2Header 174 else: 175 raise ESPCoreDumpLoaderError('Core dump version "0x%x" is not supported!' % self.dump_ver) 176 177 self.core_src_struct = Struct( 178 'header' / self.header_struct, 179 'data' / Bytes(this.header.tot_len - self.header_struct.sizeof() - self.checksum_struct.sizeof()), 180 'checksum' / self.checksum_struct, 181 ) 182 self.core_src = self.core_src_struct.parse(coredump_bytes) 183 184 # Reload header if header struct changes after parsing 185 if self.header_struct != EspCoreDumpV1Header: 186 self.header = EspCoreDumpV2Header.parse(coredump_bytes) 187 188 if self.chip_ver in self.ESP_COREDUMP_TARGETS: 189 if self.chip_ver == self.ESP32: 190 self.target_method_cls = _TargetMethodsESP32 191 192 if self.chip_ver in self.XTENSA_CHIPS: 193 self.arch_method_cls = _ArchMethodsXtensa 194 else: 195 raise ESPCoreDumpLoaderError('Core dump chip "0x%x" is not supported!' % self.chip_ver) 196 197 def _validate_dump_file(self): 198 if self.chip_ver not in self.ESP_COREDUMP_TARGETS: 199 raise ESPCoreDumpLoaderError('Invalid core dump chip version: "{}", should be <= "0x{:X}"' 200 .format(self.chip_ver, self.ESP32S2)) 201 202 if self.checksum_struct == CRC: 203 self._crc_validate() 204 elif self.checksum_struct == SHA256: 205 self._sha256_validate() 206 207 def _crc_validate(self): 208 data_crc = binascii.crc32(EspCoreDumpV2Header.build(self.core_src.header) + self.core_src.data) & 0xffffffff 209 if data_crc != self.core_src.checksum: 210 raise ESPCoreDumpLoaderError('Invalid core dump CRC %x, should be %x' % (data_crc, self.core_src.crc)) 211 212 def _sha256_validate(self): 213 data_sha256 = hashlib.sha256(EspCoreDumpV2Header.build(self.core_src.header) + self.core_src.data) 214 data_sha256_str = data_sha256.hexdigest() 215 sha256_str = binascii.hexlify(self.core_src.checksum).decode('ascii') 216 if data_sha256_str != sha256_str: 217 raise ESPCoreDumpLoaderError('Invalid core dump SHA256 "{}", should be "{}"' 218 .format(data_sha256_str, sha256_str)) 219 220 def create_corefile(self, exe_name=None): # type: (str) -> None 221 """ 222 Creates core dump ELF file 223 """ 224 self._validate_dump_file() 225 self.core_elf_file = self._create_temp_file() 226 227 if self.dump_ver in [self.ELF_CRC32, 228 self.ELF_SHA256]: 229 self._extract_elf_corefile(exe_name) 230 elif self.dump_ver in [self.BIN_V1, 231 self.BIN_V2]: 232 self._extract_bin_corefile() 233 else: 234 raise NotImplementedError 235 236 def _extract_elf_corefile(self, exe_name=None): 237 """ 238 Reads the ELF formatted core dump image and parse it 239 """ 240 self.core_elf_file.write(self.core_src.data) 241 # Need to be closed before read. Otherwise the result will be wrong 242 self.core_elf_file.close() 243 244 core_elf = ESPCoreDumpElfFile(self.core_elf_file.name) 245 246 # Read note segments from core file which are belong to tasks (TCB or stack) 247 for seg in core_elf.note_segments: 248 for note_sec in seg.note_secs: 249 # Check for version info note 250 if note_sec.name == 'ESP_CORE_DUMP_INFO' \ 251 and note_sec.type == ESPCoreDumpElfFile.PT_INFO \ 252 and exe_name: 253 exe_elf = ElfFile(exe_name) 254 app_sha256 = binascii.hexlify(exe_elf.sha256) 255 coredump_sha256_struct = Struct( 256 'ver' / Int32ul, 257 'sha256' / Bytes(64) # SHA256 as hex string 258 ) 259 coredump_sha256 = coredump_sha256_struct.parse(note_sec.desc[:coredump_sha256_struct.sizeof()]) 260 if coredump_sha256.sha256 != app_sha256: 261 raise ESPCoreDumpLoaderError( 262 'Invalid application image for coredump: coredump SHA256({}) != app SHA256({}).' 263 .format(coredump_sha256, app_sha256)) 264 if coredump_sha256.ver != self.version: 265 raise ESPCoreDumpLoaderError( 266 'Invalid application image for coredump: coredump SHA256 version({}) != app SHA256 version({}).' 267 .format(coredump_sha256.ver, self.version)) 268 269 @staticmethod 270 def _get_aligned_size(size, align_with=4): 271 if size % align_with: 272 return align_with * (size // align_with + 1) 273 return size 274 275 @staticmethod 276 def _build_note_section(name, sec_type, desc): 277 name = bytearray(name, encoding='ascii') + b'\0' 278 return NoteSection.build({ 279 'namesz': len(name), 280 'descsz': len(desc), 281 'type': sec_type, 282 'name': name, 283 'desc': desc, 284 }) 285 286 def _extract_bin_corefile(self): 287 """ 288 Creates core dump ELF file 289 """ 290 tcbsz_aligned = self._get_aligned_size(self.header.tcbsz) 291 292 coredump_data_struct = Struct( 293 'tasks' / GreedyRange( 294 AlignedStruct( 295 4, 296 'task_header' / TaskHeader, 297 'tcb' / Bytes(self.header.tcbsz), 298 'stack' / Bytes(abs_(this.task_header.stack_top - this.task_header.stack_end)), 299 ) 300 ), 301 'mem_seg_headers' / MemSegmentHeader[self.core_src.header.segs_num] 302 ) 303 304 core_elf = ESPCoreDumpElfFile() 305 notes = b'' 306 core_dump_info_notes = b'' 307 task_info_notes = b'' 308 309 coredump_data = coredump_data_struct.parse(self.core_src.data) 310 for i, task in enumerate(coredump_data.tasks): 311 stack_len_aligned = self._get_aligned_size(abs(task.task_header.stack_top - task.task_header.stack_end)) 312 task_status_kwargs = { 313 'task_index': i, 314 'task_flags': TASK_STATUS_CORRECT, 315 'task_tcb_addr': task.task_header.tcb_addr, 316 'task_stack_start': min(task.task_header.stack_top, task.task_header.stack_end), 317 'task_stack_len': stack_len_aligned, 318 'task_name': Padding(16).build({}) # currently we don't have task_name, keep it as padding 319 } 320 321 # Write TCB 322 try: 323 if self.target_method_cls.tcb_is_sane(task.task_header.tcb_addr, tcbsz_aligned): 324 core_elf.add_segment(task.task_header.tcb_addr, 325 task.tcb, 326 ElfFile.PT_LOAD, 327 ElfSegment.PF_R | ElfSegment.PF_W) 328 elif task.task_header.tcb_addr and self.target_method_cls.addr_is_fake(task.task_header.tcb_addr): 329 task_status_kwargs['task_flags'] |= TASK_STATUS_TCB_CORRUPTED 330 except ESPCoreDumpLoaderError as e: 331 logging.warning('Skip TCB {} bytes @ 0x{:x}. (Reason: {})' 332 .format(tcbsz_aligned, task.task_header.tcb_addr, e)) 333 334 # Write stack 335 try: 336 if self.target_method_cls.stack_is_sane(task_status_kwargs['task_stack_start']): 337 core_elf.add_segment(task_status_kwargs['task_stack_start'], 338 task.stack, 339 ElfFile.PT_LOAD, 340 ElfSegment.PF_R | ElfSegment.PF_W) 341 elif task_status_kwargs['task_stack_start'] \ 342 and self.target_method_cls.addr_is_fake(task_status_kwargs['task_stack_start']): 343 task_status_kwargs['task_flags'] |= TASK_STATUS_TCB_CORRUPTED 344 core_elf.add_segment(task_status_kwargs['task_stack_start'], 345 task.stack, 346 ElfFile.PT_LOAD, 347 ElfSegment.PF_R | ElfSegment.PF_W) 348 except ESPCoreDumpLoaderError as e: 349 logging.warning('Skip task\'s ({:x}) stack {} bytes @ 0x{:x}. (Reason: {})' 350 .format(task_status_kwargs['tcb_addr'], 351 task_status_kwargs['stack_len_aligned'], 352 task_status_kwargs['stack_base'], 353 e)) 354 355 try: 356 logging.debug('Stack start_end: 0x{:x} @ 0x{:x}' 357 .format(task.task_header.stack_top, task.task_header.stack_end)) 358 task_regs, extra_regs = self.arch_method_cls.get_registers_from_stack( 359 task.stack, 360 task.task_header.stack_end > task.task_header.stack_top 361 ) 362 except Exception as e: 363 raise ESPCoreDumpLoaderError(str(e)) 364 365 task_info_notes += self._build_note_section('TASK_INFO', 366 ESPCoreDumpElfFile.PT_TASK_INFO, 367 EspTaskStatus.build(task_status_kwargs)) 368 notes += self._build_note_section('CORE', 369 ElfFile.PT_LOAD, 370 self.arch_method_cls.build_prstatus_data(task.task_header.tcb_addr, 371 task_regs)) 372 373 if extra_regs and len(core_dump_info_notes) == 0: 374 # actually there will be only one such note - for crashed task 375 core_dump_info_notes += self._build_note_section('ESP_CORE_DUMP_INFO', 376 ESPCoreDumpElfFile.PT_INFO, 377 Int32ul.build(self.header.ver)) 378 379 exc_regs = [] 380 for reg_id in extra_regs: 381 exc_regs.extend([reg_id, extra_regs[reg_id]]) 382 _regs = [task.task_header.tcb_addr] + exc_regs 383 core_dump_info_notes += self._build_note_section( 384 'EXTRA_INFO', 385 ESPCoreDumpElfFile.PT_EXTRA_INFO, 386 Int32ul[1 + len(exc_regs)].build(_regs) 387 ) 388 389 if self.dump_ver == self.BIN_V2: 390 for header in coredump_data.mem_seg_headers: 391 logging.debug('Read memory segment {} bytes @ 0x{:x}'.format(header.mem_sz, header.mem_start)) 392 core_elf.add_segment(header.mem_start, header.data, ElfFile.PT_LOAD, ElfSegment.PF_R | ElfSegment.PF_W) 393 394 # add notes 395 try: 396 core_elf.add_segment(0, notes, ElfFile.PT_NOTE, 0) 397 except ESPCoreDumpLoaderError as e: 398 logging.warning('Skip NOTES segment {:d} bytes @ 0x{:x}. (Reason: {})'.format(len(notes), 0, e)) 399 # add core dump info notes 400 try: 401 core_elf.add_segment(0, core_dump_info_notes, ElfFile.PT_NOTE, 0) 402 except ESPCoreDumpLoaderError as e: 403 logging.warning('Skip core dump info NOTES segment {:d} bytes @ 0x{:x}. (Reason: {})' 404 .format(len(core_dump_info_notes), 0, e)) 405 try: 406 core_elf.add_segment(0, task_info_notes, ElfFile.PT_NOTE, 0) 407 except ESPCoreDumpLoaderError as e: 408 logging.warning('Skip failed tasks info NOTES segment {:d} bytes @ 0x{:x}. (Reason: {})' 409 .format(len(task_info_notes), 0, e)) 410 # dump core ELF 411 core_elf.e_type = ElfFile.ET_CORE 412 core_elf.e_machine = ESPCoreDumpElfFile.EM_XTENSA 413 core_elf.dump(self.core_elf_file.name) 414 415 416class ESPCoreDumpFlashLoader(EspCoreDumpLoader): 417 ESP_COREDUMP_PART_TABLE_OFF = 0x8000 418 419 def __init__(self, offset, target='esp32', port=None, baud=None): 420 super(ESPCoreDumpFlashLoader, self).__init__() 421 self.port = port 422 self.baud = baud 423 self.target = target 424 425 self._get_coredump(offset) 426 self._reload_coredump() 427 428 def _get_coredump(self, off): 429 """ 430 Loads core dump from flash using parttool or elftool (if offset is set) 431 """ 432 try: 433 if off: 434 logging.info('Invoke esptool to read image.') 435 self._invoke_esptool(off=off) 436 else: 437 logging.info('Invoke parttool to read image.') 438 self._invoke_parttool() 439 except subprocess.CalledProcessError as e: 440 if e.output: 441 logging.info(e.output) 442 logging.error('Error during the subprocess execution') 443 else: 444 # Need to be closed before read. Otherwise the result will be wrong 445 self.core_src_file.close() 446 447 def _invoke_esptool(self, off=None): 448 """ 449 Loads core dump from flash using elftool 450 """ 451 tool_args = [sys.executable, ESPTOOL_PY, '-c', self.target] 452 if self.port: 453 tool_args.extend(['-p', self.port]) 454 if self.baud: 455 tool_args.extend(['-b', str(self.baud)]) 456 457 self.core_src_file = self._create_temp_file() 458 try: 459 (part_offset, part_size) = self._get_core_dump_partition_info() 460 if not off: 461 off = part_offset # set default offset if not specified 462 logging.warning('The core dump image offset is not specified. Use partition offset: %d.', part_offset) 463 if part_offset != off: 464 logging.warning('Predefined image offset: %d does not match core dump partition offset: %d', off, 465 part_offset) 466 467 # Here we use V1 format to locate the size 468 tool_args.extend(['read_flash', str(off), str(EspCoreDumpV1Header.sizeof())]) 469 tool_args.append(self.core_src_file.name) 470 471 # read core dump length 472 et_out = subprocess.check_output(tool_args) 473 if et_out: 474 logging.info(et_out.decode('utf-8')) 475 476 header = EspCoreDumpV1Header.parse(open(self.core_src_file.name, 'rb').read()) 477 if not header or not 0 < header.tot_len <= part_size: 478 logging.error('Incorrect size of core dump image: {}, use partition size instead: {}' 479 .format(header.tot_len, part_size)) 480 coredump_len = part_size 481 else: 482 coredump_len = header.tot_len 483 # set actual size of core dump image and read it from flash 484 tool_args[-2] = str(coredump_len) 485 et_out = subprocess.check_output(tool_args) 486 if et_out: 487 logging.info(et_out.decode('utf-8')) 488 except subprocess.CalledProcessError as e: 489 logging.error('esptool script execution failed with err %d', e.returncode) 490 logging.debug("Command ran: '%s'", e.cmd) 491 logging.debug('Command out:') 492 logging.debug(e.output) 493 raise e 494 495 def _invoke_parttool(self): 496 """ 497 Loads core dump from flash using parttool 498 """ 499 tool_args = [sys.executable, PARTTOOL_PY] 500 if self.port: 501 tool_args.extend(['--port', self.port]) 502 tool_args.extend(['read_partition', '--partition-type', 'data', '--partition-subtype', 'coredump', '--output']) 503 504 self.core_src_file = self._create_temp_file() 505 try: 506 tool_args.append(self.core_src_file.name) 507 # read core dump partition 508 et_out = subprocess.check_output(tool_args) 509 if et_out: 510 logging.info(et_out.decode('utf-8')) 511 except subprocess.CalledProcessError as e: 512 logging.error('parttool script execution failed with err %d', e.returncode) 513 logging.debug("Command ran: '%s'", e.cmd) 514 logging.debug('Command out:') 515 logging.debug(e.output) 516 raise e 517 518 def _get_core_dump_partition_info(self, part_off=None): 519 """ 520 Get core dump partition info using parttool 521 """ 522 logging.info('Retrieving core dump partition offset and size...') 523 if not part_off: 524 part_off = self.ESP_COREDUMP_PART_TABLE_OFF 525 try: 526 tool_args = [sys.executable, PARTTOOL_PY, '-q', '--partition-table-offset', str(part_off)] 527 if self.port: 528 tool_args.extend(['--port', self.port]) 529 invoke_args = tool_args + ['get_partition_info', '--partition-type', 'data', 530 '--partition-subtype', 'coredump', 531 '--info', 'offset', 'size'] 532 res = subprocess.check_output(invoke_args).strip() 533 (offset_str, size_str) = res.rsplit(b'\n')[-1].split(b' ') 534 size = int(size_str, 16) 535 offset = int(offset_str, 16) 536 logging.info('Core dump partition offset=%d, size=%d', offset, size) 537 except subprocess.CalledProcessError as e: 538 logging.error('parttool get partition info failed with err %d', e.returncode) 539 logging.debug("Command ran: '%s'", e.cmd) 540 logging.debug('Command out:') 541 logging.debug(e.output) 542 logging.error('Check if the coredump partition exists in partition table.') 543 raise e 544 return offset, size 545 546 547class ESPCoreDumpFileLoader(EspCoreDumpLoader): 548 def __init__(self, path, is_b64=False): 549 super(ESPCoreDumpFileLoader, self).__init__() 550 self.is_b64 = is_b64 551 552 self._get_coredump(path) 553 self._reload_coredump() 554 555 def _get_coredump(self, path): 556 """ 557 Loads core dump from (raw binary or base64-encoded) file 558 """ 559 logging.debug('Load core dump from "%s", %s format', path, 'b64' if self.is_b64 else 'raw') 560 if not self.is_b64: 561 self.core_src_file = open(path, mode='rb') 562 else: 563 self.core_src_file = self._create_temp_file() 564 with open(path, 'rb') as fb64: 565 while True: 566 line = fb64.readline() 567 if len(line) == 0: 568 break 569 data = base64.standard_b64decode(line.rstrip(b'\r\n')) 570 self.core_src_file.write(data) 571 self.core_src_file.flush() 572 self.core_src_file.seek(0) 573