1# Copyright (c) 2017 Linaro Limited.
2# Copyright (c) 2024 Tenstorrent AI ULC
3#
4# SPDX-License-Identifier: Apache-2.0
5#
6# pylint: disable=duplicate-code
7
8'''Runner for openocd.'''
9
10import re
11import subprocess
12from os import name as os_name
13from os import path
14from pathlib import Path
15
16from zephyr_ext_common import ZEPHYR_BASE
17
18if os_name != "nt":
19    import sys
20    import termios
21
22try:  # noqa SIM105
23    from elftools.elf.elffile import ELFFile
24except ImportError:
25    pass
26
27from runners.core import RunnerCaps, ZephyrBinaryRunner
28
29DEFAULT_OPENOCD_TCL_PORT = 6333
30DEFAULT_OPENOCD_TELNET_PORT = 4444
31DEFAULT_OPENOCD_GDB_PORT = 3333
32DEFAULT_OPENOCD_RTT_PORT = 5555
33DEFAULT_OPENOCD_RESET_HALT_CMD = 'reset init'
34DEFAULT_OPENOCD_TARGET_HANDLE = "_TARGETNAME"
35
def to_num(number):
    '''Convert one component of a dotted version string to an integer.

    The leading run of decimal digits gives the value; whatever follows
    it is ignored. A component of the form ``<digits>+dev`` is counted
    as one more than its digits.

    Raises ValueError (from ``int``) when the component has no leading
    digits.
    '''
    is_dev = re.search(r"^\d*\+dev", number) is not None
    value = int(re.search(r"^\d*", number).group(0))
    return value + 1 if is_dev else value
47
48class OpenOcdBinaryRunner(ZephyrBinaryRunner):
49    '''Runner front-end for openocd.'''
50
51    def __init__(self, cfg, pre_init=None, reset_halt_cmd=DEFAULT_OPENOCD_RESET_HALT_CMD,
52                 pre_load=None, erase_cmd=None, load_cmd=None, verify_cmd=None,
53                 post_verify=None, do_verify=False, do_verify_only=False, do_erase=False,
54                 tui=None, config=None, serial=None, use_elf=None,
55                 no_halt=False, no_init=False, no_targets=False,
56                 tcl_port=DEFAULT_OPENOCD_TCL_PORT,
57                 telnet_port=DEFAULT_OPENOCD_TELNET_PORT,
58                 gdb_port=DEFAULT_OPENOCD_GDB_PORT,
59                 gdb_client_port=DEFAULT_OPENOCD_GDB_PORT,
60                 gdb_init=None, no_load=False,
61                 target_handle=DEFAULT_OPENOCD_TARGET_HANDLE,
62                 rtt_port=DEFAULT_OPENOCD_RTT_PORT):
63        super().__init__(cfg)
64
65        if not path.exists(cfg.board_dir):
66            # try to find the board support in-tree
67            cfg_board_path = path.normpath(cfg.board_dir)
68            _temp_path = cfg_board_path.split("boards/")[1]
69            support = path.join(ZEPHYR_BASE, "boards", _temp_path, 'support')
70        else:
71            support = path.join(cfg.board_dir, 'support')
72
73
74        if not config:
75            default = path.join(support, 'openocd.cfg')
76            if path.exists(default):
77                config = [default]
78        self.openocd_config = config
79
80        search_args = []
81        if path.exists(support):
82            search_args.append('-s')
83            search_args.append(support)
84
85        if self.openocd_config is not None:
86            for i in self.openocd_config:
87                if path.exists(i) and not path.samefile(path.dirname(i), support):
88                    search_args.append('-s')
89                    search_args.append(path.dirname(i))
90
91        if cfg.openocd_search is not None:
92            for p in cfg.openocd_search:
93                search_args.extend(['-s', p])
94        self.openocd_cmd = [cfg.openocd or 'openocd'] + search_args
95        # openocd doesn't cope with Windows path names, so convert
96        # them to POSIX style just to be sure.
97        self.elf_name = Path(cfg.elf_file).as_posix() if cfg.elf_file else None
98        self.pre_init = pre_init or []
99        self.reset_halt_cmd = reset_halt_cmd
100        self.pre_load = pre_load or []
101        self.erase_cmd = erase_cmd
102        self.load_cmd = load_cmd
103        self.verify_cmd = verify_cmd
104        self.post_verify = post_verify or []
105        self.do_verify = do_verify or False
106        self.do_verify_only = do_verify_only or False
107        self.do_erase = do_erase or False
108        self.tcl_port = tcl_port
109        self.telnet_port = telnet_port
110        self.gdb_port = gdb_port
111        self.gdb_client_port = gdb_client_port
112        self.gdb_cmd = [cfg.gdb] if cfg.gdb else None
113        self.tui_arg = ['-tui'] if tui else []
114        self.halt_arg = [] if no_halt else ['-c halt']
115        self.init_arg = [] if no_init else ['-c init']
116        self.targets_arg = [] if no_targets else ['-c targets']
117        self.serial = ['-c set _ZEPHYR_BOARD_SERIAL ' + serial] if serial else []
118        self.use_elf = use_elf
119        self.gdb_init = gdb_init
120        self.load_arg = [] if no_load else ['-ex', 'load']
121        self.target_handle = target_handle
122        self.rtt_port = rtt_port
123
124    @classmethod
125    def name(cls):
126        return 'openocd'
127
128    @classmethod
129    def capabilities(cls):
130        return RunnerCaps(commands={'flash', 'debug', 'debugserver', 'attach', 'rtt'},
131                          rtt=True, erase=True)
132
    @classmethod
    def do_add_parser(cls, parser):
        '''Add the openocd-specific command line options to *parser*.

        The options fall into three groups: configuration selection
        (--config, --serial, --use-elf), the commands used while flashing
        (--cmd-*, --verify, --verify-only) and the settings used while
        debugging (ports, GDB options and the --no-* switches).
        '''
        parser.add_argument('--config', action='append',
                            help='''if given, override default config file;
                            may be given multiple times''')
        parser.add_argument('--serial', default="",
                            help='''if given, selects FTDI instance by its serial number,
                            defaults to empty''')
        parser.add_argument('--use-elf', default=False, action='store_true',
                            help='if given, Elf file will be used for loading instead of HEX image')
        # Options for flashing:
        parser.add_argument('--cmd-pre-init', action='append',
                            help='''Command to run before calling init;
                            may be given multiple times''')
        parser.add_argument('--cmd-reset-halt', default=DEFAULT_OPENOCD_RESET_HALT_CMD,
                            help=f'''Command to run for resetting and halting the target,
                            defaults to "{DEFAULT_OPENOCD_RESET_HALT_CMD}"''')
        parser.add_argument('--cmd-pre-load', action='append',
                            help='''Command to run before flashing;
                            may be given multiple times''')
        parser.add_argument('--cmd-erase', action='append',
                            help='''Command to erase device; may be given multiple times''')
        parser.add_argument('--cmd-load',
                            help='''Command to load/flash binary
                            (required when flashing)''')
        parser.add_argument('--cmd-verify',
                            help='''Command to verify flashed binary''')
        parser.add_argument('--cmd-post-verify', action='append',
                            help='''Command to run after verification;
                            may be given multiple times''')
        parser.add_argument('--verify', action='store_true',
                            help='if given, verify after flash')
        parser.add_argument('--verify-only', action='store_true',
                            help='if given, do verify and verify only. No flashing')

        # Options for debugging:
        parser.add_argument('--tui', default=False, action='store_true',
                            help='if given, GDB uses -tui')
        # Port options declare no type=, so values given on the command line
        # arrive as strings while the defaults are ints.
        parser.add_argument('--tcl-port', default=DEFAULT_OPENOCD_TCL_PORT,
                            help='openocd TCL port, defaults to 6333')
        parser.add_argument('--telnet-port',
                            default=DEFAULT_OPENOCD_TELNET_PORT,
                            help='openocd telnet port, defaults to 4444')
        parser.add_argument('--gdb-port', default=DEFAULT_OPENOCD_GDB_PORT,
                            help='openocd gdb port, defaults to 3333')
        parser.add_argument('--gdb-client-port', default=DEFAULT_OPENOCD_GDB_PORT,
                            help='''openocd gdb client port if multiple ports come
                            up, defaults to 3333''')
        parser.add_argument('--gdb-init', action='append',
                            help='if given, add GDB init commands')
        parser.add_argument('--no-halt', action='store_true',
                            help='if given, no halt issued in gdb server cmd')
        parser.add_argument('--no-init', action='store_true',
                            help='if given, no init issued in gdb server cmd')
        parser.add_argument('--no-targets', action='store_true',
                            help='if given, no target issued in gdb server cmd')
        parser.add_argument('--no-load', action='store_true',
                            help='if given, no load issued in gdb server cmd')
        parser.add_argument('--target-handle', default=DEFAULT_OPENOCD_TARGET_HANDLE,
                            help=f'''Internal handle used in openocd targets cfg
                            files, defaults to "{DEFAULT_OPENOCD_TARGET_HANDLE}".
                            ''')
        parser.add_argument('--rtt-port', default=DEFAULT_OPENOCD_RTT_PORT,
                            help='openocd rtt port, defaults to 5555')
197
198
199    @classmethod
200    def do_create(cls, cfg, args):
201        return OpenOcdBinaryRunner(
202            cfg,
203            pre_init=args.cmd_pre_init, reset_halt_cmd=args.cmd_reset_halt,
204            pre_load=args.cmd_pre_load, erase_cmd=args.cmd_erase, load_cmd=args.cmd_load,
205            verify_cmd=args.cmd_verify, post_verify=args.cmd_post_verify,
206            do_verify=args.verify, do_verify_only=args.verify_only, do_erase=args.erase,
207            tui=args.tui, config=args.config, serial=args.serial,
208            use_elf=args.use_elf, no_halt=args.no_halt, no_init=args.no_init,
209            no_targets=args.no_targets, tcl_port=args.tcl_port,
210            telnet_port=args.telnet_port, gdb_port=args.gdb_port,
211            gdb_client_port=args.gdb_client_port, gdb_init=args.gdb_init,
212            no_load=args.no_load, target_handle=args.target_handle,
213            rtt_port=args.rtt_port)
214
215    def print_gdbserver_message(self):
216        if not self.thread_info_enabled:
217            thread_msg = '; no thread info available'
218        elif self.supports_thread_info():
219            thread_msg = '; thread info enabled'
220        else:
221            thread_msg = '; update OpenOCD software for thread info'
222        self.logger.info('OpenOCD GDB server running on port '
223                         f'{self.gdb_port}{thread_msg}')
224
225    def print_rttserver_message(self):
226        self.logger.info(f'OpenOCD RTT server running on port {self.rtt_port}')
227
228    def read_version(self):
229        self.require(self.openocd_cmd[0])
230
231        # OpenOCD prints in stderr, need redirect to get output
232        out = self.check_output([self.openocd_cmd[0], '--version'],
233                                stderr=subprocess.STDOUT).decode()
234
235        # Account for version info format of ADI fork of OpenOCD as well
236        version_match = re.search(r"Open On-Chip Debugger.* v?(\d+.\d+.\d+)[ \n]", out)
237        version = version_match.group(1).split('.')
238
239        return [to_num(i) for i in version]
240
241    def supports_thread_info(self):
242        # Zephyr rtos was introduced after 0.11.0
243        (major, minor, rev) = self.read_version()
244        return (major, minor, rev) > (0, 11, 0)
245
246    def do_run(self, command, **kwargs):
247        self.require(self.openocd_cmd[0])
248        if globals().get('ELFFile') is None:
249            raise RuntimeError(
250                'elftools missing; please "pip3 install elftools"')
251
252        self.cfg_cmd = []
253        if self.openocd_config is not None:
254            for i in self.openocd_config:
255                self.cfg_cmd.append('-f')
256                self.cfg_cmd.append(i)
257
258        if command == 'flash' and self.use_elf:
259            self.do_flash_elf(**kwargs)
260        elif command == 'flash':
261            self.do_flash(**kwargs)
262        elif command in ('attach', 'debug', 'rtt'):
263            self.do_attach_debug_rtt(command, **kwargs)
264        elif command == 'load':
265            self.do_load(**kwargs)
266        else:
267            self.do_debugserver(**kwargs)
268
269    def do_flash(self, **kwargs):
270        self.ensure_output('hex')
271        if self.load_cmd is None:
272            raise ValueError('Cannot flash; load command is missing')
273        if self.verify_cmd is None:
274            raise ValueError('Cannot flash; verify command is missing')
275
276        # openocd doesn't cope with Windows path names, so convert
277        # them to POSIX style just to be sure.
278        hex_name = Path(self.cfg.hex_file).as_posix()
279
280        self.logger.info(f'Flashing file: {hex_name}')
281
282        pre_init_cmd = []
283        pre_load_cmd = []
284        post_verify_cmd = []
285        for i in self.pre_init:
286            pre_init_cmd.append("-c")
287            pre_init_cmd.append(i)
288
289        for i in self.pre_load:
290            pre_load_cmd.append("-c")
291            pre_load_cmd.append(i)
292
293        for i in self.post_verify:
294            post_verify_cmd.append("-c")
295            post_verify_cmd.append(i)
296
297        load_image = []
298        if not self.do_verify_only:
299            # Halt target
300            load_image = ['-c', self.reset_halt_cmd]
301            # Perform any erase operations
302            if self.do_erase:
303                if self.erase_cmd is None:
304                    self.logger.error('--erase not supported for target without --cmd-erase')
305                    return
306                for erase_cmd in self.erase_cmd:
307                    load_image += ["-c", erase_cmd]
308                # Trim the "erase" from "flash write_image erase" since a mass erase is already done
309                if self.load_cmd.endswith(' erase'):
310                    self.load_cmd = self.load_cmd[:-6]
311            # Load image
312            load_image +=['-c', self.load_cmd + ' ' + hex_name]
313
314        verify_image = []
315        if self.do_verify or self.do_verify_only:
316            verify_image = ['-c', self.reset_halt_cmd,
317                            '-c', self.verify_cmd + ' ' + hex_name]
318
319        cmd = (self.openocd_cmd + self.serial + self.cfg_cmd +
320               pre_init_cmd + self.init_arg + self.targets_arg +
321               pre_load_cmd + load_image +
322               verify_image +
323               post_verify_cmd +
324               ['-c', 'reset run',
325                '-c', 'shutdown'])
326        self.check_call(cmd)
327
328    def do_flash_elf(self, **kwargs):
329        if self.elf_name is None:
330            raise ValueError('Cannot debug; no .elf specified')
331
332        # Extract entry point address from Elf to use it later with
333        # "resume" command of OpenOCD.
334        with open(self.elf_name, 'rb') as f:
335            ep_addr = f"0x{ELFFile(f).header['e_entry']:016x}"
336
337        pre_init_cmd = []
338        for i in self.pre_init:
339            pre_init_cmd.append("-c")
340            pre_init_cmd.append(i)
341
342        pre_load_cmd = []
343        load_image = []
344        if not self.do_verify_only:
345            for i in self.pre_load:
346                pre_load_cmd.append("-c")
347                pre_load_cmd.append(i)
348            load_image = ['-c', 'load_image ' + self.elf_name]
349
350        verify_image = []
351        post_verify_cmd = []
352        if self.do_verify or self.do_verify_only:
353            verify_image = ['-c', 'verify_image ' + self.elf_name]
354            for i in self.post_verify:
355                post_verify_cmd.append("-c")
356                post_verify_cmd.append(i)
357
358        prologue = ['-c', 'resume ' + ep_addr,
359                    '-c', 'shutdown']
360
361        cmd = (self.openocd_cmd + self.serial + self.cfg_cmd +
362               pre_init_cmd + self.init_arg + self.targets_arg +
363               pre_load_cmd + ['-c', self.reset_halt_cmd] +
364               load_image +
365               verify_image + post_verify_cmd +
366               prologue)
367
368        self.check_call(cmd)
369
370    def do_attach_debug_rtt(self, command, **kwargs):
371        if self.gdb_cmd is None:
372            raise ValueError('Cannot debug; no gdb specified')
373        if self.elf_name is None:
374            raise ValueError('Cannot debug; no .elf specified')
375
376        pre_init_cmd = []
377        for i in self.pre_init:
378            pre_init_cmd.append("-c")
379            pre_init_cmd.append(i)
380
381        if self.thread_info_enabled and self.supports_thread_info():
382            pre_init_cmd.append("-c")
383            rtos_command = f'${self.target_handle} configure -rtos Zephyr'
384            pre_init_cmd.append(rtos_command)
385
386        server_cmd = (self.openocd_cmd + self.serial + self.cfg_cmd +
387                      ['-c', f'tcl_port {self.tcl_port}',
388                       '-c', f'telnet_port {self.telnet_port}',
389                       '-c', f'gdb_port {self.gdb_port}'] +
390                      pre_init_cmd + self.init_arg + self.targets_arg +
391                      self.halt_arg)
392        gdb_cmd = (self.gdb_cmd + self.tui_arg +
393                   ['-ex', f'target extended-remote :{self.gdb_client_port}',
394                    self.elf_name])
395        if command == 'debug':
396            gdb_cmd.extend(self.load_arg)
397        if self.gdb_init is not None:
398            for i in self.gdb_init:
399                gdb_cmd.append("-ex")
400                gdb_cmd.append(i)
401        if command == 'rtt':
402            rtt_address = self.get_rtt_address()
403            if rtt_address is None:
404                raise ValueError("RTT Control block not be found")
405
406            # cannot prompt the user to press return for automation purposes
407            gdb_cmd.extend(['-ex', 'set pagination off'])
408            # start the internal openocd rtt service via gdb monitor commands
409            gdb_cmd.extend(
410                ['-ex', f'monitor rtt setup 0x{rtt_address:x} 0x10 "SEGGER RTT"'])
411            gdb_cmd.extend(['-ex', 'monitor reset run'])
412            gdb_cmd.extend(['-ex', 'monitor rtt start'])
413            gdb_cmd.extend(
414                ['-ex', f'monitor rtt server start {self.rtt_port} 0'])
415            # detach from the target and quit the gdb client session
416            gdb_cmd.extend(['-ex', 'detach', '-ex', 'quit'])
417
418        self.require(gdb_cmd[0])
419        self.print_gdbserver_message()
420
421        if command in ('attach', 'debug'):
422            server_proc = self.popen_ignore_int(server_cmd, stderr=subprocess.DEVNULL)
423            try:
424                self.run_client(gdb_cmd)
425            finally:
426                server_proc.terminate()
427                server_proc.wait()
428        elif command == 'rtt':
429            self.print_rttserver_message()
430            server_proc = self.popen_ignore_int(server_cmd)
431
432            if os_name != 'nt':
433                # Save the terminal settings
434                fd = sys.stdin.fileno()
435                new_term = termios.tcgetattr(fd)
436                old_term = termios.tcgetattr(fd)
437
438                # New terminal setting unbuffered
439                new_term[3] = new_term[3] & ~termios.ICANON & ~termios.ECHO
440                termios.tcsetattr(fd, termios.TCSAFLUSH, new_term)
441            else:
442                fd = None
443                old_term = None
444
445            try:
446                # run the binary with gdb, set up the rtt server (runs to completion)
447                subprocess.run(gdb_cmd)
448                # run the rtt client in the foreground
449                self.run_telnet_client('localhost', self.rtt_port)
450            finally:
451                if old_term is not None and fd is not None:
452                    termios.tcsetattr(fd, termios.TCSAFLUSH, old_term)
453
454                server_proc.terminate()
455                server_proc.wait()
456
457    def do_debugserver(self, **kwargs):
458        pre_init_cmd = []
459        for i in self.pre_init:
460            pre_init_cmd.append("-c")
461            pre_init_cmd.append(i)
462
463        if self.thread_info_enabled and self.supports_thread_info():
464            pre_init_cmd.append("-c")
465            rtos_command = f'${self.target_handle} configure -rtos Zephyr'
466            pre_init_cmd.append(rtos_command)
467
468        cmd = (self.openocd_cmd + self.cfg_cmd +
469               ['-c', f'tcl_port {self.tcl_port}',
470                '-c', f'telnet_port {self.telnet_port}',
471                '-c', f'gdb_port {self.gdb_port}'] +
472               pre_init_cmd + self.init_arg + self.targets_arg +
473               ['-c', self.reset_halt_cmd])
474        self.print_gdbserver_message()
475        self.check_call(cmd)
476