1#! /usr/bin/env python3
2
3# Copyright (c) 2017 Linaro Limited.
4# Copyright (c) 2017 Open Source Foundries Limited.
5#
6# SPDX-License-Identifier: Apache-2.0
7
8"""Zephyr binary runner core interfaces
9
10This provides the core ZephyrBinaryRunner class meant for public use,
11as well as some other helpers for concrete runner classes.
12"""
13
14import abc
15import argparse
16import errno
17import logging
18import os
19import platform
20import re
21import selectors
22import shlex
23import shutil
24import signal
25import socket
26import subprocess
27import sys
28from dataclasses import dataclass, field
29from enum import Enum
30from functools import partial
31from inspect import isabstract
32from typing import NamedTuple, NoReturn
33
34try:
35    from elftools.elf.elffile import ELFFile
36    ELFTOOLS_MISSING = False
37except ImportError:
38    ELFTOOLS_MISSING = True
39
40
41# Turn on to enable just logging the commands that would be run (at
42# info rather than debug level), without actually running them. This
43# can break runners that are expecting output or if one command
44# depends on another, so it's just for debugging.
45_DRY_RUN = False
46
47_logger = logging.getLogger('runners')
48
49# FIXME: I assume this code belongs somewhere else, but i couldn't figure out
50# a good location for it, so i put it here for now
51# We could potentially search for RTT blocks in hex or bin files as well,
52# but since the magic string is "SEGGER RTT", i thought it might be better
53# to avoid, at the risk of false positives.
54def find_rtt_block(elf_file: str) -> int | None:
55    if ELFTOOLS_MISSING:
56        raise RuntimeError('the Python dependency elftools was missing; '
57                           'see the getting started guide for details on '
58                           'how to fix')
59
60    with open(elf_file, 'rb') as f:
61        elffile = ELFFile(f)
62        for sect in elffile.iter_sections('SHT_SYMTAB'):
63            symbols = sect.get_symbol_by_name('_SEGGER_RTT')
64            if symbols is None:
65                continue
66            for s in symbols:
67                return s.entry.get('st_value')
68    return None
69
70
71class _DebugDummyPopen:
72
73    def terminate(self):
74        pass
75
76    def wait(self):
77        pass
78
79
80MAX_PORT = 49151
81
82
83class NetworkPortHelper:
84    '''Helper class for dealing with local IP network ports.'''
85
86    def get_unused_ports(self, starting_from):
87        '''Find unused network ports, starting at given values.
88
89        starting_from is an iterable of ports the caller would like to use.
90
91        The return value is an iterable of ports, in the same order, using
92        the given values if they were unused, or the next sequentially
93        available unused port otherwise.
94
95        Ports may be bound between this call's check and actual usage, so
96        callers still need to handle errors involving returned ports.'''
97        start = list(starting_from)
98        used = self._used_now()
99        ret = []
100
101        for desired in start:
102            port = desired
103            while port in used:
104                port += 1
105                if port > MAX_PORT:
106                    msg = "ports above {} are in use"
107                    raise ValueError(msg.format(desired))
108            used.add(port)
109            ret.append(port)
110
111        return ret
112
113    def _used_now(self):
114        handlers = {
115            'Windows': self._used_now_windows,
116            'Linux': self._used_now_linux,
117            'Darwin': self._used_now_darwin,
118        }
119        handler = handlers[platform.system()]
120        return handler()
121
122    def _used_now_windows(self):
123        cmd = ['netstat', '-a', '-n', '-p', 'tcp']
124        return self._parser_windows(cmd)
125
126    def _used_now_linux(self):
127        cmd = ['ss', '-a', '-n', '-t']
128        return self._parser_linux(cmd)
129
130    def _used_now_darwin(self):
131        cmd = ['netstat', '-a', '-n', '-p', 'tcp']
132        return self._parser_darwin(cmd)
133
134    @staticmethod
135    def _parser_windows(cmd):
136        out = subprocess.check_output(cmd).split(b'\r\n')
137        used_bytes = [x.split()[1].rsplit(b':', 1)[1] for x in out
138                      if x.startswith(b'  TCP')]
139        return {int(b) for b in used_bytes}
140
141    @staticmethod
142    def _parser_linux(cmd):
143        out = subprocess.check_output(cmd).splitlines()[1:]
144        used_bytes = [s.split()[3].rsplit(b':', 1)[1] for s in out]
145        return {int(b) for b in used_bytes}
146
147    @staticmethod
148    def _parser_darwin(cmd):
149        out = subprocess.check_output(cmd).split(b'\n')
150        used_bytes = [x.split()[3].rsplit(b':', 1)[1] for x in out
151                      if x.startswith(b'tcp')]
152        return {int(b) for b in used_bytes}
153
154
155class BuildConfiguration:
156    '''This helper class provides access to build-time configuration.
157
158    Configuration options can be read as if the object were a dict,
159    either object['CONFIG_FOO'] or object.get('CONFIG_FOO').
160
161    Kconfig configuration values are available (parsed from .config).'''
162
163    config_prefix = 'CONFIG'
164
165    def __init__(self, build_dir: str):
166        self.build_dir = build_dir
167        self.options: dict[str, str | int] = {}
168        self.path = os.path.join(self.build_dir, 'zephyr', '.config')
169        self._parse()
170
171    def __contains__(self, item):
172        return item in self.options
173
174    def __getitem__(self, item):
175        return self.options[item]
176
177    def get(self, option, *args):
178        return self.options.get(option, *args)
179
180    def getboolean(self, option):
181        '''If a boolean option is explicitly set to y or n,
182        returns its value. Otherwise, falls back to False.
183        '''
184        return self.options.get(option, False)
185
186    def _parse(self):
187        filename = self.path
188
189        opt_value = re.compile(f'^(?P<option>{self.config_prefix}_[A-Za-z0-9_]+)=(?P<value>.*)$')
190        not_set = re.compile(f'^# (?P<option>{self.config_prefix}_[A-Za-z0-9_]+) is not set$')
191
192        with open(filename) as f:
193            for line in f:
194                match = opt_value.match(line)
195                if match:
196                    value = match.group('value').rstrip()
197                    if value.startswith('"') and value.endswith('"'):
198                        # A string literal should have the quotes stripped,
199                        # but otherwise be left as is.
200                        value = value[1:-1]
201                    elif value == 'y':
202                        # The character 'y' is a boolean option
203                        # that is set to True.
204                        value = True
205                    else:
206                        # Neither a string nor 'y', so try to parse it
207                        # as an integer.
208                        try:
209                            base = 16 if value.startswith('0x') else 10
210                            self.options[match.group('option')] = int(value, base=base)
211                            continue
212                        except ValueError:
213                            pass
214
215                    self.options[match.group('option')] = value
216                    continue
217
218                match = not_set.match(line)
219                if match:
220                    # '# CONFIG_FOO is not set' means a boolean option is false.
221                    self.options[match.group('option')] = False
222
223class SysbuildConfiguration(BuildConfiguration):
224    '''This helper class provides access to sysbuild-time configuration.
225
226    Configuration options can be read as if the object were a dict,
227    either object['SB_CONFIG_FOO'] or object.get('SB_CONFIG_FOO').
228
229    Kconfig configuration values are available (parsed from .config).'''
230
231    config_prefix = 'SB_CONFIG'
232
233    def _parse(self):
234        # If the build does not use sysbuild, skip parsing the file.
235        if not os.path.exists(self.path):
236            return
237        super()._parse()
238
239class MissingProgram(FileNotFoundError):
240    '''FileNotFoundError subclass for missing program dependencies.
241
242    No significant changes from the parent FileNotFoundError; this is
243    useful for explicitly signaling that the file in question is a
244    program that some class requires to proceed.
245
246    The filename attribute contains the missing program.'''
247
248    def __init__(self, program):
249        super().__init__(errno.ENOENT, os.strerror(errno.ENOENT), program)
250
251
252_RUNNERCAPS_COMMANDS = {'flash', 'debug', 'debugserver', 'attach', 'simulate', 'robot', 'rtt'}
253
254@dataclass
255class RunnerCaps:
256    '''This class represents a runner class's capabilities.
257
258    Each capability is represented as an attribute with the same
259    name. Flag attributes are True or False.
260
261    Available capabilities:
262
263    - commands: set of supported commands; default is {'flash',
264      'debug', 'debugserver', 'attach', 'simulate', 'robot', 'rtt'}.
265
266    - dev_id: whether the runner supports device identifiers, in the form of an
267      -i, --dev-id option. This is useful when the user has multiple debuggers
268      connected to a single computer, in order to select which one will be used
269      with the command provided.
270
271    - mult_dev_ids: whether the runner supports multiple device identifiers
272      for a single operation, allowing for bulk flashing of devices.
273
274    - flash_addr: whether the runner supports flashing to an
275      arbitrary address. Default is False. If true, the runner
276      must honor the --dt-flash option.
277
278    - erase: whether the runner supports an --erase option, which
279      does a mass-erase of the entire addressable flash on the target
280      before flashing. On multi-core SoCs, this may only erase portions of
281      flash specific the actual target core. (This option can be useful for
282      things like clearing out old settings values or other subsystem state
283      that may affect the behavior of the zephyr image. It is also sometimes
284      needed by SoCs which have flash-like areas that can't be sector
285      erased by the underlying tool before flashing; UICR on nRF SoCs
286      is one example.)
287
288    - reset: whether the runner supports a --reset option, which
289      resets the device after a flash operation is complete.
290
291    - extload: whether the runner supports a --extload option, which
292      must be given one time and is passed on to the underlying tool
293      that the runner wraps.
294
295    - tool_opt: whether the runner supports a --tool-opt (-O) option, which
296      can be given multiple times and is passed on to the underlying tool
297      that the runner wraps.
298
299    - file: whether the runner supports a --file option, which specifies
300      exactly the file that should be used to flash, overriding any default
301      discovered in the build directory.
302
303    - hide_load_files: whether the elf/hex/bin file arguments should be hidden.
304
305    - rtt: whether the runner supports SEGGER RTT. This adds a --rtt-address
306      option.
307
308    - skip_load: whether the runner supports the --load/--no-load option, which
309      allows skipping the load of image on target before starting a debug session
310      (this option only affects the 'debug' command)
311    '''
312
313    commands: set[str] = field(default_factory=lambda: set(_RUNNERCAPS_COMMANDS))
314    dev_id: bool = False
315    mult_dev_ids: bool = False
316    flash_addr: bool = False
317    erase: bool = False
318    reset: bool = False
319    extload: bool = False
320    tool_opt: bool = False
321    file: bool = False
322    hide_load_files: bool = False
323    rtt: bool = False  # This capability exists separately from the rtt command
324                       # to allow other commands to use the rtt address
325    dry_run: bool = False
326    skip_load: bool = False
327    batch_debug: bool = False # In batch mode, GDB exits with status 0 after loading;
328                              # for automated debugging, add --batch with 'monitor go',
329                              # 'disconnect', and 'quit' commands (named batch_debug in west),
330                              # unlike interactive debug mode (default),
331                              # which stops and waits for user input
332
333    def __post_init__(self):
334        if self.mult_dev_ids and not self.dev_id:
335            raise RuntimeError('dev_id must be set along mult_dev_ids')
336        if not self.commands.issubset(_RUNNERCAPS_COMMANDS):
337            raise ValueError(f'{self.commands=} contains invalid command')
338
339
340def _missing_cap(cls: type['ZephyrBinaryRunner'], option: str) -> NoReturn:
341    # Helper function that's called when an option was given on the
342    # command line that corresponds to a missing capability in the
343    # runner class cls.
344
345    raise ValueError(f"{cls.name()} doesn't support {option} option")
346
347
348class FileType(Enum):
349    OTHER = 0
350    HEX = 1
351    BIN = 2
352    ELF = 3
353    MOT = 4
354
355
356class RunnerConfig(NamedTuple):
357    '''Runner execution-time configuration.
358
359    This is a common object shared by all runners. Individual runners
360    can register specific configuration options using their
361    do_add_parser() hooks.
362    '''
363    build_dir: str                  # application build directory
364    board_dir: str                  # board definition directory
365    elf_file: str | None         # zephyr.elf path, or None
366    exe_file: str | None         # zephyr.exe path, or None
367    hex_file: str | None         # zephyr.hex path, or None
368    bin_file: str | None         # zephyr.bin path, or None
369    uf2_file: str | None         # zephyr.uf2 path, or None
370    mot_file: str | None         # zephyr.mot path
371    file: str | None             # binary file path (provided by the user), or None
372    file_type: FileType | None = FileType.OTHER  # binary file type
373    gdb: str | None = None       # path to a usable gdb
374    openocd: str | None = None   # path to a usable openocd
375    openocd_search: list[str] = []  # add these paths to the openocd search path
376    rtt_address: int | None = None # address of the rtt control block
377
378
379_YN_CHOICES = ['Y', 'y', 'N', 'n', 'yes', 'no', 'YES', 'NO']
380
381
382class _DTFlashAction(argparse.Action):
383
384    def __call__(self, parser, namespace, values, option_string=None):
385        if values.lower().startswith('y'):
386            namespace.dt_flash = True
387        else:
388            namespace.dt_flash = False
389
390
391class DeprecatedAction(argparse.Action):
392
393    def __call__(self, parser, namespace, values, option_string=None):
394        _logger.warning(f'Argument {self.option_strings[0]} is deprecated' +
395                        (f' for your runner {self._cls.name()}'  if self._cls is not None else '') +
396                        f', use {self._replacement} instead.')
397        setattr(namespace, self.dest, values)
398
399def depr_action(*args, cls=None, replacement=None, **kwargs):
400    action = DeprecatedAction(*args, **kwargs)
401    action._cls = cls
402    action._replacement = replacement
403    return action
404
405class ZephyrBinaryRunner(abc.ABC):
406    '''Abstract superclass for binary runners (flashers, debuggers).
407
408    **Note**: this class's API has changed relatively rarely since it
409    as added, but it is not considered a stable Zephyr API, and may change
410    without notice.
411
412    With some exceptions, boards supported by Zephyr must provide
413    generic means to be flashed (have a Zephyr firmware binary
414    permanently installed on the device for running) and debugged
415    (have a breakpoint debugger and program loader on a host
416    workstation attached to a running target).
417
418    This is supported by four top-level commands managed by the
419    Zephyr build system:
420
421    - 'flash': flash a previously configured binary to the board,
422      start execution on the target, then return.
423
424    - 'debug': connect to the board via a debugging protocol, program
425      the flash, then drop the user into a debugger interface with
426      symbol tables loaded from the current binary, and block until it
427      exits.
428
429    - 'debugserver': connect via a board-specific debugging protocol,
430      then reset and halt the target. Ensure the user is now able to
431      connect to a debug server with symbol tables loaded from the
432      binary.
433
434    - 'attach': connect to the board via a debugging protocol, then drop
435      the user into a debugger interface with symbol tables loaded from
436      the current binary, and block until it exits. Unlike 'debug', this
437      command does not program the flash.
438
439    This class provides an API for these commands. Every subclass is
440    called a 'runner' for short. Each runner has a name (like
441    'pyocd'), and declares commands it can handle (like
442    'flash'). Boards (like 'nrf52dk/nrf52832') declare which runner(s)
443    are compatible with them to the Zephyr build system, along with
444    information on how to configure the runner to work with the board.
445
446    The build system will then place enough information in the build
447    directory to create and use runners with this class's create()
448    method, which provides a command line argument parsing API. You
449    can also create runners by instantiating subclasses directly.
450
451    In order to define your own runner, you need to:
452
453    1. Define a ZephyrBinaryRunner subclass, and implement its
454       abstract methods. You may need to override capabilities().
455
456    2. Make sure the Python module defining your runner class is
457       imported, e.g. by editing this package's __init__.py (otherwise,
458       get_runners() won't work).
459
460    3. Give your runner's name to the Zephyr build system in your
461       board's board.cmake.
462
463    Additional advice:
464
465    - If you need to import any non-standard-library modules, make sure
466      to catch ImportError and defer complaints about it to a RuntimeError
467      if one is missing. This avoids affecting users that don't require your
468      runner, while still making it clear what went wrong to users that do
469      require it that don't have the necessary modules installed.
470
471    - If you need to ask the user something (e.g. using input()), do it
472      in your create() classmethod, not do_run(). That ensures your
473      __init__() really has everything it needs to call do_run(), and also
474      avoids calling input() when not instantiating within a command line
475      application.
476
477    - Use self.logger to log messages using the standard library's
478      logging API; your logger is named "runner.<your-runner-name()>"
479
480    For command-line invocation from the Zephyr build system, runners
481    define their own argparse-based interface through the common
482    add_parser() (and runner-specific do_add_parser() it delegates
483    to), and provide a way to create instances of themselves from
484    a RunnerConfig and parsed runner-specific arguments via create().
485
486    Runners use a variety of host tools and configuration values, the
487    user interface to which is abstracted by this class. Each runner
488    subclass should take any values it needs to execute one of these
489    commands in its constructor.  The actual command execution is
490    handled in the run() method.'''
491
492    def __init__(self, cfg: RunnerConfig):
493        '''Initialize core runner state.'''
494
495        self.cfg = cfg
496        '''RunnerConfig for this instance.'''
497
498        self.logger = logging.getLogger(f'runners.{self.name()}')
499        '''logging.Logger for this instance.'''
500
501        self.dry_run = _DRY_RUN
502        '''log commands instead of executing them. Can be set by subclasses'''
503
504    @staticmethod
505    def get_runners() -> list[type['ZephyrBinaryRunner']]:
506        '''Get a list of all currently defined runner classes.'''
507        def inheritors(klass):
508            subclasses = set()
509            work = [klass]
510            while work:
511                parent = work.pop()
512                for child in parent.__subclasses__():
513                    if child not in subclasses:
514                        if not isabstract(child):
515                            subclasses.add(child)
516                        work.append(child)
517            return subclasses
518
519        return inheritors(ZephyrBinaryRunner)
520
521    @classmethod
522    @abc.abstractmethod
523    def name(cls) -> str:
524        '''Return this runner's user-visible name.
525
526        When choosing a name, pick something short and lowercase,
527        based on the name of the tool (like openocd, jlink, etc.) or
528        the target architecture/board (like xtensa etc.).'''
529
530    @classmethod
531    def capabilities(cls) -> RunnerCaps:
532        '''Returns a RunnerCaps representing this runner's capabilities.
533
534        This implementation returns the default capabilities.
535
536        Subclasses should override appropriately if needed.'''
537        return RunnerCaps()
538
539    @classmethod
540    def add_parser(cls, parser):
541        '''Adds a sub-command parser for this runner.
542
543        The given object, parser, is a sub-command parser from the
544        argparse module. For more details, refer to the documentation
545        for argparse.ArgumentParser.add_subparsers().
546
547        The lone common optional argument is:
548
549        * --dt-flash (if the runner capabilities includes flash_addr)
550
551        Runner-specific options are added through the do_add_parser()
552        hook.'''
553        # Unfortunately, the parser argument's type is not documented
554        # in typeshed, so we can't type annotate much here.
555
556        # Common options that depend on runner capabilities. If a
557        # capability is not supported, the option string or strings
558        # are added anyway, to prevent an individual runner class from
559        # using them to mean something else.
560        caps = cls.capabilities()
561
562        if caps.dev_id:
563            action = 'append' if caps.mult_dev_ids else 'store'
564            parser.add_argument('-i', '--dev-id',
565                                action=action,
566                                dest='dev_id',
567                                help=cls.dev_id_help())
568        else:
569            parser.add_argument('-i', '--dev-id', help=argparse.SUPPRESS)
570
571        if caps.flash_addr:
572            parser.add_argument('--dt-flash', default=False, choices=_YN_CHOICES,
573                                action=_DTFlashAction,
574                                help='''If 'yes', try to use flash address
575                                information from devicetree when flash
576                                addresses are unknown (e.g. when flashing a .bin)''')
577        else:
578            parser.add_argument('--dt-flash', help=argparse.SUPPRESS)
579
580        if caps.file:
581            parser.add_argument('-f', '--file',
582                                dest='file',
583                                help="path to binary file")
584            parser.add_argument('-t', '--file-type',
585                                dest='file_type',
586                                help="type of binary file")
587        else:
588            parser.add_argument('-f', '--file', help=argparse.SUPPRESS)
589            parser.add_argument('-t', '--file-type', help=argparse.SUPPRESS)
590
591        if caps.hide_load_files:
592            parser.add_argument('--elf-file', help=argparse.SUPPRESS)
593            parser.add_argument('--hex-file', help=argparse.SUPPRESS)
594            parser.add_argument('--bin-file', help=argparse.SUPPRESS)
595            parser.add_argument('--mot-file', help=argparse.SUPPRESS)
596        else:
597            parser.add_argument('--elf-file',
598                                metavar='FILE',
599                                action=(partial(depr_action, cls=cls,
600                                                replacement='-f/--file') if caps.file else None),
601                                help='path to zephyr.elf'
602                                if not caps.file else 'Deprecated, use -f/--file instead.')
603            parser.add_argument('--hex-file',
604                                metavar='FILE',
605                                action=(partial(depr_action, cls=cls,
606                                                replacement='-f/--file') if caps.file else None),
607                                help='path to zephyr.hex'
608                                if not caps.file else 'Deprecated, use -f/--file instead.')
609            parser.add_argument('--bin-file',
610                                metavar='FILE',
611                                action=(partial(depr_action, cls=cls,
612                                                replacement='-f/--file') if caps.file else None),
613                                help='path to zephyr.bin'
614                                if not caps.file else 'Deprecated, use -f/--file instead.')
615            parser.add_argument('--mot-file',
616                                metavar='FILE',
617                                action=(partial(depr_action, cls=cls,
618                                                replacement='-f/--file') if caps.file else None),
619                                help='path to zephyr.mot'
620                                if not caps.file else 'Deprecated, use -f/--file instead.')
621
622        parser.add_argument('--erase', action=argparse.BooleanOptionalAction,
623                            help=("mass erase flash before loading, or don't. "
624                                  "Default action depends on each specific runner."
625                                  if caps.erase else argparse.SUPPRESS))
626
627        parser.add_argument('--reset', action=argparse.BooleanOptionalAction,
628                            help=("reset device after flashing, or don't. "
629                                  "Default action depends on each specific runner."
630                                  if caps.reset else argparse.SUPPRESS))
631
632        parser.add_argument('--extload', dest='extload',
633                            help=(cls.extload_help() if caps.extload
634                                  else argparse.SUPPRESS))
635
636        parser.add_argument('-O', '--tool-opt', dest='tool_opt',
637                            default=[], action='append',
638                            help=(cls.tool_opt_help() if caps.tool_opt
639                                  else argparse.SUPPRESS))
640
641        if caps.rtt:
642            parser.add_argument('--rtt-address', dest='rtt_address',
643                                type=lambda x: int(x, 0),
644                                help="""address of RTT control block. If not supplied,
645                                it will be autodetected if possible""")
646        else:
647            parser.add_argument('--rtt-address', help=argparse.SUPPRESS)
648
649        parser.add_argument('--dry-run', action='store_true',
650                            help=('''Print all the commands without actually
651                            executing them''' if caps.dry_run else argparse.SUPPRESS))
652
653        # by default, 'west debug' is expected to flash before starting the session
654        parser.add_argument('--load', action=argparse.BooleanOptionalAction,
655                            help=("load image on target before 'west debug' session"
656                                  if caps.skip_load else argparse.SUPPRESS),
657                            default=True)
658
659        parser.add_argument('--batch', action=argparse.BooleanOptionalAction,
660                            help="enable west debug batch mode"
661                            if caps.batch_debug else argparse.SUPPRESS)
662
663        # Runner-specific options.
664        cls.do_add_parser(parser)
665
666    @classmethod
667    @abc.abstractmethod
668    def do_add_parser(cls, parser):
669        '''Hook for adding runner-specific options.'''
670
671    @classmethod  # noqa: B027
672    def args_from_previous_runner(cls, previous_runner,
673                                  args: argparse.Namespace):
674        '''Update arguments from a previously created runner.
675
676        This is intended for propagating relevant user responses
677        between multiple runs of the same runner, for example a
678        JTAG serial number.'''
679
680    @classmethod
681    def create(cls, cfg: RunnerConfig,
682               args: argparse.Namespace) -> 'ZephyrBinaryRunner':
683        '''Create an instance from command-line arguments.
684
685        - ``cfg``: runner configuration (pass to superclass __init__)
686        - ``args``: arguments parsed from execution environment, as
687          specified by ``add_parser()``.'''
688        caps = cls.capabilities()
689        if args.dev_id and not caps.dev_id:
690            _missing_cap(cls, '--dev-id')
691        if args.dt_flash and not caps.flash_addr:
692            _missing_cap(cls, '--dt-flash')
693        if args.erase and not caps.erase:
694            _missing_cap(cls, '--erase')
695        if args.reset and not caps.reset:
696            _missing_cap(cls, '--reset')
697        if args.extload and not caps.extload:
698            _missing_cap(cls, '--extload')
699        if args.tool_opt and not caps.tool_opt:
700            _missing_cap(cls, '--tool-opt')
701        if args.file and not caps.file:
702            _missing_cap(cls, '--file')
703        if args.file_type and not args.file:
704            raise ValueError("--file-type requires --file")
705        if args.file_type and not caps.file:
706            _missing_cap(cls, '--file-type')
707        if args.rtt_address and not caps.rtt:
708            _missing_cap(cls, '--rtt-address')
709        if args.dry_run and not caps.dry_run:
710            _missing_cap(cls, '--dry-run')
711
712        ret = cls.do_create(cfg, args)
713        if args.erase:
714            ret.logger.info('mass erase requested')
715        if args.reset:
716            ret.logger.info('reset after flashing requested')
717        return ret
718
719    @classmethod
720    @abc.abstractmethod
721    def do_create(cls, cfg: RunnerConfig,
722                  args: argparse.Namespace) -> 'ZephyrBinaryRunner':
723        '''Hook for instance creation from command line arguments.'''
724
725    @staticmethod
726    def get_flash_address(args: argparse.Namespace,
727                          build_conf: BuildConfiguration,
728                          default: int = 0x0) -> int:
729        '''Helper method for extracting a flash address.
730
731        If args.dt_flash is true, returns the address obtained from
732        ZephyrBinaryRunner.flash_address_from_build_conf(build_conf).
733
734        Otherwise (when args.dt_flash is False), the default value is
735        returned.'''
736        if args.dt_flash:
737            return ZephyrBinaryRunner.flash_address_from_build_conf(build_conf)
738        else:
739            return default
740
741    @staticmethod
742    def flash_address_from_build_conf(build_conf: BuildConfiguration):
743        '''If CONFIG_HAS_FLASH_LOAD_OFFSET is n in build_conf,
744        return the CONFIG_FLASH_BASE_ADDRESS value. Otherwise, return
745        CONFIG_FLASH_BASE_ADDRESS + CONFIG_FLASH_LOAD_OFFSET.
746        '''
747        if build_conf.getboolean('CONFIG_HAS_FLASH_LOAD_OFFSET'):
748            return (build_conf['CONFIG_FLASH_BASE_ADDRESS'] +
749                    build_conf['CONFIG_FLASH_LOAD_OFFSET'])
750        else:
751            return build_conf['CONFIG_FLASH_BASE_ADDRESS']
752
753    @staticmethod
754    def sram_address_from_build_conf(build_conf: BuildConfiguration):
755        '''return CONFIG_SRAM_BASE_ADDRESS.
756        '''
757        return build_conf['CONFIG_SRAM_BASE_ADDRESS']
758
759    def run(self, command: str, **kwargs):
760        '''Runs command ('flash', 'debug', 'debugserver', 'attach').
761
762        This is the main entry point to this runner.'''
763        caps = self.capabilities()
764        if command not in caps.commands:
765            raise ValueError(f'runner {self.name()} does not implement command {command}')
766        self.do_run(command, **kwargs)
767
768    @abc.abstractmethod
769    def do_run(self, command: str, **kwargs):
770        '''Concrete runner; run() delegates to this. Implement in subclasses.
771
772        In case of an unsupported command, raise a ValueError.'''
773
774    @property
775    def build_conf(self) -> BuildConfiguration:
776        '''Get a BuildConfiguration for the build directory.'''
777        if not hasattr(self, '_build_conf'):
778            self._build_conf = BuildConfiguration(self.cfg.build_dir)
779        return self._build_conf
780
781    @property
782    def sysbuild_conf(self) -> SysbuildConfiguration:
783        '''Get a SysbuildConfiguration for the sysbuild directory.'''
784        if not hasattr(self, '_sysbuild_conf'):
785            self._sysbuild_conf = SysbuildConfiguration(os.path.dirname(self.cfg.build_dir))
786        return self._sysbuild_conf
787
788    @property
789    def thread_info_enabled(self) -> bool:
790        '''Returns True if self.build_conf has
791        CONFIG_DEBUG_THREAD_INFO enabled.
792        '''
793        return self.build_conf.getboolean('CONFIG_DEBUG_THREAD_INFO')
794
795    @classmethod
796    def dev_id_help(cls) -> str:
797        ''' Get the ArgParse help text for the --dev-id option.'''
798        help = '''Device identifier. Use it to select
799                  which debugger, device, node or instance to
800                  target when multiple ones are available or
801                  connected.'''
802        addendum = '''\nThis option can be present multiple times.''' if \
803                   cls.capabilities().mult_dev_ids else ''
804        return help + addendum
805
806    @classmethod
807    def extload_help(cls) -> str:
808        ''' Get the ArgParse help text for the --extload option.'''
809        return '''External loader to be used by stm32cubeprogrammer
810                  to program the targeted external memory.
811                  The runner requires the external loader (*.stldr) filename.
812                  This external loader (*.stldr) must be located within
813                  STM32CubeProgrammer/bin/ExternalLoader directory.'''
814
815    @classmethod
816    def tool_opt_help(cls) -> str:
817        ''' Get the ArgParse help text for the --tool-opt option.'''
818        return '''Option to pass on to the underlying tool used
819                  by this runner. This can be given multiple times;
820                  the resulting arguments will be given to the tool
821                  in the order they appear on the command line.'''
822
823    @staticmethod
824    def require(program: str, path: str | None = None) -> str:
825        '''Require that a program is installed before proceeding.
826
827        :param program: name of the program that is required,
828                        or path to a program binary.
829        :param path:    PATH where to search for the program binary.
830                        By default check on the system PATH.
831
832        If ``program`` is an absolute path to an existing program
833        binary, this call succeeds. Otherwise, try to find the program
834        by name on the system PATH or in the given PATH, if provided.
835
836        If the program can be found, its path is returned.
837        Otherwise, raises MissingProgram.'''
838        ret = shutil.which(program, path=path)
839        if ret is None:
840            raise MissingProgram(program)
841        return ret
842
843    def get_rtt_address(self) -> int | None:
844        '''Helper method for extracting a the RTT control block address.
845
846        If args.rtt_address was supplied, returns that.
847
848        Otherwise, attempt to locate an rtt block in the elf file.
849        If this is not found, None is returned'''
850        if self.cfg.rtt_address is not None:
851            return self.cfg.rtt_address
852        elif self.cfg.elf_file is not None:
853            return find_rtt_block(self.cfg.elf_file)
854        return None
855
856    def run_server_and_client(self, server, client, **kwargs):
857        '''Run a server that ignores SIGINT, and a client that handles it.
858
859        This routine portably:
860
861        - creates a Popen object for the ``server`` command which ignores
862          SIGINT
863        - runs ``client`` in a subprocess while temporarily ignoring SIGINT
864        - cleans up the server after the client exits.
865        - the keyword arguments, if any, will be passed down to both server and
866          client subprocess calls
867
868        It's useful to e.g. open a GDB server and client.'''
869        server_proc = self.popen_ignore_int(server, **kwargs)
870        try:
871            self.run_client(client, **kwargs)
872        finally:
873            server_proc.terminate()
874            server_proc.wait()
875
876    def run_client(self, client, **kwargs):
877        '''Run a client that handles SIGINT.'''
878        previous = signal.signal(signal.SIGINT, signal.SIG_IGN)
879        try:
880            self.check_call(client, **kwargs)
881        finally:
882            signal.signal(signal.SIGINT, previous)
883
884    def _log_cmd(self, cmd: list[str]):
885        escaped = ' '.join(shlex.quote(s) for s in cmd)
886        if not self.dry_run:
887            self.logger.debug(escaped)
888        else:
889            self.logger.info(escaped)
890
891    def call(self, cmd: list[str], **kwargs) -> int:
892        '''Subclass subprocess.call() wrapper.
893
894        Subclasses should use this method to run command in a
895        subprocess and get its return code, rather than
896        using subprocess directly, to keep accurate debug logs.
897        '''
898        self._log_cmd(cmd)
899        if self.dry_run:
900            return 0
901        return subprocess.call(cmd, **kwargs)
902
903    def check_call(self, cmd: list[str], **kwargs):
904        '''Subclass subprocess.check_call() wrapper.
905
906        Subclasses should use this method to run command in a
907        subprocess and check that it executed correctly, rather than
908        using subprocess directly, to keep accurate debug logs.
909        '''
910        self._log_cmd(cmd)
911        if self.dry_run:
912            return
913        subprocess.check_call(cmd, **kwargs)
914
915    def check_output(self, cmd: list[str], **kwargs) -> bytes:
916        '''Subclass subprocess.check_output() wrapper.
917
918        Subclasses should use this method to run command in a
919        subprocess and check that it executed correctly, rather than
920        using subprocess directly, to keep accurate debug logs.
921        '''
922        self._log_cmd(cmd)
923        if self.dry_run:
924            return b''
925        return subprocess.check_output(cmd, **kwargs)
926
927    def popen_ignore_int(self, cmd: list[str], **kwargs) -> subprocess.Popen:
928        '''Spawn a child command, ensuring it ignores SIGINT.
929
930        The returned subprocess.Popen object must be manually terminated.'''
931        cflags = 0
932        preexec = None
933        system = platform.system()
934
935        if system == 'Windows':
936            # We can't type check this line on Unix operating systems:
937            # mypy thinks the subprocess module has no such attribute.
938            cflags |= subprocess.CREATE_NEW_PROCESS_GROUP  # type: ignore
939        elif system in {'Linux', 'Darwin'}:
940            # We can't type check this on Windows for the same reason.
941            preexec = os.setsid # type: ignore
942
943        self._log_cmd(cmd)
944        if self.dry_run:
945            return _DebugDummyPopen()  # type: ignore
946
947        return subprocess.Popen(cmd, creationflags=cflags, preexec_fn=preexec, **kwargs)
948
949    def ensure_output(self, output_type: str) -> None:
950        '''Ensure self.cfg has a particular output artifact.
951
952        For example, ensure_output('bin') ensures that self.cfg.bin_file
953        refers to an existing file. Errors out if it's missing or undefined.
954
955        :param output_type: string naming the output type
956        '''
957        output_file = getattr(self.cfg, f'{output_type}_file', None)
958
959        if output_file is None:
960            err = f'{output_type} file location is unknown.'
961        elif not os.path.isfile(output_file):
962            err = f'{output_file} does not exist.'
963        else:
964            return
965
966        if output_type in ('elf', 'hex', 'bin', 'uf2'):
967            err += f' Try enabling CONFIG_BUILD_OUTPUT_{output_type.upper()}.'
968
969        # RuntimeError avoids a stack trace saved in run_common.
970        raise RuntimeError(err)
971
972    def run_telnet_client(self, host: str, port: int, active_sock=None) -> None:
973        '''
974        Run a telnet client for user interaction.
975        '''
976        # If the caller passed in an active socket, use that
977        if active_sock is not None:
978            sock = active_sock
979        elif shutil.which('nc') is not None:
980            # If a `nc` command is available, run it, as it will provide the
981            # best support for CONFIG_SHELL_VT100_COMMANDS etc.
982            client_cmd = ['nc', host, str(port)]
983            # Note: netcat (nc) does not handle sigint, so cannot use run_client()
984            self.check_call(client_cmd)
985            return
986        else:
987            # Start a new socket connection
988            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
989            sock.connect((host, port))
990
991        # Otherwise, use a pure python implementation. This will work well for logging,
992        # but input is line based only.
993        sel = selectors.DefaultSelector()
994        sel.register(sys.stdin, selectors.EVENT_READ)
995        sel.register(sock, selectors.EVENT_READ)
996        while True:
997            events = sel.select()
998            for key, _ in events:
999                if key.fileobj == sys.stdin:
1000                    text = sys.stdin.readline()
1001                    if text:
1002                        sock.send(text.encode())
1003
1004                elif key.fileobj == sock:
1005                    resp = sock.recv(2048)
1006                    if resp:
1007                        print(resp.decode(), end='')
1008