1#! /usr/bin/env python3
2
3# Copyright (c) 2017 Linaro Limited.
4# Copyright (c) 2017 Open Source Foundries Limited.
5#
6# SPDX-License-Identifier: Apache-2.0
7
8"""Zephyr binary runner core interfaces
9
10This provides the core ZephyrBinaryRunner class meant for public use,
11as well as some other helpers for concrete runner classes.
12"""
13
14import abc
15import argparse
16import errno
17import logging
18import os
19import platform
20import re
21import selectors
22import shlex
23import shutil
24import signal
25import socket
26import subprocess
27import sys
28from dataclasses import dataclass, field
29from enum import Enum
30from functools import partial
31from inspect import isabstract
32from typing import NamedTuple, NoReturn
33
34try:
35    from elftools.elf.elffile import ELFFile
36    ELFTOOLS_MISSING = False
37except ImportError:
38    ELFTOOLS_MISSING = True
39
40
41# Turn on to enable just logging the commands that would be run (at
42# info rather than debug level), without actually running them. This
43# can break runners that are expecting output or if one command
44# depends on another, so it's just for debugging.
45_DRY_RUN = False
46
47_logger = logging.getLogger('runners')
48
49# FIXME: I assume this code belongs somewhere else, but i couldn't figure out
50# a good location for it, so i put it here for now
51# We could potentially search for RTT blocks in hex or bin files as well,
52# but since the magic string is "SEGGER RTT", i thought it might be better
53# to avoid, at the risk of false positives.
54def find_rtt_block(elf_file: str) -> int | None:
55    if ELFTOOLS_MISSING:
56        raise RuntimeError('the Python dependency elftools was missing; '
57                           'see the getting started guide for details on '
58                           'how to fix')
59
60    with open(elf_file, 'rb') as f:
61        elffile = ELFFile(f)
62        for sect in elffile.iter_sections('SHT_SYMTAB'):
63            symbols = sect.get_symbol_by_name('_SEGGER_RTT')
64            if symbols is None:
65                continue
66            for s in symbols:
67                return s.entry.get('st_value')
68    return None
69
70
71class _DebugDummyPopen:
72
73    def terminate(self):
74        pass
75
76    def wait(self):
77        pass
78
79
80MAX_PORT = 49151
81
82
83class NetworkPortHelper:
84    '''Helper class for dealing with local IP network ports.'''
85
86    def get_unused_ports(self, starting_from):
87        '''Find unused network ports, starting at given values.
88
89        starting_from is an iterable of ports the caller would like to use.
90
91        The return value is an iterable of ports, in the same order, using
92        the given values if they were unused, or the next sequentially
93        available unused port otherwise.
94
95        Ports may be bound between this call's check and actual usage, so
96        callers still need to handle errors involving returned ports.'''
97        start = list(starting_from)
98        used = self._used_now()
99        ret = []
100
101        for desired in start:
102            port = desired
103            while port in used:
104                port += 1
105                if port > MAX_PORT:
106                    msg = "ports above {} are in use"
107                    raise ValueError(msg.format(desired))
108            used.add(port)
109            ret.append(port)
110
111        return ret
112
113    def _used_now(self):
114        handlers = {
115            'Windows': self._used_now_windows,
116            'Linux': self._used_now_linux,
117            'Darwin': self._used_now_darwin,
118        }
119        handler = handlers[platform.system()]
120        return handler()
121
122    def _used_now_windows(self):
123        cmd = ['netstat', '-a', '-n', '-p', 'tcp']
124        return self._parser_windows(cmd)
125
126    def _used_now_linux(self):
127        cmd = ['ss', '-a', '-n', '-t']
128        return self._parser_linux(cmd)
129
130    def _used_now_darwin(self):
131        cmd = ['netstat', '-a', '-n', '-p', 'tcp']
132        return self._parser_darwin(cmd)
133
134    @staticmethod
135    def _parser_windows(cmd):
136        out = subprocess.check_output(cmd).split(b'\r\n')
137        used_bytes = [x.split()[1].rsplit(b':', 1)[1] for x in out
138                      if x.startswith(b'  TCP')]
139        return {int(b) for b in used_bytes}
140
141    @staticmethod
142    def _parser_linux(cmd):
143        out = subprocess.check_output(cmd).splitlines()[1:]
144        used_bytes = [s.split()[3].rsplit(b':', 1)[1] for s in out]
145        return {int(b) for b in used_bytes}
146
147    @staticmethod
148    def _parser_darwin(cmd):
149        out = subprocess.check_output(cmd).split(b'\n')
150        used_bytes = [x.split()[3].rsplit(b':', 1)[1] for x in out
151                      if x.startswith(b'tcp')]
152        return {int(b) for b in used_bytes}
153
154
155class BuildConfiguration:
156    '''This helper class provides access to build-time configuration.
157
158    Configuration options can be read as if the object were a dict,
159    either object['CONFIG_FOO'] or object.get('CONFIG_FOO').
160
161    Kconfig configuration values are available (parsed from .config).'''
162
163    config_prefix = 'CONFIG'
164
165    def __init__(self, build_dir: str):
166        self.build_dir = build_dir
167        self.options: dict[str, str | int] = {}
168        self.path = os.path.join(self.build_dir, 'zephyr', '.config')
169        self._parse()
170
171    def __contains__(self, item):
172        return item in self.options
173
174    def __getitem__(self, item):
175        return self.options[item]
176
177    def get(self, option, *args):
178        return self.options.get(option, *args)
179
180    def getboolean(self, option):
181        '''If a boolean option is explicitly set to y or n,
182        returns its value. Otherwise, falls back to False.
183        '''
184        return self.options.get(option, False)
185
186    def _parse(self):
187        filename = self.path
188
189        opt_value = re.compile(f'^(?P<option>{self.config_prefix}_[A-Za-z0-9_]+)=(?P<value>.*)$')
190        not_set = re.compile(f'^# (?P<option>{self.config_prefix}_[A-Za-z0-9_]+) is not set$')
191
192        with open(filename) as f:
193            for line in f:
194                match = opt_value.match(line)
195                if match:
196                    value = match.group('value').rstrip()
197                    if value.startswith('"') and value.endswith('"'):
198                        # A string literal should have the quotes stripped,
199                        # but otherwise be left as is.
200                        value = value[1:-1]
201                    elif value == 'y':
202                        # The character 'y' is a boolean option
203                        # that is set to True.
204                        value = True
205                    else:
206                        # Neither a string nor 'y', so try to parse it
207                        # as an integer.
208                        try:
209                            base = 16 if value.startswith('0x') else 10
210                            self.options[match.group('option')] = int(value, base=base)
211                            continue
212                        except ValueError:
213                            pass
214
215                    self.options[match.group('option')] = value
216                    continue
217
218                match = not_set.match(line)
219                if match:
220                    # '# CONFIG_FOO is not set' means a boolean option is false.
221                    self.options[match.group('option')] = False
222
223class SysbuildConfiguration(BuildConfiguration):
224    '''This helper class provides access to sysbuild-time configuration.
225
226    Configuration options can be read as if the object were a dict,
227    either object['SB_CONFIG_FOO'] or object.get('SB_CONFIG_FOO').
228
229    Kconfig configuration values are available (parsed from .config).'''
230
231    config_prefix = 'SB_CONFIG'
232
233    def _parse(self):
234        # If the build does not use sysbuild, skip parsing the file.
235        if not os.path.exists(self.path):
236            return
237        super()._parse()
238
239class MissingProgram(FileNotFoundError):
240    '''FileNotFoundError subclass for missing program dependencies.
241
242    No significant changes from the parent FileNotFoundError; this is
243    useful for explicitly signaling that the file in question is a
244    program that some class requires to proceed.
245
246    The filename attribute contains the missing program.'''
247
248    def __init__(self, program):
249        super().__init__(errno.ENOENT, os.strerror(errno.ENOENT), program)
250
251
252_RUNNERCAPS_COMMANDS = {'flash', 'debug', 'debugserver', 'attach', 'simulate', 'robot', 'rtt'}
253
254@dataclass
255class RunnerCaps:
256    '''This class represents a runner class's capabilities.
257
258    Each capability is represented as an attribute with the same
259    name. Flag attributes are True or False.
260
261    Available capabilities:
262
263    - commands: set of supported commands; default is {'flash',
264      'debug', 'debugserver', 'attach', 'simulate', 'robot', 'rtt'}.
265
266    - dev_id: whether the runner supports device identifiers, in the form of an
267      -i, --dev-id option. This is useful when the user has multiple debuggers
268      connected to a single computer, in order to select which one will be used
269      with the command provided.
270
271    - flash_addr: whether the runner supports flashing to an
272      arbitrary address. Default is False. If true, the runner
273      must honor the --dt-flash option.
274
275    - erase: whether the runner supports an --erase option, which
276      does a mass-erase of the entire addressable flash on the target
277      before flashing. On multi-core SoCs, this may only erase portions of
278      flash specific the actual target core. (This option can be useful for
279      things like clearing out old settings values or other subsystem state
280      that may affect the behavior of the zephyr image. It is also sometimes
281      needed by SoCs which have flash-like areas that can't be sector
282      erased by the underlying tool before flashing; UICR on nRF SoCs
283      is one example.)
284
285    - reset: whether the runner supports a --reset option, which
286      resets the device after a flash operation is complete.
287
288    - extload: whether the runner supports a --extload option, which
289      must be given one time and is passed on to the underlying tool
290      that the runner wraps.
291
292    - tool_opt: whether the runner supports a --tool-opt (-O) option, which
293      can be given multiple times and is passed on to the underlying tool
294      that the runner wraps.
295
296    - file: whether the runner supports a --file option, which specifies
297      exactly the file that should be used to flash, overriding any default
298      discovered in the build directory.
299
300    - hide_load_files: whether the elf/hex/bin file arguments should be hidden.
301
302    - rtt: whether the runner supports SEGGER RTT. This adds a --rtt-address
303      option.
304    '''
305
306    commands: set[str] = field(default_factory=lambda: set(_RUNNERCAPS_COMMANDS))
307    dev_id: bool = False
308    flash_addr: bool = False
309    erase: bool = False
310    reset: bool = False
311    extload: bool = False
312    tool_opt: bool = False
313    file: bool = False
314    hide_load_files: bool = False
315    rtt: bool = False  # This capability exists separately from the rtt command
316                       # to allow other commands to use the rtt address
317
318    def __post_init__(self):
319        if not self.commands.issubset(_RUNNERCAPS_COMMANDS):
320            raise ValueError(f'{self.commands=} contains invalid command')
321
322
323def _missing_cap(cls: type['ZephyrBinaryRunner'], option: str) -> NoReturn:
324    # Helper function that's called when an option was given on the
325    # command line that corresponds to a missing capability in the
326    # runner class cls.
327
328    raise ValueError(f"{cls.name()} doesn't support {option} option")
329
330
331class FileType(Enum):
332    OTHER = 0
333    HEX = 1
334    BIN = 2
335    ELF = 3
336
337
338class RunnerConfig(NamedTuple):
339    '''Runner execution-time configuration.
340
341    This is a common object shared by all runners. Individual runners
342    can register specific configuration options using their
343    do_add_parser() hooks.
344    '''
345    build_dir: str                  # application build directory
346    board_dir: str                  # board definition directory
347    elf_file: str | None         # zephyr.elf path, or None
348    exe_file: str | None         # zephyr.exe path, or None
349    hex_file: str | None         # zephyr.hex path, or None
350    bin_file: str | None         # zephyr.bin path, or None
351    uf2_file: str | None         # zephyr.uf2 path, or None
352    file: str | None             # binary file path (provided by the user), or None
353    file_type: FileType | None = FileType.OTHER  # binary file type
354    gdb: str | None = None       # path to a usable gdb
355    openocd: str | None = None   # path to a usable openocd
356    openocd_search: list[str] = []  # add these paths to the openocd search path
357    rtt_address: int | None = None # address of the rtt control block
358
359
360_YN_CHOICES = ['Y', 'y', 'N', 'n', 'yes', 'no', 'YES', 'NO']
361
362
363class _DTFlashAction(argparse.Action):
364
365    def __call__(self, parser, namespace, values, option_string=None):
366        if values.lower().startswith('y'):
367            namespace.dt_flash = True
368        else:
369            namespace.dt_flash = False
370
371
372class _ToggleAction(argparse.Action):
373
374    def __call__(self, parser, args, ignored, option):
375        setattr(args, self.dest, not option.startswith('--no-'))
376
377class DeprecatedAction(argparse.Action):
378
379    def __call__(self, parser, namespace, values, option_string=None):
380        _logger.warning(f'Argument {self.option_strings[0]} is deprecated' +
381                        (f' for your runner {self._cls.name()}'  if self._cls is not None else '') +
382                        f', use {self._replacement} instead.')
383        setattr(namespace, self.dest, values)
384
385def depr_action(*args, cls=None, replacement=None, **kwargs):
386    action = DeprecatedAction(*args, **kwargs)
387    action._cls = cls
388    action._replacement = replacement
389    return action
390
391class ZephyrBinaryRunner(abc.ABC):
392    '''Abstract superclass for binary runners (flashers, debuggers).
393
394    **Note**: this class's API has changed relatively rarely since it
395    as added, but it is not considered a stable Zephyr API, and may change
396    without notice.
397
398    With some exceptions, boards supported by Zephyr must provide
399    generic means to be flashed (have a Zephyr firmware binary
400    permanently installed on the device for running) and debugged
401    (have a breakpoint debugger and program loader on a host
402    workstation attached to a running target).
403
404    This is supported by four top-level commands managed by the
405    Zephyr build system:
406
407    - 'flash': flash a previously configured binary to the board,
408      start execution on the target, then return.
409
410    - 'debug': connect to the board via a debugging protocol, program
411      the flash, then drop the user into a debugger interface with
412      symbol tables loaded from the current binary, and block until it
413      exits.
414
415    - 'debugserver': connect via a board-specific debugging protocol,
416      then reset and halt the target. Ensure the user is now able to
417      connect to a debug server with symbol tables loaded from the
418      binary.
419
420    - 'attach': connect to the board via a debugging protocol, then drop
421      the user into a debugger interface with symbol tables loaded from
422      the current binary, and block until it exits. Unlike 'debug', this
423      command does not program the flash.
424
425    This class provides an API for these commands. Every subclass is
426    called a 'runner' for short. Each runner has a name (like
427    'pyocd'), and declares commands it can handle (like
428    'flash'). Boards (like 'nrf52dk/nrf52832') declare which runner(s)
429    are compatible with them to the Zephyr build system, along with
430    information on how to configure the runner to work with the board.
431
432    The build system will then place enough information in the build
433    directory to create and use runners with this class's create()
434    method, which provides a command line argument parsing API. You
435    can also create runners by instantiating subclasses directly.
436
437    In order to define your own runner, you need to:
438
439    1. Define a ZephyrBinaryRunner subclass, and implement its
440       abstract methods. You may need to override capabilities().
441
442    2. Make sure the Python module defining your runner class is
443       imported, e.g. by editing this package's __init__.py (otherwise,
444       get_runners() won't work).
445
446    3. Give your runner's name to the Zephyr build system in your
447       board's board.cmake.
448
449    Additional advice:
450
451    - If you need to import any non-standard-library modules, make sure
452      to catch ImportError and defer complaints about it to a RuntimeError
453      if one is missing. This avoids affecting users that don't require your
454      runner, while still making it clear what went wrong to users that do
455      require it that don't have the necessary modules installed.
456
457    - If you need to ask the user something (e.g. using input()), do it
458      in your create() classmethod, not do_run(). That ensures your
459      __init__() really has everything it needs to call do_run(), and also
460      avoids calling input() when not instantiating within a command line
461      application.
462
463    - Use self.logger to log messages using the standard library's
464      logging API; your logger is named "runner.<your-runner-name()>"
465
466    For command-line invocation from the Zephyr build system, runners
467    define their own argparse-based interface through the common
468    add_parser() (and runner-specific do_add_parser() it delegates
469    to), and provide a way to create instances of themselves from
470    a RunnerConfig and parsed runner-specific arguments via create().
471
472    Runners use a variety of host tools and configuration values, the
473    user interface to which is abstracted by this class. Each runner
474    subclass should take any values it needs to execute one of these
475    commands in its constructor.  The actual command execution is
476    handled in the run() method.'''
477
478    def __init__(self, cfg: RunnerConfig):
479        '''Initialize core runner state.'''
480
481        self.cfg = cfg
482        '''RunnerConfig for this instance.'''
483
484        self.logger = logging.getLogger(f'runners.{self.name()}')
485        '''logging.Logger for this instance.'''
486
487    @staticmethod
488    def get_runners() -> list[type['ZephyrBinaryRunner']]:
489        '''Get a list of all currently defined runner classes.'''
490        def inheritors(klass):
491            subclasses = set()
492            work = [klass]
493            while work:
494                parent = work.pop()
495                for child in parent.__subclasses__():
496                    if child not in subclasses:
497                        if not isabstract(child):
498                            subclasses.add(child)
499                        work.append(child)
500            return subclasses
501
502        return inheritors(ZephyrBinaryRunner)
503
504    @classmethod
505    @abc.abstractmethod
506    def name(cls) -> str:
507        '''Return this runner's user-visible name.
508
509        When choosing a name, pick something short and lowercase,
510        based on the name of the tool (like openocd, jlink, etc.) or
511        the target architecture/board (like xtensa etc.).'''
512
513    @classmethod
514    def capabilities(cls) -> RunnerCaps:
515        '''Returns a RunnerCaps representing this runner's capabilities.
516
517        This implementation returns the default capabilities.
518
519        Subclasses should override appropriately if needed.'''
520        return RunnerCaps()
521
522    @classmethod
523    def add_parser(cls, parser):
524        '''Adds a sub-command parser for this runner.
525
526        The given object, parser, is a sub-command parser from the
527        argparse module. For more details, refer to the documentation
528        for argparse.ArgumentParser.add_subparsers().
529
530        The lone common optional argument is:
531
532        * --dt-flash (if the runner capabilities includes flash_addr)
533
534        Runner-specific options are added through the do_add_parser()
535        hook.'''
536        # Unfortunately, the parser argument's type is not documented
537        # in typeshed, so we can't type annotate much here.
538
539        # Common options that depend on runner capabilities. If a
540        # capability is not supported, the option string or strings
541        # are added anyway, to prevent an individual runner class from
542        # using them to mean something else.
543        caps = cls.capabilities()
544
545        if caps.dev_id:
546            parser.add_argument('-i', '--dev-id',
547                                dest='dev_id',
548                                help=cls.dev_id_help())
549        else:
550            parser.add_argument('-i', '--dev-id', help=argparse.SUPPRESS)
551
552        if caps.flash_addr:
553            parser.add_argument('--dt-flash', default='n', choices=_YN_CHOICES,
554                                action=_DTFlashAction,
555                                help='''If 'yes', try to use flash address
556                                information from devicetree when flash
557                                addresses are unknown (e.g. when flashing a .bin)''')
558        else:
559            parser.add_argument('--dt-flash', help=argparse.SUPPRESS)
560
561        if caps.file:
562            parser.add_argument('-f', '--file',
563                                dest='file',
564                                help="path to binary file")
565            parser.add_argument('-t', '--file-type',
566                                dest='file_type',
567                                help="type of binary file")
568        else:
569            parser.add_argument('-f', '--file', help=argparse.SUPPRESS)
570            parser.add_argument('-t', '--file-type', help=argparse.SUPPRESS)
571
572        if caps.hide_load_files:
573            parser.add_argument('--elf-file', help=argparse.SUPPRESS)
574            parser.add_argument('--hex-file', help=argparse.SUPPRESS)
575            parser.add_argument('--bin-file', help=argparse.SUPPRESS)
576        else:
577            parser.add_argument('--elf-file',
578                                metavar='FILE',
579                                action=(partial(depr_action, cls=cls,
580                                                replacement='-f/--file') if caps.file else None),
581                                help='path to zephyr.elf'
582                                if not caps.file else 'Deprecated, use -f/--file instead.')
583            parser.add_argument('--hex-file',
584                                metavar='FILE',
585                                action=(partial(depr_action, cls=cls,
586                                                replacement='-f/--file') if caps.file else None),
587                                help='path to zephyr.hex'
588                                if not caps.file else 'Deprecated, use -f/--file instead.')
589            parser.add_argument('--bin-file',
590                                metavar='FILE',
591                                action=(partial(depr_action, cls=cls,
592                                                replacement='-f/--file') if caps.file else None),
593                                help='path to zephyr.bin'
594                                if not caps.file else 'Deprecated, use -f/--file instead.')
595
596        parser.add_argument('--erase', '--no-erase', nargs=0,
597                            action=_ToggleAction,
598                            help=("mass erase flash before loading, or don't. "
599                                  "Default action depends on each specific runner."
600                                  if caps.erase else argparse.SUPPRESS))
601
602        parser.add_argument('--reset', '--no-reset', nargs=0,
603                            action=_ToggleAction,
604                            help=("reset device after flashing, or don't. "
605                                  "Default action depends on each specific runner."
606                                  if caps.reset else argparse.SUPPRESS))
607
608        parser.add_argument('--extload', dest='extload',
609                            help=(cls.extload_help() if caps.extload
610                                  else argparse.SUPPRESS))
611
612        parser.add_argument('-O', '--tool-opt', dest='tool_opt',
613                            default=[], action='append',
614                            help=(cls.tool_opt_help() if caps.tool_opt
615                                  else argparse.SUPPRESS))
616
617        if caps.rtt:
618            parser.add_argument('--rtt-address', dest='rtt_address',
619                                type=lambda x: int(x, 0),
620                                help="""address of RTT control block. If not supplied,
621                                it will be autodetected if possible""")
622        else:
623            parser.add_argument('--rtt-address', help=argparse.SUPPRESS)
624
625        # Runner-specific options.
626        cls.do_add_parser(parser)
627
628    @classmethod
629    @abc.abstractmethod
630    def do_add_parser(cls, parser):
631        '''Hook for adding runner-specific options.'''
632
633    @classmethod  # noqa: B027
634    def args_from_previous_runner(cls, previous_runner,
635                                  args: argparse.Namespace):
636        '''Update arguments from a previously created runner.
637
638        This is intended for propagating relevant user responses
639        between multiple runs of the same runner, for example a
640        JTAG serial number.'''
641
642    @classmethod
643    def create(cls, cfg: RunnerConfig,
644               args: argparse.Namespace) -> 'ZephyrBinaryRunner':
645        '''Create an instance from command-line arguments.
646
647        - ``cfg``: runner configuration (pass to superclass __init__)
648        - ``args``: arguments parsed from execution environment, as
649          specified by ``add_parser()``.'''
650        caps = cls.capabilities()
651        if args.dev_id and not caps.dev_id:
652            _missing_cap(cls, '--dev-id')
653        if args.dt_flash and not caps.flash_addr:
654            _missing_cap(cls, '--dt-flash')
655        if args.erase and not caps.erase:
656            _missing_cap(cls, '--erase')
657        if args.reset and not caps.reset:
658            _missing_cap(cls, '--reset')
659        if args.extload and not caps.extload:
660            _missing_cap(cls, '--extload')
661        if args.tool_opt and not caps.tool_opt:
662            _missing_cap(cls, '--tool-opt')
663        if args.file and not caps.file:
664            _missing_cap(cls, '--file')
665        if args.file_type and not args.file:
666            raise ValueError("--file-type requires --file")
667        if args.file_type and not caps.file:
668            _missing_cap(cls, '--file-type')
669        if args.rtt_address and not caps.rtt:
670            _missing_cap(cls, '--rtt-address')
671
672        ret = cls.do_create(cfg, args)
673        if args.erase:
674            ret.logger.info('mass erase requested')
675        if args.reset:
676            ret.logger.info('reset after flashing requested')
677        return ret
678
679    @classmethod
680    @abc.abstractmethod
681    def do_create(cls, cfg: RunnerConfig,
682                  args: argparse.Namespace) -> 'ZephyrBinaryRunner':
683        '''Hook for instance creation from command line arguments.'''
684
685    @staticmethod
686    def get_flash_address(args: argparse.Namespace,
687                          build_conf: BuildConfiguration,
688                          default: int = 0x0) -> int:
689        '''Helper method for extracting a flash address.
690
691        If args.dt_flash is true, returns the address obtained from
692        ZephyrBinaryRunner.flash_address_from_build_conf(build_conf).
693
694        Otherwise (when args.dt_flash is False), the default value is
695        returned.'''
696        if args.dt_flash:
697            return ZephyrBinaryRunner.flash_address_from_build_conf(build_conf)
698        else:
699            return default
700
701    @staticmethod
702    def flash_address_from_build_conf(build_conf: BuildConfiguration):
703        '''If CONFIG_HAS_FLASH_LOAD_OFFSET is n in build_conf,
704        return the CONFIG_FLASH_BASE_ADDRESS value. Otherwise, return
705        CONFIG_FLASH_BASE_ADDRESS + CONFIG_FLASH_LOAD_OFFSET.
706        '''
707        if build_conf.getboolean('CONFIG_HAS_FLASH_LOAD_OFFSET'):
708            return (build_conf['CONFIG_FLASH_BASE_ADDRESS'] +
709                    build_conf['CONFIG_FLASH_LOAD_OFFSET'])
710        else:
711            return build_conf['CONFIG_FLASH_BASE_ADDRESS']
712
713    def run(self, command: str, **kwargs):
714        '''Runs command ('flash', 'debug', 'debugserver', 'attach').
715
716        This is the main entry point to this runner.'''
717        caps = self.capabilities()
718        if command not in caps.commands:
719            raise ValueError(f'runner {self.name()} does not implement command {command}')
720        self.do_run(command, **kwargs)
721
722    @abc.abstractmethod
723    def do_run(self, command: str, **kwargs):
724        '''Concrete runner; run() delegates to this. Implement in subclasses.
725
726        In case of an unsupported command, raise a ValueError.'''
727
728    @property
729    def build_conf(self) -> BuildConfiguration:
730        '''Get a BuildConfiguration for the build directory.'''
731        if not hasattr(self, '_build_conf'):
732            self._build_conf = BuildConfiguration(self.cfg.build_dir)
733        return self._build_conf
734
735    @property
736    def sysbuild_conf(self) -> SysbuildConfiguration:
737        '''Get a SysbuildConfiguration for the sysbuild directory.'''
738        if not hasattr(self, '_sysbuild_conf'):
739            self._sysbuild_conf = SysbuildConfiguration(os.path.dirname(self.cfg.build_dir))
740        return self._sysbuild_conf
741
742    @property
743    def thread_info_enabled(self) -> bool:
744        '''Returns True if self.build_conf has
745        CONFIG_DEBUG_THREAD_INFO enabled.
746        '''
747        return self.build_conf.getboolean('CONFIG_DEBUG_THREAD_INFO')
748
749    @classmethod
750    def dev_id_help(cls) -> str:
751        ''' Get the ArgParse help text for the --dev-id option.'''
752        return '''Device identifier. Use it to select
753                  which debugger, device, node or instance to
754                  target when multiple ones are available or
755                  connected.'''
756
757    @classmethod
758    def extload_help(cls) -> str:
759        ''' Get the ArgParse help text for the --extload option.'''
760        return '''External loader to be used by stm32cubeprogrammer
761                  to program the targeted external memory.
762                  The runner requires the external loader (*.stldr) filename.
763                  This external loader (*.stldr) must be located within
764                  STM32CubeProgrammer/bin/ExternalLoader directory.'''
765
766    @classmethod
767    def tool_opt_help(cls) -> str:
768        ''' Get the ArgParse help text for the --tool-opt option.'''
769        return '''Option to pass on to the underlying tool used
770                  by this runner. This can be given multiple times;
771                  the resulting arguments will be given to the tool
772                  in the order they appear on the command line.'''
773
774    @staticmethod
775    def require(program: str, path: str | None = None) -> str:
776        '''Require that a program is installed before proceeding.
777
778        :param program: name of the program that is required,
779                        or path to a program binary.
780        :param path:    PATH where to search for the program binary.
781                        By default check on the system PATH.
782
783        If ``program`` is an absolute path to an existing program
784        binary, this call succeeds. Otherwise, try to find the program
785        by name on the system PATH or in the given PATH, if provided.
786
787        If the program can be found, its path is returned.
788        Otherwise, raises MissingProgram.'''
789        ret = shutil.which(program, path=path)
790        if ret is None:
791            raise MissingProgram(program)
792        return ret
793
794    def get_rtt_address(self) -> int | None:
795        '''Helper method for extracting a the RTT control block address.
796
797        If args.rtt_address was supplied, returns that.
798
799        Otherwise, attempt to locate an rtt block in the elf file.
800        If this is not found, None is returned'''
801        if self.cfg.rtt_address is not None:
802            return self.cfg.rtt_address
803        elif self.cfg.elf_file is not None:
804            return find_rtt_block(self.cfg.elf_file)
805        return None
806
807    def run_server_and_client(self, server, client, **kwargs):
808        '''Run a server that ignores SIGINT, and a client that handles it.
809
810        This routine portably:
811
812        - creates a Popen object for the ``server`` command which ignores
813          SIGINT
814        - runs ``client`` in a subprocess while temporarily ignoring SIGINT
815        - cleans up the server after the client exits.
816        - the keyword arguments, if any, will be passed down to both server and
817          client subprocess calls
818
819        It's useful to e.g. open a GDB server and client.'''
820        server_proc = self.popen_ignore_int(server, **kwargs)
821        try:
822            self.run_client(client, **kwargs)
823        finally:
824            server_proc.terminate()
825            server_proc.wait()
826
827    def run_client(self, client, **kwargs):
828        '''Run a client that handles SIGINT.'''
829        previous = signal.signal(signal.SIGINT, signal.SIG_IGN)
830        try:
831            self.check_call(client, **kwargs)
832        finally:
833            signal.signal(signal.SIGINT, previous)
834
835    def _log_cmd(self, cmd: list[str]):
836        escaped = ' '.join(shlex.quote(s) for s in cmd)
837        if not _DRY_RUN:
838            self.logger.debug(escaped)
839        else:
840            self.logger.info(escaped)
841
842    def call(self, cmd: list[str], **kwargs) -> int:
843        '''Subclass subprocess.call() wrapper.
844
845        Subclasses should use this method to run command in a
846        subprocess and get its return code, rather than
847        using subprocess directly, to keep accurate debug logs.
848        '''
849        self._log_cmd(cmd)
850        if _DRY_RUN:
851            return 0
852        return subprocess.call(cmd, **kwargs)
853
854    def check_call(self, cmd: list[str], **kwargs):
855        '''Subclass subprocess.check_call() wrapper.
856
857        Subclasses should use this method to run command in a
858        subprocess and check that it executed correctly, rather than
859        using subprocess directly, to keep accurate debug logs.
860        '''
861        self._log_cmd(cmd)
862        if _DRY_RUN:
863            return
864        subprocess.check_call(cmd, **kwargs)
865
866    def check_output(self, cmd: list[str], **kwargs) -> bytes:
867        '''Subclass subprocess.check_output() wrapper.
868
869        Subclasses should use this method to run command in a
870        subprocess and check that it executed correctly, rather than
871        using subprocess directly, to keep accurate debug logs.
872        '''
873        self._log_cmd(cmd)
874        if _DRY_RUN:
875            return b''
876        return subprocess.check_output(cmd, **kwargs)
877
878    def popen_ignore_int(self, cmd: list[str], **kwargs) -> subprocess.Popen:
879        '''Spawn a child command, ensuring it ignores SIGINT.
880
881        The returned subprocess.Popen object must be manually terminated.'''
882        cflags = 0
883        preexec = None
884        system = platform.system()
885
886        if system == 'Windows':
887            # We can't type check this line on Unix operating systems:
888            # mypy thinks the subprocess module has no such attribute.
889            cflags |= subprocess.CREATE_NEW_PROCESS_GROUP  # type: ignore
890        elif system in {'Linux', 'Darwin'}:
891            # We can't type check this on Windows for the same reason.
892            preexec = os.setsid # type: ignore
893
894        self._log_cmd(cmd)
895        if _DRY_RUN:
896            return _DebugDummyPopen()  # type: ignore
897
898        return subprocess.Popen(cmd, creationflags=cflags, preexec_fn=preexec, **kwargs)
899
900    def ensure_output(self, output_type: str) -> None:
901        '''Ensure self.cfg has a particular output artifact.
902
903        For example, ensure_output('bin') ensures that self.cfg.bin_file
904        refers to an existing file. Errors out if it's missing or undefined.
905
906        :param output_type: string naming the output type
907        '''
908        output_file = getattr(self.cfg, f'{output_type}_file', None)
909
910        if output_file is None:
911            err = f'{output_type} file location is unknown.'
912        elif not os.path.isfile(output_file):
913            err = f'{output_file} does not exist.'
914        else:
915            return
916
917        if output_type in ('elf', 'hex', 'bin', 'uf2'):
918            err += f' Try enabling CONFIG_BUILD_OUTPUT_{output_type.upper()}.'
919
920        # RuntimeError avoids a stack trace saved in run_common.
921        raise RuntimeError(err)
922
923    def run_telnet_client(self, host: str, port: int) -> None:
924        '''
925        Run a telnet client for user interaction.
926        '''
927        # If a `nc` command is available, run it, as it will provide the best support for
928        # CONFIG_SHELL_VT100_COMMANDS etc.
929        if shutil.which('nc') is not None:
930            client_cmd = ['nc', host, str(port)]
931            # Note: netcat (nc) does not handle sigint, so cannot use run_client()
932            self.check_call(client_cmd)
933            return
934
935        # Otherwise, use a pure python implementation. This will work well for logging,
936        # but input is line based only.
937        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
938        sock.connect((host, port))
939        sel = selectors.DefaultSelector()
940        sel.register(sys.stdin, selectors.EVENT_READ)
941        sel.register(sock, selectors.EVENT_READ)
942        while True:
943            events = sel.select()
944            for key, _ in events:
945                if key.fileobj == sys.stdin:
946                    text = sys.stdin.readline()
947                    if text:
948                        sock.send(text.encode())
949
950                elif key.fileobj == sock:
951                    resp = sock.recv(2048)
952                    if resp:
953                        print(resp.decode())
954