1import os 2import queue 3import subprocess 4import sys 5import tempfile 6from contextlib import contextmanager 7from typing import Generator 8 9from .constants import COREDUMP_SCRIPT, TAG_KEY 10from .logger import Logger 11from .output_helpers import yellow_print 12from .web_socket_client import WebSocketClient 13 14# coredump related messages 15COREDUMP_UART_START = b'================= CORE DUMP START =================' 16COREDUMP_UART_END = b'================= CORE DUMP END =================' 17COREDUMP_UART_PROMPT = b'Press Enter to print core dump to UART...' 18 19# coredump states 20COREDUMP_IDLE = 0 21COREDUMP_READING = 1 22COREDUMP_DONE = 2 23 24# coredump decoding options 25COREDUMP_DECODE_DISABLE = 'disable' 26COREDUMP_DECODE_INFO = 'info' 27 28 29class CoreDump: 30 def __init__(self, decode_coredumps, event_queue, logger, websocket_client, elf_file): 31 # type: (str, queue.Queue, Logger, WebSocketClient, str) -> None 32 33 self._coredump_buffer = b'' 34 self._decode_coredumps = decode_coredumps 35 self.event_queue = event_queue 36 self._reading_coredump = COREDUMP_IDLE 37 self.logger = logger 38 self.websocket_client = websocket_client 39 self.elf_file = elf_file 40 41 @property 42 def in_progress(self) -> bool: 43 return bool(self._coredump_buffer) 44 45 def _process_coredump(self): # type: () -> None 46 if self._decode_coredumps != COREDUMP_DECODE_INFO: 47 raise NotImplementedError('process_coredump: %s not implemented' % self._decode_coredumps) 48 coredump_file = None 49 try: 50 # On Windows, the temporary file can't be read unless it is closed. 51 # Set delete=False and delete the file manually later. 52 with tempfile.NamedTemporaryFile(mode='wb', delete=False) as coredump_file: 53 coredump_file.write(self._coredump_buffer) 54 coredump_file.flush() 55 56 if self.websocket_client: 57 self.logger.output_enabled = True 58 yellow_print('Communicating through WebSocket') 59 self.websocket_client.send({'event': 'coredump', 60 'file': coredump_file.name, 61 'prog': self.elf_file}) 62 yellow_print('Waiting for debug finished event') 63 self.websocket_client.wait([('event', 'debug_finished')]) 64 yellow_print('Communications through WebSocket is finished') 65 else: 66 cmd = [sys.executable, 67 COREDUMP_SCRIPT, 68 'info_corefile', 69 '--core', coredump_file.name, 70 '--core-format', 'b64', 71 self.elf_file 72 ] 73 output = subprocess.check_output(cmd, stderr=subprocess.STDOUT) 74 self.logger.output_enabled = True 75 self.logger.print(output) # noqa: E999 76 self.logger.output_enabled = False # Will be reenabled in check_coredump_trigger_after_print 77 except subprocess.CalledProcessError as e: 78 yellow_print('Failed to run espcoredump script: {}\n{}\n\n'.format(e, e.output)) 79 self.logger.output_enabled = True 80 self.logger.print(COREDUMP_UART_START + b'\n') 81 self.logger.print(self._coredump_buffer) 82 # end line will be printed in handle_serial_input 83 finally: 84 if coredump_file is not None: 85 try: 86 os.unlink(coredump_file.name) 87 except OSError as e: 88 yellow_print('Couldn\'t remote temporary core dump file ({})'.format(e)) 89 90 def _check_coredump_trigger_before_print(self, line): # type: (bytes) -> None 91 if self._decode_coredumps == COREDUMP_DECODE_DISABLE: 92 return 93 if COREDUMP_UART_PROMPT in line: 94 yellow_print('Initiating core dump!') 95 self.event_queue.put((TAG_KEY, '\n')) 96 return 97 if COREDUMP_UART_START in line: 98 yellow_print('Core dump started (further output muted)') 99 self._reading_coredump = COREDUMP_READING 100 self._coredump_buffer = b'' 101 self.logger.output_enabled = False 102 return 103 if COREDUMP_UART_END in line: 104 self._reading_coredump = COREDUMP_DONE 105 yellow_print('\nCore dump finished!') 106 self._process_coredump() 107 return 108 if self._reading_coredump == COREDUMP_READING: 109 kb = 1024 110 buffer_len_kb = len(self._coredump_buffer) // kb 111 self._coredump_buffer += line.replace(b'\r', b'') + b'\n' 112 new_buffer_len_kb = len(self._coredump_buffer) // kb 113 if new_buffer_len_kb > buffer_len_kb: 114 yellow_print('Received %3d kB...' % new_buffer_len_kb, newline='\r') 115 116 def _check_coredump_trigger_after_print(self): # type: () -> None 117 if self._decode_coredumps == COREDUMP_DECODE_DISABLE: 118 return 119 120 # Re-enable output after the last line of core dump has been consumed 121 if not self.logger.output_enabled and self._reading_coredump == COREDUMP_DONE: 122 self._reading_coredump = COREDUMP_IDLE 123 self.logger.output_enabled = True 124 self._coredump_buffer = b'' 125 126 @contextmanager 127 def check(self, line): # type: (bytes) -> Generator 128 self._check_coredump_trigger_before_print(line) 129 try: 130 yield 131 finally: 132 self._check_coredump_trigger_after_print() 133