1import os
2import queue
3import subprocess
4import sys
5import tempfile
6from contextlib import contextmanager
7from typing import Generator
8
9from .constants import COREDUMP_SCRIPT, TAG_KEY
10from .logger import Logger
11from .output_helpers import yellow_print
12from .web_socket_client import WebSocketClient
13
# coredump related messages
# Byte markers that CoreDump searches for (as substrings) in every incoming line.
# START begins buffering of the dump, END finishes it and triggers decoding.
COREDUMP_UART_START = b'================= CORE DUMP START ================='
COREDUMP_UART_END = b'================= CORE DUMP END ================='
# When this prompt is found in a line, CoreDump queues a newline key event in reply.
COREDUMP_UART_PROMPT = b'Press Enter to print core dump to UART...'

# coredump states (values held by CoreDump._reading_coredump)
COREDUMP_IDLE = 0  # initial state; no dump is being collected
COREDUMP_READING = 1  # START marker seen; following lines are appended to the buffer
COREDUMP_DONE = 2  # END marker seen; the buffered dump has been handed to the decoder

# coredump decoding options
COREDUMP_DECODE_DISABLE = 'disable'  # core dump detection is skipped entirely
COREDUMP_DECODE_INFO = 'info'  # the only option CoreDump._process_coredump implements
27
28
class CoreDump:
    """Detect a core dump in the line stream, buffer it and decode it.

    Lines are fed through :meth:`check`, which is used as a context manager
    around the code that prints the line.  Between the ``CORE DUMP START`` and
    ``CORE DUMP END`` markers the lines are collected into a buffer and the
    logger output is muted.  When the end marker arrives the buffer is written
    to a temporary file and decoded, either by a WebSocket peer or by the
    ``espcoredump`` script.
    """

    def __init__(self, decode_coredumps, event_queue, logger, websocket_client, elf_file):
        # type: (str, queue.Queue, Logger, WebSocketClient, str) -> None
        """Store the collaborators and start in the idle state.

        :param decode_coredumps: decoding option (``COREDUMP_DECODE_*``).
        :param event_queue: queue that receives the key event answering the
            "Press Enter" prompt.
        :param logger: object with ``output_enabled`` and ``print``.
        :param websocket_client: optional client; when truthy, decoding is
            delegated to it instead of running the script locally.
        :param elf_file: path of the program ELF passed to the decoder.
        """
        self._coredump_buffer = b''
        self._decode_coredumps = decode_coredumps
        self.event_queue = event_queue
        self._reading_coredump = COREDUMP_IDLE
        self.logger = logger
        self.websocket_client = websocket_client
        self.elf_file = elf_file

    @property
    def in_progress(self) -> bool:
        """Return True while core dump data is held in the buffer."""
        return bool(self._coredump_buffer)

    def _process_coredump(self):  # type: () -> None
        """Write the buffered dump to a temporary file and decode it.

        :raises NotImplementedError: if the decoding option is anything other
            than ``COREDUMP_DECODE_INFO``.
        """
        if self._decode_coredumps != COREDUMP_DECODE_INFO:
            raise NotImplementedError('process_coredump: %s not implemented' % self._decode_coredumps)
        coredump_file = None
        try:
            # On Windows, the temporary file can't be read unless it is closed.
            # Set delete=False and delete the file manually later.
            with tempfile.NamedTemporaryFile(mode='wb', delete=False) as coredump_file:
                coredump_file.write(self._coredump_buffer)
                coredump_file.flush()

            if self.websocket_client:
                self.logger.output_enabled = True
                yellow_print('Communicating through WebSocket')
                self.websocket_client.send({'event': 'coredump',
                                            'file': coredump_file.name,
                                            'prog': self.elf_file})
                yellow_print('Waiting for debug finished event')
                self.websocket_client.wait([('event', 'debug_finished')])
                yellow_print('Communications through WebSocket is finished')
            else:
                cmd = [sys.executable,
                       COREDUMP_SCRIPT,
                       'info_corefile',
                       '--core', coredump_file.name,
                       '--core-format', 'b64',
                       self.elf_file
                       ]
                output = subprocess.check_output(cmd, stderr=subprocess.STDOUT)
                self.logger.output_enabled = True
                self.logger.print(output)  # noqa: E999
                # Keep the END marker line itself muted; output is re-enabled
                # in _check_coredump_trigger_after_print.
                self.logger.output_enabled = False
        except subprocess.CalledProcessError as e:
            # Decoding failed: fall back to showing the raw dump to the user.
            yellow_print('Failed to run espcoredump script: {}\n{}\n\n'.format(e, e.output))
            self.logger.output_enabled = True
            self.logger.print(COREDUMP_UART_START + b'\n')
            self.logger.print(self._coredump_buffer)
            # end line will be printed in handle_serial_input
        finally:
            if coredump_file is not None:
                try:
                    os.unlink(coredump_file.name)
                except OSError as e:
                    yellow_print('Couldn\'t remove temporary core dump file ({})'.format(e))

    def _check_coredump_trigger_before_print(self, line):  # type: (bytes) -> None
        """Inspect ``line`` for core dump markers before it is printed.

        Answers the UART prompt, starts/stops buffering on the START/END
        markers and appends every line received in between to the buffer.
        """
        if self._decode_coredumps == COREDUMP_DECODE_DISABLE:
            return
        if COREDUMP_UART_PROMPT in line:
            yellow_print('Initiating core dump!')
            self.event_queue.put((TAG_KEY, '\n'))
            return
        if COREDUMP_UART_START in line:
            yellow_print('Core dump started (further output muted)')
            self._reading_coredump = COREDUMP_READING
            self._coredump_buffer = b''
            self.logger.output_enabled = False
            return
        # Only act on the END marker when a dump is actually being read;
        # a stray END line would otherwise trigger decoding of an empty buffer.
        if COREDUMP_UART_END in line and self._reading_coredump == COREDUMP_READING:
            self._reading_coredump = COREDUMP_DONE
            yellow_print('\nCore dump finished!')
            self._process_coredump()
            return
        if self._reading_coredump == COREDUMP_READING:
            kb = 1024
            buffer_len_kb = len(self._coredump_buffer) // kb
            self._coredump_buffer += line.replace(b'\r', b'') + b'\n'
            new_buffer_len_kb = len(self._coredump_buffer) // kb
            # Report progress each time another full kilobyte has been received.
            if new_buffer_len_kb > buffer_len_kb:
                yellow_print('Received %3d kB...' % new_buffer_len_kb, newline='\r')

    def _check_coredump_trigger_after_print(self):  # type: () -> None
        """Return to the idle state once the END marker line has been handled.

        The reset happens whenever the state is ``COREDUMP_DONE``, regardless
        of whether the logger is currently muted.  The WebSocket path and the
        failed-decode path both leave output enabled, and they must release
        the buffer as well so that ``in_progress`` becomes False again.
        """
        if self._decode_coredumps == COREDUMP_DECODE_DISABLE:
            return

        # Re-enable output after the last line of core dump has been consumed
        if self._reading_coredump == COREDUMP_DONE:
            self._reading_coredump = COREDUMP_IDLE
            self.logger.output_enabled = True
            self._coredump_buffer = b''

    @contextmanager
    def check(self, line):  # type: (bytes) -> Generator
        """Context manager wrapping the printing of one received line.

        Runs the marker detection before the wrapped block and the state
        clean-up after it, even if the block raises.
        """
        self._check_coredump_trigger_before_print(line)
        try:
            yield
        finally:
            self._check_coredump_trigger_after_print()
133