1#! /usr/bin/env python3
2
3# Copyright (c) 2017 Linaro Limited.
4# Copyright (c) 2017 Open Source Foundries Limited.
5#
6# SPDX-License-Identifier: Apache-2.0
7
8"""Zephyr binary runner core interfaces
9
10This provides the core ZephyrBinaryRunner class meant for public use,
11as well as some other helpers for concrete runner classes.
12"""
13
14import abc
15import argparse
16import errno
17import logging
18import os
19import platform
20import re
21import selectors
22import shlex
23import shutil
24import signal
25import socket
26import subprocess
27import sys
28from dataclasses import dataclass, field
29from enum import Enum
30from functools import partial
31from inspect import isabstract
32from typing import NamedTuple, NoReturn
33
34try:
35    from elftools.elf.elffile import ELFFile
36    ELFTOOLS_MISSING = False
37except ImportError:
38    ELFTOOLS_MISSING = True
39
40
# Turn on to enable just logging the commands that would be run (at
# info rather than debug level), without actually running them. This
# can break runners that are expecting output or if one command
# depends on another, so it's just for debugging.
_DRY_RUN = False

# Module-level logger, named 'runners'. Per-runner loggers created in
# ZephyrBinaryRunner.__init__() are named 'runners.<runner name>'.
_logger = logging.getLogger('runners')
48
# FIXME: I assume this code belongs somewhere else, but I couldn't figure out
# a good location for it, so I put it here for now.
# We could potentially search for RTT blocks in hex or bin files as well,
# but since the magic string is just "SEGGER RTT", I thought it better to
# avoid that because of the risk of false positives.
def find_rtt_block(elf_file: str) -> int | None:
    '''Return the value of the ``_SEGGER_RTT`` symbol in an ELF file.

    - ``elf_file``: path to the ELF file to inspect

    Every symbol table section (SHT_SYMTAB) is searched in order, and
    the ``st_value`` of the first ``_SEGGER_RTT`` symbol found is
    returned. Returns None if no symbol table contains that symbol.

    Raises RuntimeError if the elftools package could not be imported.'''
    if ELFTOOLS_MISSING:
        raise RuntimeError('the Python dependency elftools was missing; '
                           'see the getting started guide for details on '
                           'how to fix')

    with open(elf_file, 'rb') as stream:
        for symtab in ELFFile(stream).iter_sections('SHT_SYMTAB'):
            matches = symtab.get_symbol_by_name('_SEGGER_RTT')
            if matches is None:
                continue
            # Only the first matching symbol is of interest.
            first = next(iter(matches), None)
            if first is not None:
                return first.entry.get('st_value')
    return None
69
70
class _DebugDummyPopen:
    '''Object exposing terminate() and wait() methods that do nothing.

    NOTE(review): presumably a stand-in for subprocess.Popen when
    commands are not actually run (see _DRY_RUN) -- confirm at the
    call sites.'''

    def terminate(self):
        '''No-op; returns None.'''
        return None

    def wait(self):
        '''No-op; returns None.'''
        return None
78
79
# Highest port number get_unused_ports() will return while searching
# upward from a port that is already in use.
MAX_PORT = 49151


class NetworkPortHelper:
    '''Helper class for dealing with local IP network ports.'''

    def get_unused_ports(self, starting_from):
        '''Find unused network ports, starting at given values.

        starting_from is an iterable of ports the caller would like to use.

        The return value is a list of ports, in the same order, using
        the given values if they were unused, or the next sequentially
        available unused port otherwise. A port handed out for one
        entry is never handed out again for a later entry.

        Raises ValueError if the search for a replacement port goes
        past MAX_PORT.

        Ports may be bound between this call's check and actual usage, so
        callers still need to handle errors involving returned ports.'''
        wanted = list(starting_from)
        taken = self._used_now()
        chosen = []

        for desired in wanted:
            candidate = desired
            while candidate in taken:
                candidate += 1
                if candidate > MAX_PORT:
                    raise ValueError(f"ports above {desired} are in use")
            # Reserve the port so later entries cannot be given it too.
            taken.add(candidate)
            chosen.append(candidate)

        return chosen

    def _used_now(self):
        '''Return the set of TCP ports in use, per the host's own tools.

        Raises KeyError for platforms other than Windows, Linux and
        Darwin (as reported by platform.system()).'''
        system = platform.system()
        if system == 'Windows':
            return self._used_now_windows()
        if system == 'Linux':
            return self._used_now_linux()
        if system == 'Darwin':
            return self._used_now_darwin()
        raise KeyError(system)

    def _used_now_windows(self):
        '''Ports in use according to Windows netstat.'''
        return self._parser_windows(['netstat', '-a', '-n', '-p', 'tcp'])

    def _used_now_linux(self):
        '''Ports in use according to ss.'''
        return self._parser_linux(['ss', '-a', '-n', '-t'])

    def _used_now_darwin(self):
        '''Ports in use according to Darwin netstat.'''
        return self._parser_darwin(['netstat', '-a', '-n', '-p', 'tcp'])

    @staticmethod
    def _parser_windows(cmd):
        '''Run cmd and collect the port from the second column of each
        CRLF-terminated line starting with two spaces and "TCP".'''
        ports = set()
        for line in subprocess.check_output(cmd).split(b'\r\n'):
            if not line.startswith(b'  TCP'):
                continue
            local_address = line.split()[1]
            ports.add(int(local_address.rsplit(b':', 1)[1]))
        return ports

    @staticmethod
    def _parser_linux(cmd):
        '''Run cmd, skip its first output line, and collect the port
        from the fourth column of every remaining line.'''
        rows = subprocess.check_output(cmd).splitlines()
        return {int(row.split()[3].rsplit(b':', 1)[1]) for row in rows[1:]}

    @staticmethod
    def _parser_darwin(cmd):
        '''Run cmd and collect the port from the fourth column of each
        line starting with "tcp".'''
        ports = set()
        for line in subprocess.check_output(cmd).split(b'\n'):
            if not line.startswith(b'tcp'):
                continue
            local_address = line.split()[3]
            ports.add(int(local_address.rsplit(b':', 1)[1]))
        return ports
153
154
class BuildConfiguration:
    '''Read-only, dict-like view of a build directory's Kconfig output.

    The file <build_dir>/zephyr/.config is parsed once, on
    construction. Options can then be read as if the object were a
    dict, either object['CONFIG_FOO'] or object.get('CONFIG_FOO').

    Only options whose names start with ``config_prefix`` followed by
    an underscore are recorded.'''

    config_prefix = 'CONFIG'

    def __init__(self, build_dir: str):
        self.build_dir = build_dir
        self.options: dict[str, str | int] = {}
        self.path = os.path.join(self.build_dir, 'zephyr', '.config')
        self._parse()

    def __contains__(self, item):
        return item in self.options

    def __getitem__(self, item):
        return self.options[item]

    def get(self, option, *args):
        '''dict.get() equivalent: optional default as second argument.'''
        return self.options.get(option, *args)

    def getboolean(self, option):
        '''Return the parsed value of option, or False if the option
        was not seen in the parsed file.

        The stored value is returned as is; it is not converted to bool.
        '''
        return self.options.get(option, False)

    @staticmethod
    def _interpret(raw):
        # Convert the text to the right of '=' into a Python value:
        # quoted text -> str without the quotes, 'y' -> True,
        # decimal or 0x-prefixed hex -> int, anything else -> str as is.
        if raw.startswith('"') and raw.endswith('"'):
            return raw[1:-1]
        if raw == 'y':
            return True
        try:
            return int(raw, base=16 if raw.startswith('0x') else 10)
        except ValueError:
            return raw

    def _parse(self):
        '''Populate self.options from the file at self.path.'''
        prefix = self.config_prefix
        assigned = re.compile(f'^(?P<option>{prefix}_[A-Za-z0-9_]+)=(?P<value>.*)$')
        unset = re.compile(f'^# (?P<option>{prefix}_[A-Za-z0-9_]+) is not set$')

        with open(self.path) as config:
            for line in config:
                hit = assigned.match(line)
                if hit is not None:
                    text = hit.group('value').rstrip()
                    self.options[hit.group('option')] = self._interpret(text)
                    continue

                hit = unset.match(line)
                if hit is not None:
                    # '# CONFIG_FOO is not set' means a boolean option is false.
                    self.options[hit.group('option')] = False
222
class SysbuildConfiguration(BuildConfiguration):
    '''BuildConfiguration variant for sysbuild's own Kconfig output.

    Options use the SB_CONFIG prefix and can be read as if the object
    were a dict, either object['SB_CONFIG_FOO'] or
    object.get('SB_CONFIG_FOO').

    If the .config file does not exist, no options are loaded and no
    error is raised.'''

    config_prefix = 'SB_CONFIG'

    def _parse(self):
        # A build that does not use sysbuild has no file to read;
        # in that case the options simply stay empty.
        if os.path.exists(self.path):
            super()._parse()
238
class MissingProgram(FileNotFoundError):
    '''FileNotFoundError raised for a program that could not be found.

    Behaves like its parent class; it exists so that code can signal,
    and callers can catch, the specific case where the missing file
    is a program that some class requires to proceed.

    The errno attribute is ENOENT and the filename attribute contains
    the missing program.'''

    def __init__(self, program):
        code = errno.ENOENT
        super().__init__(code, os.strerror(code), program)
250
251
# Every command name a RunnerCaps.commands set is allowed to contain.
_RUNNERCAPS_COMMANDS = {'flash', 'debug', 'debugserver', 'attach', 'simulate', 'robot', 'rtt'}

@dataclass
class RunnerCaps:
    '''Describes what a runner class is able to do.

    Each capability is an attribute of the same name. All of them are
    True/False flags except ``commands``.

    Available capabilities:

    - commands: set of supported commands; default is {'flash',
      'debug', 'debugserver', 'attach', 'simulate', 'robot', 'rtt'}.
      Any other command name is rejected with ValueError.

    - dev_id: whether the runner supports device identifiers, in the form of an
      -i, --dev-id option. This is useful when the user has multiple debuggers
      connected to a single computer, in order to select which one will be used
      with the command provided.

    - mult_dev_ids: whether the runner supports multiple device identifiers
      for a single operation, allowing for bulk flashing of devices.
      Requires dev_id; RuntimeError is raised otherwise.

    - flash_addr: whether the runner supports flashing to an
      arbitrary address. Default is False. If true, the runner
      must honor the --dt-flash option.

    - erase: whether the runner supports an --erase option, which
      does a mass-erase of the entire addressable flash on the target
      before flashing. On multi-core SoCs, this may only erase portions of
      flash specific to the actual target core. (This option can be useful
      for things like clearing out old settings values or other subsystem
      state that may affect the behavior of the zephyr image. It is also
      sometimes needed by SoCs which have flash-like areas that can't be
      sector erased by the underlying tool before flashing; UICR on nRF
      SoCs is one example.)

    - reset: whether the runner supports a --reset option, which
      resets the device after a flash operation is complete.

    - extload: whether the runner supports a --extload option, which
      must be given one time and is passed on to the underlying tool
      that the runner wraps.

    - tool_opt: whether the runner supports a --tool-opt (-O) option, which
      can be given multiple times and is passed on to the underlying tool
      that the runner wraps.

    - file: whether the runner supports a --file option, which specifies
      exactly the file that should be used to flash, overriding any default
      discovered in the build directory.

    - hide_load_files: whether the elf/hex/bin file arguments should be hidden.

    - rtt: whether the runner supports SEGGER RTT. This adds a --rtt-address
      option.
    '''

    # Each instance gets its own copy of the full command set.
    commands: set[str] = field(default_factory=_RUNNERCAPS_COMMANDS.copy)
    dev_id: bool = False
    mult_dev_ids: bool = False
    flash_addr: bool = False
    erase: bool = False
    reset: bool = False
    extload: bool = False
    tool_opt: bool = False
    file: bool = False
    hide_load_files: bool = False
    # The rtt capability exists separately from the rtt command
    # to allow other commands to use the rtt address.
    rtt: bool = False

    def __post_init__(self):
        # Validate combinations the dataclass machinery cannot express.
        multi_without_single = self.mult_dev_ids and not self.dev_id
        if multi_without_single:
            raise RuntimeError('dev_id must be set along mult_dev_ids')
        all_known = self.commands.issubset(_RUNNERCAPS_COMMANDS)
        if not all_known:
            raise ValueError(f'{self.commands=} contains invalid command')
327
328
def _missing_cap(cls: type['ZephyrBinaryRunner'], option: str) -> NoReturn:
    # Called when an option was given on the command line that
    # corresponds to a capability the runner class cls lacks.
    # Always raises ValueError naming the runner and the option.
    message = f"{cls.name()} doesn't support {option} option"
    raise ValueError(message)
335
336
class FileType(Enum):
    '''Kinds of binary file; this is the type of RunnerConfig.file_type.'''
    OTHER = 0  # default value of RunnerConfig.file_type
    HEX = 1
    BIN = 2
    ELF = 3
342
343
class RunnerConfig(NamedTuple):
    '''Runner execution-time configuration.

    This is a common object shared by all runners. Individual runners
    can register specific configuration options using their
    do_add_parser() hooks.

    The first eight fields are required; the remaining ones have
    defaults.
    '''
    build_dir: str                  # application build directory
    board_dir: str                  # board definition directory
    elf_file: str | None         # zephyr.elf path, or None
    exe_file: str | None         # zephyr.exe path, or None
    hex_file: str | None         # zephyr.hex path, or None
    bin_file: str | None         # zephyr.bin path, or None
    uf2_file: str | None         # zephyr.uf2 path, or None
    file: str | None             # binary file path (provided by the user), or None
    file_type: FileType | None = FileType.OTHER  # binary file type
    gdb: str | None = None       # path to a usable gdb
    openocd: str | None = None   # path to a usable openocd
    # NOTE(review): this default is a single list object shared by every
    # RunnerConfig created without openocd_search; treat it as read-only.
    openocd_search: list[str] = []  # add these paths to the openocd search path
    rtt_address: int | None = None # address of the rtt control block
364
365
# Accepted spellings for a yes/no option value (used for --dt-flash).
_YN_CHOICES = ['Y', 'y', 'N', 'n', 'yes', 'no', 'YES', 'NO']


class _DTFlashAction(argparse.Action):
    '''Store True in namespace.dt_flash if the value starts with 'y'
    or 'Y', and False otherwise.'''

    def __call__(self, parser, namespace, values, option_string=None):
        namespace.dt_flash = values.lower().startswith('y')
376
377
class _ToggleAction(argparse.Action):
    '''Boolean flag pair: stores False when invoked through an option
    string starting with '--no-', and True for any other spelling.'''

    def __call__(self, parser, args, ignored, option):
        enabled = not option.startswith('--no-')
        setattr(args, self.dest, enabled)
382
class DeprecatedAction(argparse.Action):
    '''argparse action that stores its value after logging a warning
    that the option is deprecated.

    The warning reads self._cls (a runner class, or None) and
    self._replacement (the option to use instead). This class does not
    set either attribute itself; depr_action() assigns them after
    constructing the action.'''

    def __call__(self, parser, namespace, values, option_string=None):
        runner_note = ''
        if self._cls is not None:
            runner_note = f' for your runner {self._cls.name()}'
        _logger.warning(f'Argument {self.option_strings[0]} is deprecated'
                        f'{runner_note}'
                        f', use {self._replacement} instead.')
        setattr(namespace, self.dest, values)
390
def depr_action(*args, cls=None, replacement=None, **kwargs):
    '''Build a DeprecatedAction and attach its warning details.

    - ``cls``: runner class named in the warning, or None to omit it
    - ``replacement``: text naming the option to use instead

    All other arguments are passed on to DeprecatedAction unchanged.'''
    deprecated = DeprecatedAction(*args, **kwargs)
    deprecated._cls, deprecated._replacement = cls, replacement
    return deprecated
396
397class ZephyrBinaryRunner(abc.ABC):
398    '''Abstract superclass for binary runners (flashers, debuggers).
399
    **Note**: this class's API has changed relatively rarely since it
    was added, but it is not considered a stable Zephyr API, and may change
    without notice.
403
404    With some exceptions, boards supported by Zephyr must provide
405    generic means to be flashed (have a Zephyr firmware binary
406    permanently installed on the device for running) and debugged
407    (have a breakpoint debugger and program loader on a host
408    workstation attached to a running target).
409
410    This is supported by four top-level commands managed by the
411    Zephyr build system:
412
413    - 'flash': flash a previously configured binary to the board,
414      start execution on the target, then return.
415
416    - 'debug': connect to the board via a debugging protocol, program
417      the flash, then drop the user into a debugger interface with
418      symbol tables loaded from the current binary, and block until it
419      exits.
420
421    - 'debugserver': connect via a board-specific debugging protocol,
422      then reset and halt the target. Ensure the user is now able to
423      connect to a debug server with symbol tables loaded from the
424      binary.
425
426    - 'attach': connect to the board via a debugging protocol, then drop
427      the user into a debugger interface with symbol tables loaded from
428      the current binary, and block until it exits. Unlike 'debug', this
429      command does not program the flash.
430
431    This class provides an API for these commands. Every subclass is
432    called a 'runner' for short. Each runner has a name (like
433    'pyocd'), and declares commands it can handle (like
434    'flash'). Boards (like 'nrf52dk/nrf52832') declare which runner(s)
435    are compatible with them to the Zephyr build system, along with
436    information on how to configure the runner to work with the board.
437
438    The build system will then place enough information in the build
439    directory to create and use runners with this class's create()
440    method, which provides a command line argument parsing API. You
441    can also create runners by instantiating subclasses directly.
442
443    In order to define your own runner, you need to:
444
445    1. Define a ZephyrBinaryRunner subclass, and implement its
446       abstract methods. You may need to override capabilities().
447
448    2. Make sure the Python module defining your runner class is
449       imported, e.g. by editing this package's __init__.py (otherwise,
450       get_runners() won't work).
451
452    3. Give your runner's name to the Zephyr build system in your
453       board's board.cmake.
454
455    Additional advice:
456
457    - If you need to import any non-standard-library modules, make sure
458      to catch ImportError and defer complaints about it to a RuntimeError
459      if one is missing. This avoids affecting users that don't require your
460      runner, while still making it clear what went wrong to users that do
461      require it that don't have the necessary modules installed.
462
463    - If you need to ask the user something (e.g. using input()), do it
464      in your create() classmethod, not do_run(). That ensures your
465      __init__() really has everything it needs to call do_run(), and also
466      avoids calling input() when not instantiating within a command line
467      application.
468
469    - Use self.logger to log messages using the standard library's
470      logging API; your logger is named "runner.<your-runner-name()>"
471
472    For command-line invocation from the Zephyr build system, runners
473    define their own argparse-based interface through the common
474    add_parser() (and runner-specific do_add_parser() it delegates
475    to), and provide a way to create instances of themselves from
476    a RunnerConfig and parsed runner-specific arguments via create().
477
478    Runners use a variety of host tools and configuration values, the
479    user interface to which is abstracted by this class. Each runner
480    subclass should take any values it needs to execute one of these
481    commands in its constructor.  The actual command execution is
482    handled in the run() method.'''
483
484    def __init__(self, cfg: RunnerConfig):
485        '''Initialize core runner state.'''
486
487        self.cfg = cfg
488        '''RunnerConfig for this instance.'''
489
490        self.logger = logging.getLogger(f'runners.{self.name()}')
491        '''logging.Logger for this instance.'''
492
493    @staticmethod
494    def get_runners() -> list[type['ZephyrBinaryRunner']]:
495        '''Get a list of all currently defined runner classes.'''
496        def inheritors(klass):
497            subclasses = set()
498            work = [klass]
499            while work:
500                parent = work.pop()
501                for child in parent.__subclasses__():
502                    if child not in subclasses:
503                        if not isabstract(child):
504                            subclasses.add(child)
505                        work.append(child)
506            return subclasses
507
508        return inheritors(ZephyrBinaryRunner)
509
    @classmethod
    @abc.abstractmethod
    def name(cls) -> str:
        '''Return this runner's user-visible name.

        Abstract: every concrete runner class must implement this. The
        name is used in the instance's logger name ("runners.<name>")
        and in error messages raised by this class.

        When choosing a name, pick something short and lowercase,
        based on the name of the tool (like openocd, jlink, etc.) or
        the target architecture/board (like xtensa etc.).'''
518
519    @classmethod
520    def capabilities(cls) -> RunnerCaps:
521        '''Returns a RunnerCaps representing this runner's capabilities.
522
523        This implementation returns the default capabilities.
524
525        Subclasses should override appropriately if needed.'''
526        return RunnerCaps()
527
528    @classmethod
529    def add_parser(cls, parser):
530        '''Adds a sub-command parser for this runner.
531
532        The given object, parser, is a sub-command parser from the
533        argparse module. For more details, refer to the documentation
534        for argparse.ArgumentParser.add_subparsers().
535
536        The lone common optional argument is:
537
538        * --dt-flash (if the runner capabilities includes flash_addr)
539
540        Runner-specific options are added through the do_add_parser()
541        hook.'''
542        # Unfortunately, the parser argument's type is not documented
543        # in typeshed, so we can't type annotate much here.
544
545        # Common options that depend on runner capabilities. If a
546        # capability is not supported, the option string or strings
547        # are added anyway, to prevent an individual runner class from
548        # using them to mean something else.
549        caps = cls.capabilities()
550
551        if caps.dev_id:
552            action = 'append' if caps.mult_dev_ids else 'store'
553            parser.add_argument('-i', '--dev-id',
554                                action=action,
555                                dest='dev_id',
556                                help=cls.dev_id_help())
557        else:
558            parser.add_argument('-i', '--dev-id', help=argparse.SUPPRESS)
559
560        if caps.flash_addr:
561            parser.add_argument('--dt-flash', default='n', choices=_YN_CHOICES,
562                                action=_DTFlashAction,
563                                help='''If 'yes', try to use flash address
564                                information from devicetree when flash
565                                addresses are unknown (e.g. when flashing a .bin)''')
566        else:
567            parser.add_argument('--dt-flash', help=argparse.SUPPRESS)
568
569        if caps.file:
570            parser.add_argument('-f', '--file',
571                                dest='file',
572                                help="path to binary file")
573            parser.add_argument('-t', '--file-type',
574                                dest='file_type',
575                                help="type of binary file")
576        else:
577            parser.add_argument('-f', '--file', help=argparse.SUPPRESS)
578            parser.add_argument('-t', '--file-type', help=argparse.SUPPRESS)
579
580        if caps.hide_load_files:
581            parser.add_argument('--elf-file', help=argparse.SUPPRESS)
582            parser.add_argument('--hex-file', help=argparse.SUPPRESS)
583            parser.add_argument('--bin-file', help=argparse.SUPPRESS)
584        else:
585            parser.add_argument('--elf-file',
586                                metavar='FILE',
587                                action=(partial(depr_action, cls=cls,
588                                                replacement='-f/--file') if caps.file else None),
589                                help='path to zephyr.elf'
590                                if not caps.file else 'Deprecated, use -f/--file instead.')
591            parser.add_argument('--hex-file',
592                                metavar='FILE',
593                                action=(partial(depr_action, cls=cls,
594                                                replacement='-f/--file') if caps.file else None),
595                                help='path to zephyr.hex'
596                                if not caps.file else 'Deprecated, use -f/--file instead.')
597            parser.add_argument('--bin-file',
598                                metavar='FILE',
599                                action=(partial(depr_action, cls=cls,
600                                                replacement='-f/--file') if caps.file else None),
601                                help='path to zephyr.bin'
602                                if not caps.file else 'Deprecated, use -f/--file instead.')
603
604        parser.add_argument('--erase', '--no-erase', nargs=0,
605                            action=_ToggleAction,
606                            help=("mass erase flash before loading, or don't. "
607                                  "Default action depends on each specific runner."
608                                  if caps.erase else argparse.SUPPRESS))
609
610        parser.add_argument('--reset', '--no-reset', nargs=0,
611                            action=_ToggleAction,
612                            help=("reset device after flashing, or don't. "
613                                  "Default action depends on each specific runner."
614                                  if caps.reset else argparse.SUPPRESS))
615
616        parser.add_argument('--extload', dest='extload',
617                            help=(cls.extload_help() if caps.extload
618                                  else argparse.SUPPRESS))
619
620        parser.add_argument('-O', '--tool-opt', dest='tool_opt',
621                            default=[], action='append',
622                            help=(cls.tool_opt_help() if caps.tool_opt
623                                  else argparse.SUPPRESS))
624
625        if caps.rtt:
626            parser.add_argument('--rtt-address', dest='rtt_address',
627                                type=lambda x: int(x, 0),
628                                help="""address of RTT control block. If not supplied,
629                                it will be autodetected if possible""")
630        else:
631            parser.add_argument('--rtt-address', help=argparse.SUPPRESS)
632
633        # Runner-specific options.
634        cls.do_add_parser(parser)
635
    @classmethod
    @abc.abstractmethod
    def do_add_parser(cls, parser):
        '''Hook for adding runner-specific options.

        Abstract: every concrete runner class must implement this.
        add_parser() calls it with the same parser object, after all
        of the common options have been registered.'''
640
    @classmethod  # noqa: B027
    def args_from_previous_runner(cls, previous_runner,
                                  args: argparse.Namespace):
        '''Update arguments from a previously created runner.

        This is intended for propagating relevant user responses
        between multiple runs of the same runner, for example a
        JTAG serial number.

        The base implementation does nothing; this is an optional
        hook, not an abstract method.'''
649
650    @classmethod
651    def create(cls, cfg: RunnerConfig,
652               args: argparse.Namespace) -> 'ZephyrBinaryRunner':
653        '''Create an instance from command-line arguments.
654
655        - ``cfg``: runner configuration (pass to superclass __init__)
656        - ``args``: arguments parsed from execution environment, as
657          specified by ``add_parser()``.'''
658        caps = cls.capabilities()
659        if args.dev_id and not caps.dev_id:
660            _missing_cap(cls, '--dev-id')
661        if args.dt_flash and not caps.flash_addr:
662            _missing_cap(cls, '--dt-flash')
663        if args.erase and not caps.erase:
664            _missing_cap(cls, '--erase')
665        if args.reset and not caps.reset:
666            _missing_cap(cls, '--reset')
667        if args.extload and not caps.extload:
668            _missing_cap(cls, '--extload')
669        if args.tool_opt and not caps.tool_opt:
670            _missing_cap(cls, '--tool-opt')
671        if args.file and not caps.file:
672            _missing_cap(cls, '--file')
673        if args.file_type and not args.file:
674            raise ValueError("--file-type requires --file")
675        if args.file_type and not caps.file:
676            _missing_cap(cls, '--file-type')
677        if args.rtt_address and not caps.rtt:
678            _missing_cap(cls, '--rtt-address')
679
680        ret = cls.do_create(cfg, args)
681        if args.erase:
682            ret.logger.info('mass erase requested')
683        if args.reset:
684            ret.logger.info('reset after flashing requested')
685        return ret
686
    @classmethod
    @abc.abstractmethod
    def do_create(cls, cfg: RunnerConfig,
                  args: argparse.Namespace) -> 'ZephyrBinaryRunner':
        '''Hook for instance creation from command line arguments.

        Abstract: every concrete runner class must implement this.
        create() calls it with the same cfg and args once the common
        capability checks have passed, and returns its result.'''
692
693    @staticmethod
694    def get_flash_address(args: argparse.Namespace,
695                          build_conf: BuildConfiguration,
696                          default: int = 0x0) -> int:
697        '''Helper method for extracting a flash address.
698
699        If args.dt_flash is true, returns the address obtained from
700        ZephyrBinaryRunner.flash_address_from_build_conf(build_conf).
701
702        Otherwise (when args.dt_flash is False), the default value is
703        returned.'''
704        if args.dt_flash:
705            return ZephyrBinaryRunner.flash_address_from_build_conf(build_conf)
706        else:
707            return default
708
709    @staticmethod
710    def flash_address_from_build_conf(build_conf: BuildConfiguration):
711        '''If CONFIG_HAS_FLASH_LOAD_OFFSET is n in build_conf,
712        return the CONFIG_FLASH_BASE_ADDRESS value. Otherwise, return
713        CONFIG_FLASH_BASE_ADDRESS + CONFIG_FLASH_LOAD_OFFSET.
714        '''
715        if build_conf.getboolean('CONFIG_HAS_FLASH_LOAD_OFFSET'):
716            return (build_conf['CONFIG_FLASH_BASE_ADDRESS'] +
717                    build_conf['CONFIG_FLASH_LOAD_OFFSET'])
718        else:
719            return build_conf['CONFIG_FLASH_BASE_ADDRESS']
720
721    def run(self, command: str, **kwargs):
722        '''Runs command ('flash', 'debug', 'debugserver', 'attach').
723
724        This is the main entry point to this runner.'''
725        caps = self.capabilities()
726        if command not in caps.commands:
727            raise ValueError(f'runner {self.name()} does not implement command {command}')
728        self.do_run(command, **kwargs)
729
    @abc.abstractmethod
    def do_run(self, command: str, **kwargs):
        '''Concrete runner; run() delegates to this. Implement in subclasses.

        run() only calls this after checking that command is listed in
        capabilities().commands, and forwards its keyword arguments
        unchanged.

        In case of an unsupported command, raise a ValueError.'''
735
736    @property
737    def build_conf(self) -> BuildConfiguration:
738        '''Get a BuildConfiguration for the build directory.'''
739        if not hasattr(self, '_build_conf'):
740            self._build_conf = BuildConfiguration(self.cfg.build_dir)
741        return self._build_conf
742
743    @property
744    def sysbuild_conf(self) -> SysbuildConfiguration:
745        '''Get a SysbuildConfiguration for the sysbuild directory.'''
746        if not hasattr(self, '_sysbuild_conf'):
747            self._sysbuild_conf = SysbuildConfiguration(os.path.dirname(self.cfg.build_dir))
748        return self._sysbuild_conf
749
750    @property
751    def thread_info_enabled(self) -> bool:
752        '''Returns True if self.build_conf has
753        CONFIG_DEBUG_THREAD_INFO enabled.
754        '''
755        return self.build_conf.getboolean('CONFIG_DEBUG_THREAD_INFO')
756
757    @classmethod
758    def dev_id_help(cls) -> str:
759        ''' Get the ArgParse help text for the --dev-id option.'''
760        help = '''Device identifier. Use it to select
761                  which debugger, device, node or instance to
762                  target when multiple ones are available or
763                  connected.'''
764        addendum = '''\nThis option can be present multiple times.''' if \
765                   cls.capabilities().mult_dev_ids else ''
766        return help + addendum
767
768    @classmethod
769    def extload_help(cls) -> str:
770        ''' Get the ArgParse help text for the --extload option.'''
771        return '''External loader to be used by stm32cubeprogrammer
772                  to program the targeted external memory.
773                  The runner requires the external loader (*.stldr) filename.
774                  This external loader (*.stldr) must be located within
775                  STM32CubeProgrammer/bin/ExternalLoader directory.'''
776
777    @classmethod
778    def tool_opt_help(cls) -> str:
779        ''' Get the ArgParse help text for the --tool-opt option.'''
780        return '''Option to pass on to the underlying tool used
781                  by this runner. This can be given multiple times;
782                  the resulting arguments will be given to the tool
783                  in the order they appear on the command line.'''
784
785    @staticmethod
786    def require(program: str, path: str | None = None) -> str:
787        '''Require that a program is installed before proceeding.
788
789        :param program: name of the program that is required,
790                        or path to a program binary.
791        :param path:    PATH where to search for the program binary.
792                        By default check on the system PATH.
793
794        If ``program`` is an absolute path to an existing program
795        binary, this call succeeds. Otherwise, try to find the program
796        by name on the system PATH or in the given PATH, if provided.
797
798        If the program can be found, its path is returned.
799        Otherwise, raises MissingProgram.'''
800        ret = shutil.which(program, path=path)
801        if ret is None:
802            raise MissingProgram(program)
803        return ret
804
805    def get_rtt_address(self) -> int | None:
806        '''Helper method for extracting a the RTT control block address.
807
808        If args.rtt_address was supplied, returns that.
809
810        Otherwise, attempt to locate an rtt block in the elf file.
811        If this is not found, None is returned'''
812        if self.cfg.rtt_address is not None:
813            return self.cfg.rtt_address
814        elif self.cfg.elf_file is not None:
815            return find_rtt_block(self.cfg.elf_file)
816        return None
817
818    def run_server_and_client(self, server, client, **kwargs):
819        '''Run a server that ignores SIGINT, and a client that handles it.
820
821        This routine portably:
822
823        - creates a Popen object for the ``server`` command which ignores
824          SIGINT
825        - runs ``client`` in a subprocess while temporarily ignoring SIGINT
826        - cleans up the server after the client exits.
827        - the keyword arguments, if any, will be passed down to both server and
828          client subprocess calls
829
830        It's useful to e.g. open a GDB server and client.'''
831        server_proc = self.popen_ignore_int(server, **kwargs)
832        try:
833            self.run_client(client, **kwargs)
834        finally:
835            server_proc.terminate()
836            server_proc.wait()
837
838    def run_client(self, client, **kwargs):
839        '''Run a client that handles SIGINT.'''
840        previous = signal.signal(signal.SIGINT, signal.SIG_IGN)
841        try:
842            self.check_call(client, **kwargs)
843        finally:
844            signal.signal(signal.SIGINT, previous)
845
846    def _log_cmd(self, cmd: list[str]):
847        escaped = ' '.join(shlex.quote(s) for s in cmd)
848        if not _DRY_RUN:
849            self.logger.debug(escaped)
850        else:
851            self.logger.info(escaped)
852
853    def call(self, cmd: list[str], **kwargs) -> int:
854        '''Subclass subprocess.call() wrapper.
855
856        Subclasses should use this method to run command in a
857        subprocess and get its return code, rather than
858        using subprocess directly, to keep accurate debug logs.
859        '''
860        self._log_cmd(cmd)
861        if _DRY_RUN:
862            return 0
863        return subprocess.call(cmd, **kwargs)
864
865    def check_call(self, cmd: list[str], **kwargs):
866        '''Subclass subprocess.check_call() wrapper.
867
868        Subclasses should use this method to run command in a
869        subprocess and check that it executed correctly, rather than
870        using subprocess directly, to keep accurate debug logs.
871        '''
872        self._log_cmd(cmd)
873        if _DRY_RUN:
874            return
875        subprocess.check_call(cmd, **kwargs)
876
877    def check_output(self, cmd: list[str], **kwargs) -> bytes:
878        '''Subclass subprocess.check_output() wrapper.
879
880        Subclasses should use this method to run command in a
881        subprocess and check that it executed correctly, rather than
882        using subprocess directly, to keep accurate debug logs.
883        '''
884        self._log_cmd(cmd)
885        if _DRY_RUN:
886            return b''
887        return subprocess.check_output(cmd, **kwargs)
888
889    def popen_ignore_int(self, cmd: list[str], **kwargs) -> subprocess.Popen:
890        '''Spawn a child command, ensuring it ignores SIGINT.
891
892        The returned subprocess.Popen object must be manually terminated.'''
893        cflags = 0
894        preexec = None
895        system = platform.system()
896
897        if system == 'Windows':
898            # We can't type check this line on Unix operating systems:
899            # mypy thinks the subprocess module has no such attribute.
900            cflags |= subprocess.CREATE_NEW_PROCESS_GROUP  # type: ignore
901        elif system in {'Linux', 'Darwin'}:
902            # We can't type check this on Windows for the same reason.
903            preexec = os.setsid # type: ignore
904
905        self._log_cmd(cmd)
906        if _DRY_RUN:
907            return _DebugDummyPopen()  # type: ignore
908
909        return subprocess.Popen(cmd, creationflags=cflags, preexec_fn=preexec, **kwargs)
910
911    def ensure_output(self, output_type: str) -> None:
912        '''Ensure self.cfg has a particular output artifact.
913
914        For example, ensure_output('bin') ensures that self.cfg.bin_file
915        refers to an existing file. Errors out if it's missing or undefined.
916
917        :param output_type: string naming the output type
918        '''
919        output_file = getattr(self.cfg, f'{output_type}_file', None)
920
921        if output_file is None:
922            err = f'{output_type} file location is unknown.'
923        elif not os.path.isfile(output_file):
924            err = f'{output_file} does not exist.'
925        else:
926            return
927
928        if output_type in ('elf', 'hex', 'bin', 'uf2'):
929            err += f' Try enabling CONFIG_BUILD_OUTPUT_{output_type.upper()}.'
930
931        # RuntimeError avoids a stack trace saved in run_common.
932        raise RuntimeError(err)
933
934    def run_telnet_client(self, host: str, port: int, active_sock=None) -> None:
935        '''
936        Run a telnet client for user interaction.
937        '''
938        # If the caller passed in an active socket, use that
939        if active_sock is not None:
940            sock = active_sock
941        elif shutil.which('nc') is not None:
942            # If a `nc` command is available, run it, as it will provide the
943            # best support for CONFIG_SHELL_VT100_COMMANDS etc.
944            client_cmd = ['nc', host, str(port)]
945            # Note: netcat (nc) does not handle sigint, so cannot use run_client()
946            self.check_call(client_cmd)
947            return
948        else:
949            # Start a new socket connection
950            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
951            sock.connect((host, port))
952
953        # Otherwise, use a pure python implementation. This will work well for logging,
954        # but input is line based only.
955        sel = selectors.DefaultSelector()
956        sel.register(sys.stdin, selectors.EVENT_READ)
957        sel.register(sock, selectors.EVENT_READ)
958        while True:
959            events = sel.select()
960            for key, _ in events:
961                if key.fileobj == sys.stdin:
962                    text = sys.stdin.readline()
963                    if text:
964                        sock.send(text.encode())
965
966                elif key.fileobj == sock:
967                    resp = sock.recv(2048)
968                    if resp:
969                        print(resp.decode())
970