1# Copyright 2023 NXP
2# SPDX-License-Identifier: Apache-2.0
3"""
4Runner for NXP S32 Debug Probe.
5"""
6
7import argparse
8import os
9import platform
10import re
11import shlex
12import subprocess
13import sys
14import tempfile
15from dataclasses import dataclass
16from pathlib import Path
17
18from runners.core import BuildConfiguration, RunnerCaps, RunnerConfig, ZephyrBinaryRunner
19
20NXP_S32DBG_USB_CLASS = 'NXP Probes'
21NXP_S32DBG_USB_VID = 0x15a2
22NXP_S32DBG_USB_PID = 0x0067
23
24
25@dataclass
26class NXPS32DebugProbeConfig:
27    """NXP S32 Debug Probe configuration parameters."""
28    conn_str: str = 's32dbg'
29    server_port: int = 45000
30    speed: int = 16000
31    remote_timeout: int = 30
32    reset_type: str | None = 'default'
33    reset_delay: int = 0
34
35
36class NXPS32DebugProbeRunner(ZephyrBinaryRunner):
37    """Runner front-end for NXP S32 Debug Probe."""
38
39    def __init__(self,
40                 runner_cfg: RunnerConfig,
41                 probe_cfg: NXPS32DebugProbeConfig,
42                 core_name: str,
43                 soc_name: str,
44                 soc_family_name: str,
45                 start_all_cores: bool,
46                 s32ds_path: str | None = None,
47                 tool_opt: list[str] | None = None) -> None:
48        super().__init__(runner_cfg)
49        self.elf_file: str = runner_cfg.elf_file or ''
50        self.probe_cfg: NXPS32DebugProbeConfig = probe_cfg
51        self.core_name: str = core_name
52        self.soc_name: str = soc_name
53        self.soc_family_name: str = soc_family_name
54        self.start_all_cores: bool = start_all_cores
55        self.s32ds_path_override: str | None = s32ds_path
56
57        self.tool_opt: list[str] = []
58        if tool_opt:
59            for opt in tool_opt:
60                self.tool_opt.extend(shlex.split(opt))
61
62        build_cfg = BuildConfiguration(runner_cfg.build_dir)
63        self.arch = build_cfg.get('CONFIG_ARCH').replace('"', '')
64
65    @classmethod
66    def name(cls) -> str:
67        return 'nxp_s32dbg'
68
69    @classmethod
70    def capabilities(cls) -> RunnerCaps:
71        return RunnerCaps(commands={'debug', 'debugserver', 'attach'},
72                          dev_id=True, tool_opt=True)
73
74    @classmethod
75    def dev_id_help(cls) -> str:
76        return '''Debug probe connection string as in "s32dbg[:<address>]"
77                  where <address> can be the IP address if TAP is available via Ethernet,
78                  the serial ID of the probe or empty if TAP is available via USB.'''
79
80    @classmethod
81    def tool_opt_help(cls) -> str:
82        return '''Additional options for GDB client when used with "debug" or "attach" commands
83                  or for GTA server when used with "debugserver" command.'''
84
85    @classmethod
86    def do_add_parser(cls, parser: argparse.ArgumentParser) -> None:
87        parser.add_argument('--core-name',
88                            required=True,
89                            help='Core name as supported by the debug probe (e.g. "R52_0_0")')
90        parser.add_argument('--soc-name',
91                            required=True,
92                            help='SoC name as supported by the debug probe (e.g. "S32Z270")')
93        parser.add_argument('--soc-family-name',
94                            required=True,
95                            help='SoC family name as supported by the debug probe (e.g. "s32z2e2")')
96        parser.add_argument('--start-all-cores',
97                            action='store_true',
98                            help='Start all SoC cores and not just the one being debugged. '
99                                 'Use together with "debug" command.')
100        parser.add_argument('--s32ds-path',
101                            help='Override the path to NXP S32 Design Studio installation. '
102                                 'By default, this runner will try to obtain it from the system '
103                                 'path, if available.')
104        parser.add_argument('--server-port',
105                            default=NXPS32DebugProbeConfig.server_port,
106                            type=int,
107                            help='GTA server port')
108        parser.add_argument('--speed',
109                            default=NXPS32DebugProbeConfig.speed,
110                            type=int,
111                            help='JTAG interface speed')
112        parser.add_argument('--remote-timeout',
113                            default=NXPS32DebugProbeConfig.remote_timeout,
114                            type=int,
115                            help='Number of seconds to wait for the remote target responses')
116
117    @classmethod
118    def do_create(cls, cfg: RunnerConfig, args: argparse.Namespace) -> 'NXPS32DebugProbeRunner':
119        probe_cfg = NXPS32DebugProbeConfig(args.dev_id,
120                                           server_port=args.server_port,
121                                           speed=args.speed,
122                                           remote_timeout=args.remote_timeout)
123
124        return NXPS32DebugProbeRunner(cfg, probe_cfg, args.core_name, args.soc_name,
125                                      args.soc_family_name, args.start_all_cores,
126                                      s32ds_path=args.s32ds_path, tool_opt=args.tool_opt)
127
128    @staticmethod
129    def find_usb_probes() -> list[str]:
130        """Return a list of debug probe serial numbers connected via USB to this host."""
131        # use system's native commands to enumerate and retrieve the USB serial ID
132        # to avoid bloating this runner with third-party dependencies that often
133        # require priviledged permissions to access the device info
134        macaddr_pattern = r'(?:[0-9a-f]{2}[:]){5}[0-9a-f]{2}'
135        if platform.system() == 'Windows':
136            cmd = f'pnputil /enum-devices /connected /class "{NXP_S32DBG_USB_CLASS}"'
137            serialid_pattern = f'instance id: +usb\\\\.*\\\\({macaddr_pattern})'
138        else:
139            cmd = f'lsusb -v -d {NXP_S32DBG_USB_VID:x}:{NXP_S32DBG_USB_PID:x}'
140            serialid_pattern = f'iserial +.*({macaddr_pattern})'
141
142        try:
143            outb = subprocess.check_output(shlex.split(cmd), stderr=subprocess.DEVNULL)
144            out = outb.decode('utf-8').strip().lower()
145        except subprocess.CalledProcessError as err:
146            raise RuntimeError('error while looking for debug probes connected') from err
147
148        devices: list[str] = []
149        if out and 'no devices were found' not in out:
150            devices = re.findall(serialid_pattern, out)
151
152        return sorted(devices)
153
154    @classmethod
155    def select_probe(cls) -> str:
156        """
157        Find debugger probes connected and return the serial number of the one selected.
158
159        If there are multiple debugger probes connected and this runner is being executed
160        in a interactive prompt, ask the user to select one of the probes.
161        """
162        probes_snr = cls.find_usb_probes()
163        if not probes_snr:
164            raise RuntimeError('there are no debug probes connected')
165        elif len(probes_snr) == 1:
166            return probes_snr[0]
167        else:
168            if not sys.stdin.isatty():
169                raise RuntimeError(
170                    f'refusing to guess which of {len(probes_snr)} connected probes to use '
171                    '(Interactive prompts disabled since standard input is not a terminal). '
172                    'Please specify a device ID on the command line.')
173
174            print('There are multiple debug probes connected')
175            for i, probe in enumerate(probes_snr, 1):
176                print(f'{i}. {probe}')
177
178            prompt = f'Please select one with desired serial number (1-{len(probes_snr)}): '
179            while True:
180                try:
181                    value: int = int(input(prompt))
182                except EOFError:
183                    sys.exit(0)
184                except ValueError:
185                    continue
186                if 1 <= value <= len(probes_snr):
187                    break
188            return probes_snr[value - 1]
189
190    @property
191    def runtime_environment(self) -> dict[str, str] | None:
192        """Execution environment used for the client process."""
193        if platform.system() == 'Windows':
194            python_lib = (self.s32ds_path / 'S32DS' / 'build_tools' / 'msys32'
195                        / 'mingw32' / 'lib' / 'python2.7')
196            return {
197                **os.environ,
198                'PYTHONPATH': f'{python_lib}{os.pathsep}{python_lib / "site-packages"}'
199            }
200
201        return None
202
203    @property
204    def script_globals(self) -> dict[str, str | int | None]:
205        """Global variables required by the debugger scripts."""
206        return {
207            '_PROBE_IP': self.probe_cfg.conn_str,
208            '_JTAG_SPEED': self.probe_cfg.speed,
209            '_GDB_SERVER_PORT': self.probe_cfg.server_port,
210            '_RESET_TYPE': self.probe_cfg.reset_type,
211            '_RESET_DELAY': self.probe_cfg.reset_delay,
212            '_REMOTE_TIMEOUT': self.probe_cfg.remote_timeout,
213            '_CORE_NAME': f'{self.soc_name}_{self.core_name}',
214            '_SOC_NAME': self.soc_name,
215            '_IS_LOGGING_ENABLED': False,
216            '_FLASH_NAME': None,    # not supported
217            '_SECURE_TYPE': None,   # not supported
218            '_SECURE_KEY': None,    # not supported
219        }
220
221    def server_commands(self) -> list[str]:
222        """Get launch commands to start the GTA server."""
223        server_exec = str(self.s32ds_path / 'S32DS' / 'tools' / 'S32Debugger'
224                          / 'Debugger' / 'Server' / 'gta' / 'gta')
225        cmd = [server_exec, '-p', str(self.probe_cfg.server_port)]
226        return cmd
227
228    def client_commands(self) -> list[str]:
229        """Get launch commands to start the GDB client."""
230        if self.arch == 'arm':
231            client_exec_name = 'arm-none-eabi-gdb-py'
232        elif self.arch == 'arm64':
233            client_exec_name = 'aarch64-none-elf-gdb-py'
234        else:
235            raise RuntimeError(f'architecture {self.arch} not supported')
236
237        client_exec = str(self.s32ds_path / 'S32DS' / 'tools' / 'gdb-arm'
238                          / 'arm32-eabi' / 'bin' / client_exec_name)
239        cmd = [client_exec]
240        return cmd
241
242    def get_script(self, name: str) -> Path:
243        """
244        Get the file path of a debugger script with the given name.
245
246        :param name: name of the script, without the SoC family name prefix
247        :returns: path to the script
248        :raises RuntimeError: if file does not exist
249        """
250        script = (self.s32ds_path / 'S32DS' / 'tools' / 'S32Debugger' / 'Debugger' / 'scripts'
251                  / self.soc_family_name / f'{self.soc_family_name}_{name}.py')
252        if not script.exists():
253            raise RuntimeError(f'script not found: {script}')
254        return script
255
256    def do_run(self, command: str, **kwargs) -> None:
257        """
258        Execute the given command.
259
260        :param command: command name to execute
261        :raises RuntimeError: if target architecture or host OS is not supported
262        :raises MissingProgram: if required tools are not found in the host
263        """
264        if platform.system() not in ('Windows', 'Linux'):
265            raise RuntimeError(f'runner not supported on {platform.system()} systems')
266
267        if self.arch not in ('arm', 'arm64'):
268            raise RuntimeError(f'architecture {self.arch} not supported')
269
270        app_name = 's32ds' if platform.system() == 'Windows' else 's32ds.sh'
271        self.s32ds_path = Path(self.require(app_name, path=self.s32ds_path_override)).parent
272
273        if not self.probe_cfg.conn_str:
274            self.probe_cfg.conn_str = f's32dbg:{self.select_probe()}'
275            self.logger.info(f'using debug probe {self.probe_cfg.conn_str}')
276
277        if command in ('attach', 'debug'):
278            self.ensure_output('elf')
279            self.do_attach_debug(command, **kwargs)
280        else:
281            self.do_debugserver(**kwargs)
282
283    def do_attach_debug(self, command: str, **kwargs) -> None:
284        """
285        Launch the GTA server and GDB client to start a debugging session.
286
287        :param command: command name to execute
288        """
289        gdb_script: list[str] = []
290
291        # setup global variables required for the scripts before sourcing them
292        for name, val in self.script_globals.items():
293            gdb_script.append(f'py {name} = {repr(val)}')
294
295        # load platform-specific debugger script
296        if command == 'debug':
297            if self.start_all_cores:
298                startup_script = self.get_script('generic_bareboard_all_cores')
299            else:
300                startup_script = self.get_script('generic_bareboard')
301        else:
302            startup_script = self.get_script('attach')
303        gdb_script.append(f'source {startup_script}')
304
305        # executes the SoC and board initialization sequence
306        if command == 'debug':
307            gdb_script.append('py board_init()')
308
309        # initializes the debugger connection to the core specified
310        gdb_script.append('py core_init()')
311
312        gdb_script.append(f'file {Path(self.elf_file).as_posix()}')
313        if command == 'debug':
314            gdb_script.append('load')
315
316        with tempfile.TemporaryDirectory(suffix='nxp_s32dbg') as tmpdir:
317            gdb_cmds = Path(tmpdir) / 'runner.nxp_s32dbg'
318            gdb_cmds.write_text('\n'.join(gdb_script), encoding='utf-8')
319            self.logger.debug(gdb_cmds.read_text(encoding='utf-8'))
320
321            server_cmd = self.server_commands()
322            client_cmd = self.client_commands()
323            client_cmd.extend(['-x', gdb_cmds.as_posix()])
324            client_cmd.extend(self.tool_opt)
325
326            self.run_server_and_client(server_cmd, client_cmd, env=self.runtime_environment)
327
328    def do_debugserver(self, **kwargs) -> None:
329        """Start the GTA server on a given port with the given extra parameters from cli."""
330        server_cmd = self.server_commands()
331        server_cmd.extend(self.tool_opt)
332        self.check_call(server_cmd)
333