1# Copyright (c) 2017 Linaro Limited.
2# Copyright (c) 2024 Tenstorrent AI ULC
3#
4# SPDX-License-Identifier: Apache-2.0
5#
6# pylint: disable=duplicate-code
7
8'''Runner for openocd.'''
9
10import re
11import subprocess
12from os import path
13from pathlib import Path
14
15from zephyr_ext_common import ZEPHYR_BASE
16
17try:  # noqa SIM105
18    from elftools.elf.elffile import ELFFile
19except ImportError:
20    pass
21
22from runners.core import RunnerCaps, ZephyrBinaryRunner
23
24DEFAULT_OPENOCD_TCL_PORT = 6333
25DEFAULT_OPENOCD_TELNET_PORT = 4444
26DEFAULT_OPENOCD_GDB_PORT = 3333
27DEFAULT_OPENOCD_RTT_PORT = 5555
28DEFAULT_OPENOCD_RESET_HALT_CMD = 'reset init'
29DEFAULT_OPENOCD_TARGET_HANDLE = "_TARGETNAME"
30
31def to_num(number):
32    dev_match = re.search(r"^\d*\+dev", number)
33    dev_version = dev_match is not None
34
35    num_match = re.search(r"^\d*", number)
36    num = int(num_match.group(0))
37
38    if dev_version:
39        num += 1
40
41    return num
42
43class OpenOcdBinaryRunner(ZephyrBinaryRunner):
44    '''Runner front-end for openocd.'''
45
46    def __init__(self, cfg, pre_init=None, reset_halt_cmd=DEFAULT_OPENOCD_RESET_HALT_CMD,
47                 pre_load=None, load_cmd=None, verify_cmd=None, post_verify=None,
48                 do_verify=False, do_verify_only=False,
49                 tui=None, config=None, serial=None, use_elf=None,
50                 no_halt=False, no_init=False, no_targets=False,
51                 tcl_port=DEFAULT_OPENOCD_TCL_PORT,
52                 telnet_port=DEFAULT_OPENOCD_TELNET_PORT,
53                 gdb_port=DEFAULT_OPENOCD_GDB_PORT,
54                 gdb_client_port=DEFAULT_OPENOCD_GDB_PORT,
55                 gdb_init=None, no_load=False,
56                 target_handle=DEFAULT_OPENOCD_TARGET_HANDLE,
57                 rtt_port=DEFAULT_OPENOCD_RTT_PORT):
58        super().__init__(cfg)
59
60        if not path.exists(cfg.board_dir):
61            # try to find the board support in-tree
62            cfg_board_path = path.normpath(cfg.board_dir)
63            _temp_path = cfg_board_path.split("boards/")[1]
64            support = path.join(ZEPHYR_BASE, "boards", _temp_path, 'support')
65        else:
66            support = path.join(cfg.board_dir, 'support')
67
68
69        if not config:
70            default = path.join(support, 'openocd.cfg')
71            if path.exists(default):
72                config = [default]
73        self.openocd_config = config
74
75        search_args = []
76        if path.exists(support):
77            search_args.append('-s')
78            search_args.append(support)
79
80        if self.openocd_config is not None:
81            for i in self.openocd_config:
82                if path.exists(i) and not path.samefile(path.dirname(i), support):
83                    search_args.append('-s')
84                    search_args.append(path.dirname(i))
85
86        if cfg.openocd_search is not None:
87            for p in cfg.openocd_search:
88                search_args.extend(['-s', p])
89        self.openocd_cmd = [cfg.openocd or 'openocd'] + search_args
90        # openocd doesn't cope with Windows path names, so convert
91        # them to POSIX style just to be sure.
92        self.elf_name = Path(cfg.elf_file).as_posix() if cfg.elf_file else None
93        self.pre_init = pre_init or []
94        self.reset_halt_cmd = reset_halt_cmd
95        self.pre_load = pre_load or []
96        self.load_cmd = load_cmd
97        self.verify_cmd = verify_cmd
98        self.post_verify = post_verify or []
99        self.do_verify = do_verify or False
100        self.do_verify_only = do_verify_only or False
101        self.tcl_port = tcl_port
102        self.telnet_port = telnet_port
103        self.gdb_port = gdb_port
104        self.gdb_client_port = gdb_client_port
105        self.gdb_cmd = [cfg.gdb] if cfg.gdb else None
106        self.tui_arg = ['-tui'] if tui else []
107        self.halt_arg = [] if no_halt else ['-c halt']
108        self.init_arg = [] if no_init else ['-c init']
109        self.targets_arg = [] if no_targets else ['-c targets']
110        self.serial = ['-c set _ZEPHYR_BOARD_SERIAL ' + serial] if serial else []
111        self.use_elf = use_elf
112        self.gdb_init = gdb_init
113        self.load_arg = [] if no_load else ['-ex', 'load']
114        self.target_handle = target_handle
115        self.rtt_port = rtt_port
116
117    @classmethod
118    def name(cls):
119        return 'openocd'
120
121    @classmethod
122    def capabilities(cls):
123        return RunnerCaps(commands={'flash', 'debug', 'debugserver', 'attach', 'rtt'}, rtt=True)
124
125    @classmethod
126    def do_add_parser(cls, parser):
127        parser.add_argument('--config', action='append',
128                            help='''if given, override default config file;
129                            may be given multiple times''')
130        parser.add_argument('--serial', default="",
131                            help='''if given, selects FTDI instance by its serial number,
132                            defaults to empty''')
133        parser.add_argument('--use-elf', default=False, action='store_true',
134                            help='if given, Elf file will be used for loading instead of HEX image')
135        # Options for flashing:
136        parser.add_argument('--cmd-pre-init', action='append',
137                            help='''Command to run before calling init;
138                            may be given multiple times''')
139        parser.add_argument('--cmd-reset-halt', default=DEFAULT_OPENOCD_RESET_HALT_CMD,
140                            help=f'''Command to run for resetting and halting the target,
141                            defaults to "{DEFAULT_OPENOCD_RESET_HALT_CMD}"''')
142        parser.add_argument('--cmd-pre-load', action='append',
143                            help='''Command to run before flashing;
144                            may be given multiple times''')
145        parser.add_argument('--cmd-load',
146                            help='''Command to load/flash binary
147                            (required when flashing)''')
148        parser.add_argument('--cmd-verify',
149                            help='''Command to verify flashed binary''')
150        parser.add_argument('--cmd-post-verify', action='append',
151                            help='''Command to run after verification;
152                            may be given multiple times''')
153        parser.add_argument('--verify', action='store_true',
154                            help='if given, verify after flash')
155        parser.add_argument('--verify-only', action='store_true',
156                            help='if given, do verify and verify only. No flashing')
157
158        # Options for debugging:
159        parser.add_argument('--tui', default=False, action='store_true',
160                            help='if given, GDB uses -tui')
161        parser.add_argument('--tcl-port', default=DEFAULT_OPENOCD_TCL_PORT,
162                            help='openocd TCL port, defaults to 6333')
163        parser.add_argument('--telnet-port',
164                            default=DEFAULT_OPENOCD_TELNET_PORT,
165                            help='openocd telnet port, defaults to 4444')
166        parser.add_argument('--gdb-port', default=DEFAULT_OPENOCD_GDB_PORT,
167                            help='openocd gdb port, defaults to 3333')
168        parser.add_argument('--gdb-client-port', default=DEFAULT_OPENOCD_GDB_PORT,
169                            help='''openocd gdb client port if multiple ports come
170                            up, defaults to 3333''')
171        parser.add_argument('--gdb-init', action='append',
172                            help='if given, add GDB init commands')
173        parser.add_argument('--no-halt', action='store_true',
174                            help='if given, no halt issued in gdb server cmd')
175        parser.add_argument('--no-init', action='store_true',
176                            help='if given, no init issued in gdb server cmd')
177        parser.add_argument('--no-targets', action='store_true',
178                            help='if given, no target issued in gdb server cmd')
179        parser.add_argument('--no-load', action='store_true',
180                            help='if given, no load issued in gdb server cmd')
181        parser.add_argument('--target-handle', default=DEFAULT_OPENOCD_TARGET_HANDLE,
182                            help=f'''Internal handle used in openocd targets cfg
183                            files, defaults to "{DEFAULT_OPENOCD_TARGET_HANDLE}".
184                            ''')
185        parser.add_argument('--rtt-port', default=DEFAULT_OPENOCD_RTT_PORT,
186                            help='openocd rtt port, defaults to 5555')
187
188
189    @classmethod
190    def do_create(cls, cfg, args):
191        return OpenOcdBinaryRunner(
192            cfg,
193            pre_init=args.cmd_pre_init, reset_halt_cmd=args.cmd_reset_halt,
194            pre_load=args.cmd_pre_load, load_cmd=args.cmd_load,
195            verify_cmd=args.cmd_verify, post_verify=args.cmd_post_verify,
196            do_verify=args.verify, do_verify_only=args.verify_only,
197            tui=args.tui, config=args.config, serial=args.serial,
198            use_elf=args.use_elf, no_halt=args.no_halt, no_init=args.no_init,
199            no_targets=args.no_targets, tcl_port=args.tcl_port,
200            telnet_port=args.telnet_port, gdb_port=args.gdb_port,
201            gdb_client_port=args.gdb_client_port, gdb_init=args.gdb_init,
202            no_load=args.no_load, target_handle=args.target_handle,
203            rtt_port=args.rtt_port)
204
205    def print_gdbserver_message(self):
206        if not self.thread_info_enabled:
207            thread_msg = '; no thread info available'
208        elif self.supports_thread_info():
209            thread_msg = '; thread info enabled'
210        else:
211            thread_msg = '; update OpenOCD software for thread info'
212        self.logger.info('OpenOCD GDB server running on port '
213                         f'{self.gdb_port}{thread_msg}')
214
215    def print_rttserver_message(self):
216        self.logger.info(f'OpenOCD RTT server running on port {self.rtt_port}')
217
218    def read_version(self):
219        self.require(self.openocd_cmd[0])
220
221        # OpenOCD prints in stderr, need redirect to get output
222        out = self.check_output([self.openocd_cmd[0], '--version'],
223                                stderr=subprocess.STDOUT).decode()
224
225        version_match = re.search(r"Open On-Chip Debugger (\d+.\d+.\d+)", out)
226        version = version_match.group(1).split('.')
227
228        return [to_num(i) for i in version]
229
230    def supports_thread_info(self):
231        # Zephyr rtos was introduced after 0.11.0
232        (major, minor, rev) = self.read_version()
233        return (major, minor, rev) > (0, 11, 0)
234
235    def do_run(self, command, **kwargs):
236        self.require(self.openocd_cmd[0])
237        if globals().get('ELFFile') is None:
238            raise RuntimeError(
239                'elftools missing; please "pip3 install elftools"')
240
241        self.cfg_cmd = []
242        if self.openocd_config is not None:
243            for i in self.openocd_config:
244                self.cfg_cmd.append('-f')
245                self.cfg_cmd.append(i)
246
247        if command == 'flash' and self.use_elf:
248            self.do_flash_elf(**kwargs)
249        elif command == 'flash':
250            self.do_flash(**kwargs)
251        elif command in ('attach', 'debug', 'rtt'):
252            self.do_attach_debug_rtt(command, **kwargs)
253        elif command == 'load':
254            self.do_load(**kwargs)
255        else:
256            self.do_debugserver(**kwargs)
257
258    def do_flash(self, **kwargs):
259        self.ensure_output('hex')
260        if self.load_cmd is None:
261            raise ValueError('Cannot flash; load command is missing')
262        if self.verify_cmd is None:
263            raise ValueError('Cannot flash; verify command is missing')
264
265        # openocd doesn't cope with Windows path names, so convert
266        # them to POSIX style just to be sure.
267        hex_name = Path(self.cfg.hex_file).as_posix()
268
269        self.logger.info(f'Flashing file: {hex_name}')
270
271        pre_init_cmd = []
272        pre_load_cmd = []
273        post_verify_cmd = []
274        for i in self.pre_init:
275            pre_init_cmd.append("-c")
276            pre_init_cmd.append(i)
277
278        for i in self.pre_load:
279            pre_load_cmd.append("-c")
280            pre_load_cmd.append(i)
281
282        for i in self.post_verify:
283            post_verify_cmd.append("-c")
284            post_verify_cmd.append(i)
285
286        load_image = []
287        if not self.do_verify_only:
288            load_image = ['-c', self.reset_halt_cmd,
289                          '-c', self.load_cmd + ' ' + hex_name]
290
291        verify_image = []
292        if self.do_verify or self.do_verify_only:
293            verify_image = ['-c', self.reset_halt_cmd,
294                            '-c', self.verify_cmd + ' ' + hex_name]
295
296        cmd = (self.openocd_cmd + self.serial + self.cfg_cmd +
297               pre_init_cmd + self.init_arg + self.targets_arg +
298               pre_load_cmd + load_image +
299               verify_image +
300               post_verify_cmd +
301               ['-c', 'reset run',
302                '-c', 'shutdown'])
303        self.check_call(cmd)
304
305    def do_flash_elf(self, **kwargs):
306        if self.elf_name is None:
307            raise ValueError('Cannot debug; no .elf specified')
308
309        # Extract entry point address from Elf to use it later with
310        # "resume" command of OpenOCD.
311        with open(self.elf_name, 'rb') as f:
312            ep_addr = f"0x{ELFFile(f).header['e_entry']:016x}"
313
314        pre_init_cmd = []
315        for i in self.pre_init:
316            pre_init_cmd.append("-c")
317            pre_init_cmd.append(i)
318
319        pre_load_cmd = []
320        load_image = []
321        if not self.do_verify_only:
322            for i in self.pre_load:
323                pre_load_cmd.append("-c")
324                pre_load_cmd.append(i)
325            load_image = ['-c', 'load_image ' + self.elf_name]
326
327        verify_image = []
328        post_verify_cmd = []
329        if self.do_verify or self.do_verify_only:
330            verify_image = ['-c', 'verify_image ' + self.elf_name]
331            for i in self.post_verify:
332                post_verify_cmd.append("-c")
333                post_verify_cmd.append(i)
334
335        prologue = ['-c', 'resume ' + ep_addr,
336                    '-c', 'shutdown']
337
338        cmd = (self.openocd_cmd + self.serial + self.cfg_cmd +
339               pre_init_cmd + self.init_arg + self.targets_arg +
340               pre_load_cmd + ['-c', self.reset_halt_cmd] +
341               load_image +
342               verify_image + post_verify_cmd +
343               prologue)
344
345        self.check_call(cmd)
346
347    def do_attach_debug_rtt(self, command, **kwargs):
348        if self.gdb_cmd is None:
349            raise ValueError('Cannot debug; no gdb specified')
350        if self.elf_name is None:
351            raise ValueError('Cannot debug; no .elf specified')
352
353        pre_init_cmd = []
354        for i in self.pre_init:
355            pre_init_cmd.append("-c")
356            pre_init_cmd.append(i)
357
358        if self.thread_info_enabled and self.supports_thread_info():
359            pre_init_cmd.append("-c")
360            rtos_command = f'${self.target_handle} configure -rtos Zephyr'
361            pre_init_cmd.append(rtos_command)
362
363        server_cmd = (self.openocd_cmd + self.serial + self.cfg_cmd +
364                      ['-c', f'tcl_port {self.tcl_port}',
365                       '-c', f'telnet_port {self.telnet_port}',
366                       '-c', f'gdb_port {self.gdb_port}'] +
367                      pre_init_cmd + self.init_arg + self.targets_arg +
368                      self.halt_arg)
369        gdb_cmd = (self.gdb_cmd + self.tui_arg +
370                   ['-ex', f'target extended-remote :{self.gdb_client_port}',
371                    self.elf_name])
372        if command == 'debug':
373            gdb_cmd.extend(self.load_arg)
374        if self.gdb_init is not None:
375            for i in self.gdb_init:
376                gdb_cmd.append("-ex")
377                gdb_cmd.append(i)
378        if command == 'rtt':
379            rtt_address = self.get_rtt_address()
380            if rtt_address is None:
381                raise ValueError("RTT Control block not be found")
382
383            # cannot prompt the user to press return for automation purposes
384            gdb_cmd.extend(['-ex', 'set pagination off'])
385            # start the internal openocd rtt service via gdb monitor commands
386            gdb_cmd.extend(
387                ['-ex', f'monitor rtt setup 0x{rtt_address:x} 0x10 "SEGGER RTT"'])
388            gdb_cmd.extend(['-ex', 'monitor reset run'])
389            gdb_cmd.extend(['-ex', 'monitor rtt start'])
390            gdb_cmd.extend(
391                ['-ex', f'monitor rtt server start {self.rtt_port} 0'])
392            # detach from the target and quit the gdb client session
393            gdb_cmd.extend(['-ex', 'detach', '-ex', 'quit'])
394
395        self.require(gdb_cmd[0])
396        self.print_gdbserver_message()
397
398        if command in ('attach', 'debug'):
399            server_proc = self.popen_ignore_int(server_cmd, stderr=subprocess.DEVNULL)
400            try:
401                self.run_client(gdb_cmd)
402            finally:
403                server_proc.terminate()
404                server_proc.wait()
405        elif command == 'rtt':
406            self.print_rttserver_message()
407            server_proc = self.popen_ignore_int(server_cmd)
408            try:
409                # run the binary with gdb, set up the rtt server (runs to completion)
410                subprocess.run(gdb_cmd)
411                # run the rtt client in the foreground
412                self.run_telnet_client('localhost', self.rtt_port)
413            finally:
414                server_proc.terminate()
415                server_proc.wait()
416
417    def do_debugserver(self, **kwargs):
418        pre_init_cmd = []
419        for i in self.pre_init:
420            pre_init_cmd.append("-c")
421            pre_init_cmd.append(i)
422
423        if self.thread_info_enabled and self.supports_thread_info():
424            pre_init_cmd.append("-c")
425            rtos_command = f'${self.target_handle} configure -rtos Zephyr'
426            pre_init_cmd.append(rtos_command)
427
428        cmd = (self.openocd_cmd + self.cfg_cmd +
429               ['-c', f'tcl_port {self.tcl_port}',
430                '-c', f'telnet_port {self.telnet_port}',
431                '-c', f'gdb_port {self.gdb_port}'] +
432               pre_init_cmd + self.init_arg + self.targets_arg +
433               ['-c', self.reset_halt_cmd])
434        self.print_gdbserver_message()
435        self.check_call(cmd)
436