1#
2# Copyright 2021 Espressif Systems (Shanghai) CO., LTD
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8#     http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16
17import logging
18import re
19import time
20
21from pygdbmi.gdbcontroller import DEFAULT_GDB_TIMEOUT_SEC, GdbController
22
23from . import ESPCoreDumpError
24
25
26class EspGDB(object):
27    def __init__(self, gdb_path, gdb_cmds, core_filename, prog_filename, timeout_sec=DEFAULT_GDB_TIMEOUT_SEC):
28
29        """
30        Start GDB and initialize a GdbController instance
31        """
32        gdb_args = ['--quiet',  # inhibit dumping info at start-up
33                    '--nx',  # inhibit window interface
34                    '--nw',  # ignore .gdbinit
35                    '--interpreter=mi2',  # use GDB/MI v2
36                    '--core=%s' % core_filename]  # core file
37        for c in gdb_cmds:
38            if c:
39                gdb_args += ['-ex', c]
40        gdb_args.append(prog_filename)
41        self.p = GdbController(gdb_path=gdb_path, gdb_args=gdb_args)
42        self.timeout = timeout_sec
43
44        # Consume initial output by issuing a dummy command
45        self._gdbmi_run_cmd_get_responses(cmd='-data-list-register-values x pc',
46                                          resp_message=None, resp_type='console', multiple=True,
47                                          done_message='done', done_type='result')
48
49    def __del__(self):
50        try:
51            self.p.exit()
52        except IndexError:
53            logging.warning('Attempt to terminate the GDB process failed, because it is already terminated. Skip')
54
55    def _gdbmi_run_cmd_get_responses(self, cmd, resp_message, resp_type, multiple=True,
56                                     done_message=None, done_type=None):
57
58        self.p.write(cmd, read_response=False)
59        t_end = time.time() + self.timeout
60        filtered_response_list = []
61        all_responses = []
62        while time.time() < t_end:
63            more_responses = self.p.get_gdb_response(timeout_sec=0, raise_error_on_timeout=False)
64            filtered_response_list += filter(lambda rsp: rsp['message'] == resp_message and rsp['type'] == resp_type,
65                                             more_responses)
66            all_responses += more_responses
67            if filtered_response_list and not multiple:
68                break
69            if done_message and done_type and self._gdbmi_filter_responses(more_responses, done_message, done_type):
70                break
71        if not filtered_response_list and not multiple:
72            raise ESPCoreDumpError("Couldn't find response with message '{}', type '{}' in responses '{}'".format(
73                resp_message, resp_type, str(all_responses)
74            ))
75        return filtered_response_list
76
77    def _gdbmi_run_cmd_get_one_response(self, cmd, resp_message, resp_type):
78
79        return self._gdbmi_run_cmd_get_responses(cmd, resp_message, resp_type, multiple=False)[0]
80
81    def _gdbmi_data_evaluate_expression(self, expr):
82        """ Get the value of an expression, similar to the 'print' command """
83        return self._gdbmi_run_cmd_get_one_response("-data-evaluate-expression \"%s\"" % expr,
84                                                    'done', 'result')['payload']['value']
85
86    def get_freertos_task_name(self, tcb_addr):
87        """ Get FreeRTOS task name given the TCB address """
88        try:
89            val = self._gdbmi_data_evaluate_expression('(char*)((TCB_t *)0x%x)->pcTaskName' % tcb_addr)
90        except (ESPCoreDumpError, KeyError):
91            # KeyError is raised when "value" is not in "payload"
92            return ''
93
94        # Value is of form '0x12345678 "task_name"', extract the actual name
95        result = re.search(r"\"([^']*)\"$", val)
96        if result:
97            return result.group(1)
98        return ''
99
100    def run_cmd(self, gdb_cmd):
101        """ Execute a generic GDB console command via MI2
102        """
103        filtered_responses = self._gdbmi_run_cmd_get_responses(cmd="-interpreter-exec console \"%s\"" % gdb_cmd,
104                                                               resp_message=None, resp_type='console', multiple=True,
105                                                               done_message='done', done_type='result')
106        return ''.join([x['payload'] for x in filtered_responses]) \
107            .replace('\\n', '\n') \
108            .replace('\\t', '\t') \
109            .rstrip('\n')
110
111    def get_thread_info(self):
112        """ Get information about all threads known to GDB, and the current thread ID """
113        result = self._gdbmi_run_cmd_get_one_response('-thread-info', 'done', 'result')['payload']
114        current_thread_id = result['current-thread-id']
115        threads = result['threads']
116        return threads, current_thread_id
117
118    def switch_thread(self, thr_id):
119        """ Tell GDB to switch to a specific thread, given its ID """
120        self._gdbmi_run_cmd_get_one_response('-thread-select %s' % thr_id, 'done', 'result')
121
122    @staticmethod
123    def _gdbmi_filter_responses(responses, resp_message, resp_type):
124        return list(filter(lambda rsp: rsp['message'] == resp_message and rsp['type'] == resp_type, responses))
125
126    @staticmethod
127    def gdb2freertos_thread_id(gdb_target_id):
128        """ Convert GDB 'target ID' to the FreeRTOS TCB address """
129        return int(gdb_target_id.replace('process ', ''), 0)
130