1#!/usr/bin/env python3
2#
3# Copyright (c) 2020 Intel Corporation
4#
5# SPDX-License-Identifier: Apache-2.0
6
7import abc
8import binascii
9import logging
10
11from coredump_parser.elf_parser import ThreadInfoOffset
12
13
# Module-level logger used by GdbStub for debug tracing of the packets
# exchanged with GDB.
logger = logging.getLogger("gdbstub")
15
16
class GdbStub(abc.ABC):
    """Base class for a read-only GDB remote serial protocol stub.

    Packets received from GDB are answered from the memory regions
    reported by the ``logfile`` and ``elffile`` objects. All packets that
    would modify target state are answered with an error reply.

    Subclasses must implement handle_register_group_read_packet() and may
    override the other handle_*() methods as well as
    arch_supports_thread_operations().
    """

    def __init__(self, logfile, elffile):
        """Initialize the stub state and collect the readable memory regions.

        :param logfile: object providing get_memory_regions(); its
                        get_threads_metadata() is used for thread queries
        :param elffile: object providing get_memory_regions(); its kernel
                        thread info accessors are used for thread queries
        """
        self.logfile = logfile
        self.elffile = elffile
        # Connection to GDB, assigned by run()
        self.socket = None
        # Signal number sent in reply to '?'; must be an int by the time
        # handle_signal_query_packet() is called
        self.gdb_signal = None
        # Thread struct addresses; GDB thread ID N maps to entry N - 1
        self.thread_ptrs = list()
        # Zero-based thread index chosen through the 'Hg' packet
        self.selected_thread = 0

        # Regions from the log file come first, so get_memory() prefers
        # them over regions from the ELF file when both cover an address
        self.mem_regions = [
            *logfile.get_memory_regions(),
            *elffile.get_memory_regions(),
        ]

    def _recv_exact(self, count):
        """Receive exactly ``count`` bytes from the GDB connection.

        recv() may hand back fewer bytes than requested and returns b''
        once the peer has closed the connection, so keep reading until
        everything has arrived.

        :param count: number of bytes to read
        :return: bytes object of length ``count``
        :raises EOFError: if the connection is closed before ``count``
                          bytes have been received
        """
        chunks = []
        missing = count
        while missing > 0:
            chunk = self.socket.recv(missing)
            if not chunk:
                raise EOFError("GDB connection closed")
            chunks.append(chunk)
            missing -= len(chunk)

        return b''.join(chunks)

    def get_gdb_packet(self):
        """Read one ``$<data>#<checksum>`` packet from GDB and ACK/NACK it.

        :return: the packet data (bytes) if the checksum matches; None if
                 there is no socket or the checksum is wrong or malformed
                 (in which case '-' has been sent back)
        :raises EOFError: if GDB closes the connection while a packet is
                          being read
        """
        if self.socket is None:
            return None

        # Wait for '$', discarding anything received before it
        while self._recv_exact(1) != b'$':
            pass

        # Get a full packet, up to (not including) the '#' terminator
        payload = bytearray()
        while True:
            ch = self._recv_exact(1)
            if ch == b'#':
                # End of packet
                break
            payload += ch

        data = bytes(payload)
        checksum = sum(data) % 256

        # Get checksum (2 hex digits encoding one byte)
        try:
            in_chksum = binascii.unhexlify(self._recv_exact(2))[0]
        except binascii.Error:
            # Not valid hex: can never match, so the packet gets NACKed
            in_chksum = None

        logger.debug(f"Received GDB packet: {data}")

        if checksum == in_chksum:
            # ACK
            logger.debug("ACK")
            self.socket.sendall(b'+')

            return data

        # NACK
        logger.debug(f"NACK (checksum {in_chksum} != {checksum})")
        self.socket.sendall(b'-')

        return None

    def put_gdb_packet(self, data):
        """Frame ``data`` as ``$<data>#<checksum>`` and send it to GDB.

        Does nothing when there is no socket.

        :param data: packet payload as bytes
        """
        socket = self.socket
        if socket is None:
            return

        # Checksum is the modulo-256 sum of the payload bytes
        checksum = sum(data) % 256
        pkt = b'$' + data + b'#' + format(checksum, "02X").encode()

        logger.debug(f"Sending GDB packet: {pkt}")

        # sendall() because send() is allowed to transmit only part of
        # the buffer, which would truncate large replies
        socket.sendall(pkt)

    def get_memory(self, start_address, length):
        """Read ``length`` bytes starting at ``start_address``.

        The read may span several memory regions as long as every address
        in the range is covered by some region. Each region is a mapping
        with 'start', 'end' (exclusive) and 'data' entries.

        :param start_address: address of the first byte to read
        :param length: number of bytes to read
        :return: the bytes read (b'' if ``length`` is not positive), or
                 None if any requested address is not covered by a region
                 or a region holds less data than its address range
        """
        def get_mem_region(addr):
            for r in self.mem_regions:
                if r['start'] <= addr < r['end']:
                    return r

            return None

        chunks = []
        addr = start_address
        remaining = length
        while remaining > 0:
            r = get_mem_region(addr)
            if r is None:
                return None

            # Copy as much as this region can provide in one slice, then
            # look up the region for the next address. 'end' is exclusive,
            # so the byte at r['end'] always belongs to another region.
            count = min(remaining, r['end'] - addr)
            offset = addr - r['start']
            chunk = r['data'][offset:offset + count]
            if len(chunk) != count:
                # Region data is shorter than its declared address range
                return None

            chunks.append(chunk)
            addr += count
            remaining -= count

        return b''.join(chunks)

    def handle_signal_query_packet(self):
        """Answer the '?' packet with ``S<signal>`` using self.gdb_signal."""
        pkt = b'S'
        pkt += format(self.gdb_signal, "02X").encode()

        self.put_gdb_packet(pkt)

    @abc.abstractmethod
    def handle_register_group_read_packet(self):
        """Answer the 'g' packet for reading a group of registers."""
        pass

    def handle_register_group_write_packet(self):
        """Answer the 'G' packet for writing to a group of registers.

        We don't support writing so return error.
        """
        self.put_gdb_packet(b"E01")

    def handle_register_single_read_packet(self, pkt):
        """Answer the 'p' packet for reading a single register.

        The base class always replies with an error.
        """
        self.put_gdb_packet(b"E01")

    def handle_register_single_write_packet(self, pkt):
        """Answer the 'P' packet for writing to registers.

        We don't support writing so return error.
        """
        self.put_gdb_packet(b"E01")

    def handle_memory_read_packet(self, pkt):
        """Answer the 'm' packet for reading memory: m<addr>,<len>.

        Replies with the hex-encoded memory content, or E01 if the packet
        is malformed or the memory cannot be read.

        :param pkt: the complete packet data, starting with b'm'
        """
        try:
            # extract address and length from packet
            # and convert them into usable integer values
            # (the '0x' prefix makes int() reject signed values)
            str_addr, str_length = pkt[1:].split(b',')
            s_addr = int(b'0x' + str_addr, 16)
            length = int(b'0x' + str_length, 16)
        except ValueError:
            # Wrong number of fields or non-hex digits
            self.put_gdb_packet(b"E01")
            return

        barray = self.get_memory(s_addr, length)

        if barray is not None:
            self.put_gdb_packet(binascii.hexlify(barray))
        else:
            self.put_gdb_packet(b"E01")

    def handle_memory_write_packet(self, pkt):
        """Answer the 'M' packet for writing to memory.

        We don't support writing so return error.
        """
        self.put_gdb_packet(b"E02")

    def _thread_operations_available(self):
        """Return True if thread related packets can be answered."""
        return (self.arch_supports_thread_operations()
                and self.elffile.has_kernel_thread_info())

    def _handle_thread_list_query(self):
        """Answer qfThreadInfo with the list of all thread IDs.

        Rebuilds self.thread_ptrs: the thread the kernel considers current
        is stored first (thread ID 1), followed by the remaining threads
        in linked list order.
        """
        threads_metadata_data = self.logfile.get_threads_metadata()["data"]

        if threads_metadata_data is None:
            self.put_gdb_packet(b"l")
            return

        elffile = self.elffile
        size_t_size = elffile.get_kernel_thread_info_size_t_size()

        def read_metadata_ptr(offset):
            raw = threads_metadata_data[offset:(offset + size_t_size)]
            return int.from_bytes(raw, "little")

        # First, find and store the thread that _kernel considers current
        curr_thread_ptr = read_metadata_ptr(
            elffile.get_kernel_thread_info_offset(
                ThreadInfoOffset.THREAD_INFO_OFFSET_K_CURR_THREAD))
        # Start from a fresh list so repeated queries don't pile up
        # duplicate entries
        thread_ptrs = [curr_thread_ptr]

        # Next, find the pointer to the linked list of threads in the
        # _kernel struct
        thread_ptr = read_metadata_ptr(
            elffile.get_kernel_thread_info_offset(
                ThreadInfoOffset.THREAD_INFO_OFFSET_K_THREADS))

        t_next_thread_offset = elffile.get_kernel_thread_info_offset(
            ThreadInfoOffset.THREAD_INFO_OFFSET_T_NEXT_THREAD)

        # Walk the linked list until a NULL pointer, unreadable memory or
        # an already visited thread (corrupted, cyclic list) is reached
        walked = set()
        while thread_ptr and thread_ptr not in walked:
            walked.add(thread_ptr)

            # The current thread is already first in the list
            if thread_ptr != curr_thread_ptr:
                thread_ptrs.append(thread_ptr)

            next_ptr_bytes = self.get_memory(thread_ptr + t_next_thread_offset,
                                             size_t_size)
            if next_ptr_bytes is None:
                break

            thread_ptr = int.from_bytes(next_ptr_bytes, "little")

        self.thread_ptrs = thread_ptrs

        # Thread IDs are 1-based and hex encoded
        thread_ids = (format(thread_id, "x").encode("ascii")
                      for thread_id in range(1, len(thread_ptrs) + 1))
        self.put_gdb_packet(b"m" + b",".join(thread_ids))

    def _handle_thread_extra_info_query(self, pkt):
        """Answer qThreadExtraInfo,<thread-id> with a hex encoded description.

        The description is empty when the thread ID does not refer to an
        entry of self.thread_ptrs.

        :param pkt: the complete packet data
        """
        # The thread ID follows "qThreadExtraInfo,"
        thread_id = int(pkt[17:].decode('latin-1'), 16)

        thread_info_bytes = bytearray()

        # Thread IDs are 1-based, so the last valid ID equals the number
        # of known threads
        if 1 <= thread_id <= len(self.thread_ptrs):
            thread_ptr = self.thread_ptrs[thread_id - 1]
            get_offset = self.elffile.get_kernel_thread_info_offset

            # Thread name: read up to the terminating NUL byte or the end
            # of readable memory
            thread_info_bytes += b'name: '
            name_addr = thread_ptr + get_offset(ThreadInfoOffset.THREAD_INFO_OFFSET_T_NAME)
            while True:
                next_byte = self.get_memory(name_addr, 1)
                if next_byte is None or next_byte == b'\x00':
                    break

                thread_info_bytes += next_byte
                name_addr += 1

            # Single byte attributes, skipped if they cannot be read
            byte_fields = (
                (b', state: ', ThreadInfoOffset.THREAD_INFO_OFFSET_T_STATE),
                (b', user_options: ', ThreadInfoOffset.THREAD_INFO_OFFSET_T_USER_OPTIONS),
                (b', prio: ', ThreadInfoOffset.THREAD_INFO_OFFSET_T_PRIO),
            )
            for label, field in byte_fields:
                field_byte = self.get_memory(thread_ptr + get_offset(field), 1)
                if field_byte is not None:
                    value = int.from_bytes(field_byte, "little")
                    thread_info_bytes += label + bytes(hex(value), 'ascii')

        self.put_gdb_packet(binascii.hexlify(thread_info_bytes))

    def handle_general_query_packet(self, pkt):
        """Answer the 'q' (general query) packets.

        Only qfThreadInfo, qsThreadInfo and qThreadExtraInfo are handled,
        and only if thread operations are available; everything else gets
        an empty reply.

        :param pkt: the complete packet data, starting with b'q'
        """
        if not self._thread_operations_available():
            self.put_gdb_packet(b'')
            return

        # For packets qfThreadInfo/qsThreadInfo, obtain a list of all
        # active thread IDs
        if pkt[0:12] == b"qfThreadInfo":
            self._handle_thread_list_query()
        elif pkt[0:12] == b"qsThreadInfo":
            # The whole list is sent in reply to qfThreadInfo
            self.put_gdb_packet(b"l")
        # For qThreadExtraInfo, obtain a printable string description of
        # thread attributes for the provided thread
        elif pkt[0:16] == b"qThreadExtraInfo":
            self._handle_thread_extra_info_query(pkt)
        else:
            self.put_gdb_packet(b'')

    def arch_supports_thread_operations(self):
        """Return True if the stub can handle thread packets.

        The base class returns False.
        """
        return False

    def handle_thread_alive_packet(self, pkt):
        """Answer the 'T' packet for finding out if a thread is alive."""
        if self._thread_operations_available():
            # Reply OK to report thread alive, allowing GDB to perform
            # other thread operations
            self.put_gdb_packet(b'OK')
        else:
            self.put_gdb_packet(b'')

    def handle_thread_register_group_read_packet(self):
        """Send the registers of the selected thread.

        Called after a thread has been selected with 'Hg'. The base class
        sends an empty reply.
        """
        self.put_gdb_packet(b'')

    def handle_thread_op_packet(self, pkt):
        """Answer the 'H' packet for setting thread for subsequent operations.

        Only 'Hg' is handled, and only if thread operations are available;
        otherwise an empty reply is sent.

        :param pkt: the complete packet data, starting with b'H'
        """
        if not self._thread_operations_available() or pkt[0:2] != b"Hg":
            self.put_gdb_packet(b'')
            return

        thread_index_str = pkt[2:].decode('latin-1')

        # Thread-id of '0' indicates an arbitrary process or thread
        if thread_index_str in ('0', ''):
            self.selected_thread = 0
            self.handle_register_group_read_packet()
            return

        # GDB thread IDs are 1-based, selected_thread is a 0-based index
        self.selected_thread = int(thread_index_str, 16) - 1
        self.handle_thread_register_group_read_packet()

    def run(self, socket):
        """Serve GDB packets from ``socket``.

        Returns when GDB sends the 'k' packet or closes the connection.

        :param socket: connected socket-like object providing recv() and
                       sendall()
        """
        self.socket = socket

        while True:
            try:
                pkt = self.get_gdb_packet()
            except EOFError:
                # Peer went away without sending 'k'
                logger.debug("GDB connection closed")
                break

            if pkt is None:
                continue

            pkt_type = pkt[0:1]
            logger.debug(f"Got packet type: {pkt_type}")

            if pkt_type == b'?':
                self.handle_signal_query_packet()
            elif pkt_type in (b'C', b'S'):
                # Continue/stepping execution, which is not supported.
                # So signal exception again
                self.handle_signal_query_packet()
            elif pkt_type == b'g':
                self.handle_register_group_read_packet()
            elif pkt_type == b'G':
                self.handle_register_group_write_packet()
            elif pkt_type == b'p':
                self.handle_register_single_read_packet(pkt)
            elif pkt_type == b'P':
                self.handle_register_single_write_packet(pkt)
            elif pkt_type == b'm':
                self.handle_memory_read_packet(pkt)
            elif pkt_type == b'M':
                self.handle_memory_write_packet(pkt)
            elif pkt_type == b'q':
                self.handle_general_query_packet(pkt)
            elif pkt_type == b'T':
                self.handle_thread_alive_packet(pkt)
            elif pkt_type == b'H':
                self.handle_thread_op_packet(pkt)
            elif pkt_type == b'k':
                # GDB quits
                break
            else:
                # Unsupported packet: empty reply
                self.put_gdb_packet(b'')
351