1#! /usr/bin/env python3 2 3# Copyright (c) 2017 Linaro Limited. 4# Copyright (c) 2017 Open Source Foundries Limited. 5# 6# SPDX-License-Identifier: Apache-2.0 7 8"""Zephyr binary runner core interfaces 9 10This provides the core ZephyrBinaryRunner class meant for public use, 11as well as some other helpers for concrete runner classes. 12""" 13 14import abc 15import argparse 16import errno 17import logging 18import os 19import platform 20import re 21import selectors 22import shlex 23import shutil 24import signal 25import socket 26import subprocess 27import sys 28from dataclasses import dataclass, field 29from enum import Enum 30from functools import partial 31from inspect import isabstract 32from typing import NamedTuple, NoReturn 33 34try: 35 from elftools.elf.elffile import ELFFile 36 ELFTOOLS_MISSING = False 37except ImportError: 38 ELFTOOLS_MISSING = True 39 40 41# Turn on to enable just logging the commands that would be run (at 42# info rather than debug level), without actually running them. This 43# can break runners that are expecting output or if one command 44# depends on another, so it's just for debugging. 45_DRY_RUN = False 46 47_logger = logging.getLogger('runners') 48 49# FIXME: I assume this code belongs somewhere else, but i couldn't figure out 50# a good location for it, so i put it here for now 51# We could potentially search for RTT blocks in hex or bin files as well, 52# but since the magic string is "SEGGER RTT", i thought it might be better 53# to avoid, at the risk of false positives. 54def find_rtt_block(elf_file: str) -> int | None: 55 if ELFTOOLS_MISSING: 56 raise RuntimeError('the Python dependency elftools was missing; ' 57 'see the getting started guide for details on ' 58 'how to fix') 59 60 with open(elf_file, 'rb') as f: 61 elffile = ELFFile(f) 62 for sect in elffile.iter_sections('SHT_SYMTAB'): 63 symbols = sect.get_symbol_by_name('_SEGGER_RTT') 64 if symbols is None: 65 continue 66 for s in symbols: 67 return s.entry.get('st_value') 68 return None 69 70 71class _DebugDummyPopen: 72 73 def terminate(self): 74 pass 75 76 def wait(self): 77 pass 78 79 80MAX_PORT = 49151 81 82 83class NetworkPortHelper: 84 '''Helper class for dealing with local IP network ports.''' 85 86 def get_unused_ports(self, starting_from): 87 '''Find unused network ports, starting at given values. 88 89 starting_from is an iterable of ports the caller would like to use. 90 91 The return value is an iterable of ports, in the same order, using 92 the given values if they were unused, or the next sequentially 93 available unused port otherwise. 94 95 Ports may be bound between this call's check and actual usage, so 96 callers still need to handle errors involving returned ports.''' 97 start = list(starting_from) 98 used = self._used_now() 99 ret = [] 100 101 for desired in start: 102 port = desired 103 while port in used: 104 port += 1 105 if port > MAX_PORT: 106 msg = "ports above {} are in use" 107 raise ValueError(msg.format(desired)) 108 used.add(port) 109 ret.append(port) 110 111 return ret 112 113 def _used_now(self): 114 handlers = { 115 'Windows': self._used_now_windows, 116 'Linux': self._used_now_linux, 117 'Darwin': self._used_now_darwin, 118 } 119 handler = handlers[platform.system()] 120 return handler() 121 122 def _used_now_windows(self): 123 cmd = ['netstat', '-a', '-n', '-p', 'tcp'] 124 return self._parser_windows(cmd) 125 126 def _used_now_linux(self): 127 cmd = ['ss', '-a', '-n', '-t'] 128 return self._parser_linux(cmd) 129 130 def _used_now_darwin(self): 131 cmd = ['netstat', '-a', '-n', '-p', 'tcp'] 132 return self._parser_darwin(cmd) 133 134 @staticmethod 135 def _parser_windows(cmd): 136 out = subprocess.check_output(cmd).split(b'\r\n') 137 used_bytes = [x.split()[1].rsplit(b':', 1)[1] for x in out 138 if x.startswith(b' TCP')] 139 return {int(b) for b in used_bytes} 140 141 @staticmethod 142 def _parser_linux(cmd): 143 out = subprocess.check_output(cmd).splitlines()[1:] 144 used_bytes = [s.split()[3].rsplit(b':', 1)[1] for s in out] 145 return {int(b) for b in used_bytes} 146 147 @staticmethod 148 def _parser_darwin(cmd): 149 out = subprocess.check_output(cmd).split(b'\n') 150 used_bytes = [x.split()[3].rsplit(b':', 1)[1] for x in out 151 if x.startswith(b'tcp')] 152 return {int(b) for b in used_bytes} 153 154 155class BuildConfiguration: 156 '''This helper class provides access to build-time configuration. 157 158 Configuration options can be read as if the object were a dict, 159 either object['CONFIG_FOO'] or object.get('CONFIG_FOO'). 160 161 Kconfig configuration values are available (parsed from .config).''' 162 163 config_prefix = 'CONFIG' 164 165 def __init__(self, build_dir: str): 166 self.build_dir = build_dir 167 self.options: dict[str, str | int] = {} 168 self.path = os.path.join(self.build_dir, 'zephyr', '.config') 169 self._parse() 170 171 def __contains__(self, item): 172 return item in self.options 173 174 def __getitem__(self, item): 175 return self.options[item] 176 177 def get(self, option, *args): 178 return self.options.get(option, *args) 179 180 def getboolean(self, option): 181 '''If a boolean option is explicitly set to y or n, 182 returns its value. Otherwise, falls back to False. 183 ''' 184 return self.options.get(option, False) 185 186 def _parse(self): 187 filename = self.path 188 189 opt_value = re.compile(f'^(?P<option>{self.config_prefix}_[A-Za-z0-9_]+)=(?P<value>.*)$') 190 not_set = re.compile(f'^# (?P<option>{self.config_prefix}_[A-Za-z0-9_]+) is not set$') 191 192 with open(filename) as f: 193 for line in f: 194 match = opt_value.match(line) 195 if match: 196 value = match.group('value').rstrip() 197 if value.startswith('"') and value.endswith('"'): 198 # A string literal should have the quotes stripped, 199 # but otherwise be left as is. 200 value = value[1:-1] 201 elif value == 'y': 202 # The character 'y' is a boolean option 203 # that is set to True. 204 value = True 205 else: 206 # Neither a string nor 'y', so try to parse it 207 # as an integer. 208 try: 209 base = 16 if value.startswith('0x') else 10 210 self.options[match.group('option')] = int(value, base=base) 211 continue 212 except ValueError: 213 pass 214 215 self.options[match.group('option')] = value 216 continue 217 218 match = not_set.match(line) 219 if match: 220 # '# CONFIG_FOO is not set' means a boolean option is false. 221 self.options[match.group('option')] = False 222 223class SysbuildConfiguration(BuildConfiguration): 224 '''This helper class provides access to sysbuild-time configuration. 225 226 Configuration options can be read as if the object were a dict, 227 either object['SB_CONFIG_FOO'] or object.get('SB_CONFIG_FOO'). 228 229 Kconfig configuration values are available (parsed from .config).''' 230 231 config_prefix = 'SB_CONFIG' 232 233 def _parse(self): 234 # If the build does not use sysbuild, skip parsing the file. 235 if not os.path.exists(self.path): 236 return 237 super()._parse() 238 239class MissingProgram(FileNotFoundError): 240 '''FileNotFoundError subclass for missing program dependencies. 241 242 No significant changes from the parent FileNotFoundError; this is 243 useful for explicitly signaling that the file in question is a 244 program that some class requires to proceed. 245 246 The filename attribute contains the missing program.''' 247 248 def __init__(self, program): 249 super().__init__(errno.ENOENT, os.strerror(errno.ENOENT), program) 250 251 252_RUNNERCAPS_COMMANDS = {'flash', 'debug', 'debugserver', 'attach', 'simulate', 'robot', 'rtt'} 253 254@dataclass 255class RunnerCaps: 256 '''This class represents a runner class's capabilities. 257 258 Each capability is represented as an attribute with the same 259 name. Flag attributes are True or False. 260 261 Available capabilities: 262 263 - commands: set of supported commands; default is {'flash', 264 'debug', 'debugserver', 'attach', 'simulate', 'robot', 'rtt'}. 265 266 - dev_id: whether the runner supports device identifiers, in the form of an 267 -i, --dev-id option. This is useful when the user has multiple debuggers 268 connected to a single computer, in order to select which one will be used 269 with the command provided. 270 271 - flash_addr: whether the runner supports flashing to an 272 arbitrary address. Default is False. If true, the runner 273 must honor the --dt-flash option. 274 275 - erase: whether the runner supports an --erase option, which 276 does a mass-erase of the entire addressable flash on the target 277 before flashing. On multi-core SoCs, this may only erase portions of 278 flash specific the actual target core. (This option can be useful for 279 things like clearing out old settings values or other subsystem state 280 that may affect the behavior of the zephyr image. It is also sometimes 281 needed by SoCs which have flash-like areas that can't be sector 282 erased by the underlying tool before flashing; UICR on nRF SoCs 283 is one example.) 284 285 - reset: whether the runner supports a --reset option, which 286 resets the device after a flash operation is complete. 287 288 - extload: whether the runner supports a --extload option, which 289 must be given one time and is passed on to the underlying tool 290 that the runner wraps. 291 292 - tool_opt: whether the runner supports a --tool-opt (-O) option, which 293 can be given multiple times and is passed on to the underlying tool 294 that the runner wraps. 295 296 - file: whether the runner supports a --file option, which specifies 297 exactly the file that should be used to flash, overriding any default 298 discovered in the build directory. 299 300 - hide_load_files: whether the elf/hex/bin file arguments should be hidden. 301 302 - rtt: whether the runner supports SEGGER RTT. This adds a --rtt-address 303 option. 304 ''' 305 306 commands: set[str] = field(default_factory=lambda: set(_RUNNERCAPS_COMMANDS)) 307 dev_id: bool = False 308 flash_addr: bool = False 309 erase: bool = False 310 reset: bool = False 311 extload: bool = False 312 tool_opt: bool = False 313 file: bool = False 314 hide_load_files: bool = False 315 rtt: bool = False # This capability exists separately from the rtt command 316 # to allow other commands to use the rtt address 317 318 def __post_init__(self): 319 if not self.commands.issubset(_RUNNERCAPS_COMMANDS): 320 raise ValueError(f'{self.commands=} contains invalid command') 321 322 323def _missing_cap(cls: type['ZephyrBinaryRunner'], option: str) -> NoReturn: 324 # Helper function that's called when an option was given on the 325 # command line that corresponds to a missing capability in the 326 # runner class cls. 327 328 raise ValueError(f"{cls.name()} doesn't support {option} option") 329 330 331class FileType(Enum): 332 OTHER = 0 333 HEX = 1 334 BIN = 2 335 ELF = 3 336 337 338class RunnerConfig(NamedTuple): 339 '''Runner execution-time configuration. 340 341 This is a common object shared by all runners. Individual runners 342 can register specific configuration options using their 343 do_add_parser() hooks. 344 ''' 345 build_dir: str # application build directory 346 board_dir: str # board definition directory 347 elf_file: str | None # zephyr.elf path, or None 348 exe_file: str | None # zephyr.exe path, or None 349 hex_file: str | None # zephyr.hex path, or None 350 bin_file: str | None # zephyr.bin path, or None 351 uf2_file: str | None # zephyr.uf2 path, or None 352 file: str | None # binary file path (provided by the user), or None 353 file_type: FileType | None = FileType.OTHER # binary file type 354 gdb: str | None = None # path to a usable gdb 355 openocd: str | None = None # path to a usable openocd 356 openocd_search: list[str] = [] # add these paths to the openocd search path 357 rtt_address: int | None = None # address of the rtt control block 358 359 360_YN_CHOICES = ['Y', 'y', 'N', 'n', 'yes', 'no', 'YES', 'NO'] 361 362 363class _DTFlashAction(argparse.Action): 364 365 def __call__(self, parser, namespace, values, option_string=None): 366 if values.lower().startswith('y'): 367 namespace.dt_flash = True 368 else: 369 namespace.dt_flash = False 370 371 372class _ToggleAction(argparse.Action): 373 374 def __call__(self, parser, args, ignored, option): 375 setattr(args, self.dest, not option.startswith('--no-')) 376 377class DeprecatedAction(argparse.Action): 378 379 def __call__(self, parser, namespace, values, option_string=None): 380 _logger.warning(f'Argument {self.option_strings[0]} is deprecated' + 381 (f' for your runner {self._cls.name()}' if self._cls is not None else '') + 382 f', use {self._replacement} instead.') 383 setattr(namespace, self.dest, values) 384 385def depr_action(*args, cls=None, replacement=None, **kwargs): 386 action = DeprecatedAction(*args, **kwargs) 387 action._cls = cls 388 action._replacement = replacement 389 return action 390 391class ZephyrBinaryRunner(abc.ABC): 392 '''Abstract superclass for binary runners (flashers, debuggers). 393 394 **Note**: this class's API has changed relatively rarely since it 395 as added, but it is not considered a stable Zephyr API, and may change 396 without notice. 397 398 With some exceptions, boards supported by Zephyr must provide 399 generic means to be flashed (have a Zephyr firmware binary 400 permanently installed on the device for running) and debugged 401 (have a breakpoint debugger and program loader on a host 402 workstation attached to a running target). 403 404 This is supported by four top-level commands managed by the 405 Zephyr build system: 406 407 - 'flash': flash a previously configured binary to the board, 408 start execution on the target, then return. 409 410 - 'debug': connect to the board via a debugging protocol, program 411 the flash, then drop the user into a debugger interface with 412 symbol tables loaded from the current binary, and block until it 413 exits. 414 415 - 'debugserver': connect via a board-specific debugging protocol, 416 then reset and halt the target. Ensure the user is now able to 417 connect to a debug server with symbol tables loaded from the 418 binary. 419 420 - 'attach': connect to the board via a debugging protocol, then drop 421 the user into a debugger interface with symbol tables loaded from 422 the current binary, and block until it exits. Unlike 'debug', this 423 command does not program the flash. 424 425 This class provides an API for these commands. Every subclass is 426 called a 'runner' for short. Each runner has a name (like 427 'pyocd'), and declares commands it can handle (like 428 'flash'). Boards (like 'nrf52dk/nrf52832') declare which runner(s) 429 are compatible with them to the Zephyr build system, along with 430 information on how to configure the runner to work with the board. 431 432 The build system will then place enough information in the build 433 directory to create and use runners with this class's create() 434 method, which provides a command line argument parsing API. You 435 can also create runners by instantiating subclasses directly. 436 437 In order to define your own runner, you need to: 438 439 1. Define a ZephyrBinaryRunner subclass, and implement its 440 abstract methods. You may need to override capabilities(). 441 442 2. Make sure the Python module defining your runner class is 443 imported, e.g. by editing this package's __init__.py (otherwise, 444 get_runners() won't work). 445 446 3. Give your runner's name to the Zephyr build system in your 447 board's board.cmake. 448 449 Additional advice: 450 451 - If you need to import any non-standard-library modules, make sure 452 to catch ImportError and defer complaints about it to a RuntimeError 453 if one is missing. This avoids affecting users that don't require your 454 runner, while still making it clear what went wrong to users that do 455 require it that don't have the necessary modules installed. 456 457 - If you need to ask the user something (e.g. using input()), do it 458 in your create() classmethod, not do_run(). That ensures your 459 __init__() really has everything it needs to call do_run(), and also 460 avoids calling input() when not instantiating within a command line 461 application. 462 463 - Use self.logger to log messages using the standard library's 464 logging API; your logger is named "runner.<your-runner-name()>" 465 466 For command-line invocation from the Zephyr build system, runners 467 define their own argparse-based interface through the common 468 add_parser() (and runner-specific do_add_parser() it delegates 469 to), and provide a way to create instances of themselves from 470 a RunnerConfig and parsed runner-specific arguments via create(). 471 472 Runners use a variety of host tools and configuration values, the 473 user interface to which is abstracted by this class. Each runner 474 subclass should take any values it needs to execute one of these 475 commands in its constructor. The actual command execution is 476 handled in the run() method.''' 477 478 def __init__(self, cfg: RunnerConfig): 479 '''Initialize core runner state.''' 480 481 self.cfg = cfg 482 '''RunnerConfig for this instance.''' 483 484 self.logger = logging.getLogger(f'runners.{self.name()}') 485 '''logging.Logger for this instance.''' 486 487 @staticmethod 488 def get_runners() -> list[type['ZephyrBinaryRunner']]: 489 '''Get a list of all currently defined runner classes.''' 490 def inheritors(klass): 491 subclasses = set() 492 work = [klass] 493 while work: 494 parent = work.pop() 495 for child in parent.__subclasses__(): 496 if child not in subclasses: 497 if not isabstract(child): 498 subclasses.add(child) 499 work.append(child) 500 return subclasses 501 502 return inheritors(ZephyrBinaryRunner) 503 504 @classmethod 505 @abc.abstractmethod 506 def name(cls) -> str: 507 '''Return this runner's user-visible name. 508 509 When choosing a name, pick something short and lowercase, 510 based on the name of the tool (like openocd, jlink, etc.) or 511 the target architecture/board (like xtensa etc.).''' 512 513 @classmethod 514 def capabilities(cls) -> RunnerCaps: 515 '''Returns a RunnerCaps representing this runner's capabilities. 516 517 This implementation returns the default capabilities. 518 519 Subclasses should override appropriately if needed.''' 520 return RunnerCaps() 521 522 @classmethod 523 def add_parser(cls, parser): 524 '''Adds a sub-command parser for this runner. 525 526 The given object, parser, is a sub-command parser from the 527 argparse module. For more details, refer to the documentation 528 for argparse.ArgumentParser.add_subparsers(). 529 530 The lone common optional argument is: 531 532 * --dt-flash (if the runner capabilities includes flash_addr) 533 534 Runner-specific options are added through the do_add_parser() 535 hook.''' 536 # Unfortunately, the parser argument's type is not documented 537 # in typeshed, so we can't type annotate much here. 538 539 # Common options that depend on runner capabilities. If a 540 # capability is not supported, the option string or strings 541 # are added anyway, to prevent an individual runner class from 542 # using them to mean something else. 543 caps = cls.capabilities() 544 545 if caps.dev_id: 546 parser.add_argument('-i', '--dev-id', 547 dest='dev_id', 548 help=cls.dev_id_help()) 549 else: 550 parser.add_argument('-i', '--dev-id', help=argparse.SUPPRESS) 551 552 if caps.flash_addr: 553 parser.add_argument('--dt-flash', default='n', choices=_YN_CHOICES, 554 action=_DTFlashAction, 555 help='''If 'yes', try to use flash address 556 information from devicetree when flash 557 addresses are unknown (e.g. when flashing a .bin)''') 558 else: 559 parser.add_argument('--dt-flash', help=argparse.SUPPRESS) 560 561 if caps.file: 562 parser.add_argument('-f', '--file', 563 dest='file', 564 help="path to binary file") 565 parser.add_argument('-t', '--file-type', 566 dest='file_type', 567 help="type of binary file") 568 else: 569 parser.add_argument('-f', '--file', help=argparse.SUPPRESS) 570 parser.add_argument('-t', '--file-type', help=argparse.SUPPRESS) 571 572 if caps.hide_load_files: 573 parser.add_argument('--elf-file', help=argparse.SUPPRESS) 574 parser.add_argument('--hex-file', help=argparse.SUPPRESS) 575 parser.add_argument('--bin-file', help=argparse.SUPPRESS) 576 else: 577 parser.add_argument('--elf-file', 578 metavar='FILE', 579 action=(partial(depr_action, cls=cls, 580 replacement='-f/--file') if caps.file else None), 581 help='path to zephyr.elf' 582 if not caps.file else 'Deprecated, use -f/--file instead.') 583 parser.add_argument('--hex-file', 584 metavar='FILE', 585 action=(partial(depr_action, cls=cls, 586 replacement='-f/--file') if caps.file else None), 587 help='path to zephyr.hex' 588 if not caps.file else 'Deprecated, use -f/--file instead.') 589 parser.add_argument('--bin-file', 590 metavar='FILE', 591 action=(partial(depr_action, cls=cls, 592 replacement='-f/--file') if caps.file else None), 593 help='path to zephyr.bin' 594 if not caps.file else 'Deprecated, use -f/--file instead.') 595 596 parser.add_argument('--erase', '--no-erase', nargs=0, 597 action=_ToggleAction, 598 help=("mass erase flash before loading, or don't. " 599 "Default action depends on each specific runner." 600 if caps.erase else argparse.SUPPRESS)) 601 602 parser.add_argument('--reset', '--no-reset', nargs=0, 603 action=_ToggleAction, 604 help=("reset device after flashing, or don't. " 605 "Default action depends on each specific runner." 606 if caps.reset else argparse.SUPPRESS)) 607 608 parser.add_argument('--extload', dest='extload', 609 help=(cls.extload_help() if caps.extload 610 else argparse.SUPPRESS)) 611 612 parser.add_argument('-O', '--tool-opt', dest='tool_opt', 613 default=[], action='append', 614 help=(cls.tool_opt_help() if caps.tool_opt 615 else argparse.SUPPRESS)) 616 617 if caps.rtt: 618 parser.add_argument('--rtt-address', dest='rtt_address', 619 type=lambda x: int(x, 0), 620 help="""address of RTT control block. If not supplied, 621 it will be autodetected if possible""") 622 else: 623 parser.add_argument('--rtt-address', help=argparse.SUPPRESS) 624 625 # Runner-specific options. 626 cls.do_add_parser(parser) 627 628 @classmethod 629 @abc.abstractmethod 630 def do_add_parser(cls, parser): 631 '''Hook for adding runner-specific options.''' 632 633 @classmethod # noqa: B027 634 def args_from_previous_runner(cls, previous_runner, 635 args: argparse.Namespace): 636 '''Update arguments from a previously created runner. 637 638 This is intended for propagating relevant user responses 639 between multiple runs of the same runner, for example a 640 JTAG serial number.''' 641 642 @classmethod 643 def create(cls, cfg: RunnerConfig, 644 args: argparse.Namespace) -> 'ZephyrBinaryRunner': 645 '''Create an instance from command-line arguments. 646 647 - ``cfg``: runner configuration (pass to superclass __init__) 648 - ``args``: arguments parsed from execution environment, as 649 specified by ``add_parser()``.''' 650 caps = cls.capabilities() 651 if args.dev_id and not caps.dev_id: 652 _missing_cap(cls, '--dev-id') 653 if args.dt_flash and not caps.flash_addr: 654 _missing_cap(cls, '--dt-flash') 655 if args.erase and not caps.erase: 656 _missing_cap(cls, '--erase') 657 if args.reset and not caps.reset: 658 _missing_cap(cls, '--reset') 659 if args.extload and not caps.extload: 660 _missing_cap(cls, '--extload') 661 if args.tool_opt and not caps.tool_opt: 662 _missing_cap(cls, '--tool-opt') 663 if args.file and not caps.file: 664 _missing_cap(cls, '--file') 665 if args.file_type and not args.file: 666 raise ValueError("--file-type requires --file") 667 if args.file_type and not caps.file: 668 _missing_cap(cls, '--file-type') 669 if args.rtt_address and not caps.rtt: 670 _missing_cap(cls, '--rtt-address') 671 672 ret = cls.do_create(cfg, args) 673 if args.erase: 674 ret.logger.info('mass erase requested') 675 if args.reset: 676 ret.logger.info('reset after flashing requested') 677 return ret 678 679 @classmethod 680 @abc.abstractmethod 681 def do_create(cls, cfg: RunnerConfig, 682 args: argparse.Namespace) -> 'ZephyrBinaryRunner': 683 '''Hook for instance creation from command line arguments.''' 684 685 @staticmethod 686 def get_flash_address(args: argparse.Namespace, 687 build_conf: BuildConfiguration, 688 default: int = 0x0) -> int: 689 '''Helper method for extracting a flash address. 690 691 If args.dt_flash is true, returns the address obtained from 692 ZephyrBinaryRunner.flash_address_from_build_conf(build_conf). 693 694 Otherwise (when args.dt_flash is False), the default value is 695 returned.''' 696 if args.dt_flash: 697 return ZephyrBinaryRunner.flash_address_from_build_conf(build_conf) 698 else: 699 return default 700 701 @staticmethod 702 def flash_address_from_build_conf(build_conf: BuildConfiguration): 703 '''If CONFIG_HAS_FLASH_LOAD_OFFSET is n in build_conf, 704 return the CONFIG_FLASH_BASE_ADDRESS value. Otherwise, return 705 CONFIG_FLASH_BASE_ADDRESS + CONFIG_FLASH_LOAD_OFFSET. 706 ''' 707 if build_conf.getboolean('CONFIG_HAS_FLASH_LOAD_OFFSET'): 708 return (build_conf['CONFIG_FLASH_BASE_ADDRESS'] + 709 build_conf['CONFIG_FLASH_LOAD_OFFSET']) 710 else: 711 return build_conf['CONFIG_FLASH_BASE_ADDRESS'] 712 713 def run(self, command: str, **kwargs): 714 '''Runs command ('flash', 'debug', 'debugserver', 'attach'). 715 716 This is the main entry point to this runner.''' 717 caps = self.capabilities() 718 if command not in caps.commands: 719 raise ValueError(f'runner {self.name()} does not implement command {command}') 720 self.do_run(command, **kwargs) 721 722 @abc.abstractmethod 723 def do_run(self, command: str, **kwargs): 724 '''Concrete runner; run() delegates to this. Implement in subclasses. 725 726 In case of an unsupported command, raise a ValueError.''' 727 728 @property 729 def build_conf(self) -> BuildConfiguration: 730 '''Get a BuildConfiguration for the build directory.''' 731 if not hasattr(self, '_build_conf'): 732 self._build_conf = BuildConfiguration(self.cfg.build_dir) 733 return self._build_conf 734 735 @property 736 def sysbuild_conf(self) -> SysbuildConfiguration: 737 '''Get a SysbuildConfiguration for the sysbuild directory.''' 738 if not hasattr(self, '_sysbuild_conf'): 739 self._sysbuild_conf = SysbuildConfiguration(os.path.dirname(self.cfg.build_dir)) 740 return self._sysbuild_conf 741 742 @property 743 def thread_info_enabled(self) -> bool: 744 '''Returns True if self.build_conf has 745 CONFIG_DEBUG_THREAD_INFO enabled. 746 ''' 747 return self.build_conf.getboolean('CONFIG_DEBUG_THREAD_INFO') 748 749 @classmethod 750 def dev_id_help(cls) -> str: 751 ''' Get the ArgParse help text for the --dev-id option.''' 752 return '''Device identifier. Use it to select 753 which debugger, device, node or instance to 754 target when multiple ones are available or 755 connected.''' 756 757 @classmethod 758 def extload_help(cls) -> str: 759 ''' Get the ArgParse help text for the --extload option.''' 760 return '''External loader to be used by stm32cubeprogrammer 761 to program the targeted external memory. 762 The runner requires the external loader (*.stldr) filename. 763 This external loader (*.stldr) must be located within 764 STM32CubeProgrammer/bin/ExternalLoader directory.''' 765 766 @classmethod 767 def tool_opt_help(cls) -> str: 768 ''' Get the ArgParse help text for the --tool-opt option.''' 769 return '''Option to pass on to the underlying tool used 770 by this runner. This can be given multiple times; 771 the resulting arguments will be given to the tool 772 in the order they appear on the command line.''' 773 774 @staticmethod 775 def require(program: str, path: str | None = None) -> str: 776 '''Require that a program is installed before proceeding. 777 778 :param program: name of the program that is required, 779 or path to a program binary. 780 :param path: PATH where to search for the program binary. 781 By default check on the system PATH. 782 783 If ``program`` is an absolute path to an existing program 784 binary, this call succeeds. Otherwise, try to find the program 785 by name on the system PATH or in the given PATH, if provided. 786 787 If the program can be found, its path is returned. 788 Otherwise, raises MissingProgram.''' 789 ret = shutil.which(program, path=path) 790 if ret is None: 791 raise MissingProgram(program) 792 return ret 793 794 def get_rtt_address(self) -> int | None: 795 '''Helper method for extracting a the RTT control block address. 796 797 If args.rtt_address was supplied, returns that. 798 799 Otherwise, attempt to locate an rtt block in the elf file. 800 If this is not found, None is returned''' 801 if self.cfg.rtt_address is not None: 802 return self.cfg.rtt_address 803 elif self.cfg.elf_file is not None: 804 return find_rtt_block(self.cfg.elf_file) 805 return None 806 807 def run_server_and_client(self, server, client, **kwargs): 808 '''Run a server that ignores SIGINT, and a client that handles it. 809 810 This routine portably: 811 812 - creates a Popen object for the ``server`` command which ignores 813 SIGINT 814 - runs ``client`` in a subprocess while temporarily ignoring SIGINT 815 - cleans up the server after the client exits. 816 - the keyword arguments, if any, will be passed down to both server and 817 client subprocess calls 818 819 It's useful to e.g. open a GDB server and client.''' 820 server_proc = self.popen_ignore_int(server, **kwargs) 821 try: 822 self.run_client(client, **kwargs) 823 finally: 824 server_proc.terminate() 825 server_proc.wait() 826 827 def run_client(self, client, **kwargs): 828 '''Run a client that handles SIGINT.''' 829 previous = signal.signal(signal.SIGINT, signal.SIG_IGN) 830 try: 831 self.check_call(client, **kwargs) 832 finally: 833 signal.signal(signal.SIGINT, previous) 834 835 def _log_cmd(self, cmd: list[str]): 836 escaped = ' '.join(shlex.quote(s) for s in cmd) 837 if not _DRY_RUN: 838 self.logger.debug(escaped) 839 else: 840 self.logger.info(escaped) 841 842 def call(self, cmd: list[str], **kwargs) -> int: 843 '''Subclass subprocess.call() wrapper. 844 845 Subclasses should use this method to run command in a 846 subprocess and get its return code, rather than 847 using subprocess directly, to keep accurate debug logs. 848 ''' 849 self._log_cmd(cmd) 850 if _DRY_RUN: 851 return 0 852 return subprocess.call(cmd, **kwargs) 853 854 def check_call(self, cmd: list[str], **kwargs): 855 '''Subclass subprocess.check_call() wrapper. 856 857 Subclasses should use this method to run command in a 858 subprocess and check that it executed correctly, rather than 859 using subprocess directly, to keep accurate debug logs. 860 ''' 861 self._log_cmd(cmd) 862 if _DRY_RUN: 863 return 864 subprocess.check_call(cmd, **kwargs) 865 866 def check_output(self, cmd: list[str], **kwargs) -> bytes: 867 '''Subclass subprocess.check_output() wrapper. 868 869 Subclasses should use this method to run command in a 870 subprocess and check that it executed correctly, rather than 871 using subprocess directly, to keep accurate debug logs. 872 ''' 873 self._log_cmd(cmd) 874 if _DRY_RUN: 875 return b'' 876 return subprocess.check_output(cmd, **kwargs) 877 878 def popen_ignore_int(self, cmd: list[str], **kwargs) -> subprocess.Popen: 879 '''Spawn a child command, ensuring it ignores SIGINT. 880 881 The returned subprocess.Popen object must be manually terminated.''' 882 cflags = 0 883 preexec = None 884 system = platform.system() 885 886 if system == 'Windows': 887 # We can't type check this line on Unix operating systems: 888 # mypy thinks the subprocess module has no such attribute. 889 cflags |= subprocess.CREATE_NEW_PROCESS_GROUP # type: ignore 890 elif system in {'Linux', 'Darwin'}: 891 # We can't type check this on Windows for the same reason. 892 preexec = os.setsid # type: ignore 893 894 self._log_cmd(cmd) 895 if _DRY_RUN: 896 return _DebugDummyPopen() # type: ignore 897 898 return subprocess.Popen(cmd, creationflags=cflags, preexec_fn=preexec, **kwargs) 899 900 def ensure_output(self, output_type: str) -> None: 901 '''Ensure self.cfg has a particular output artifact. 902 903 For example, ensure_output('bin') ensures that self.cfg.bin_file 904 refers to an existing file. Errors out if it's missing or undefined. 905 906 :param output_type: string naming the output type 907 ''' 908 output_file = getattr(self.cfg, f'{output_type}_file', None) 909 910 if output_file is None: 911 err = f'{output_type} file location is unknown.' 912 elif not os.path.isfile(output_file): 913 err = f'{output_file} does not exist.' 914 else: 915 return 916 917 if output_type in ('elf', 'hex', 'bin', 'uf2'): 918 err += f' Try enabling CONFIG_BUILD_OUTPUT_{output_type.upper()}.' 919 920 # RuntimeError avoids a stack trace saved in run_common. 921 raise RuntimeError(err) 922 923 def run_telnet_client(self, host: str, port: int) -> None: 924 ''' 925 Run a telnet client for user interaction. 926 ''' 927 # If a `nc` command is available, run it, as it will provide the best support for 928 # CONFIG_SHELL_VT100_COMMANDS etc. 929 if shutil.which('nc') is not None: 930 client_cmd = ['nc', host, str(port)] 931 # Note: netcat (nc) does not handle sigint, so cannot use run_client() 932 self.check_call(client_cmd) 933 return 934 935 # Otherwise, use a pure python implementation. This will work well for logging, 936 # but input is line based only. 937 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 938 sock.connect((host, port)) 939 sel = selectors.DefaultSelector() 940 sel.register(sys.stdin, selectors.EVENT_READ) 941 sel.register(sock, selectors.EVENT_READ) 942 while True: 943 events = sel.select() 944 for key, _ in events: 945 if key.fileobj == sys.stdin: 946 text = sys.stdin.readline() 947 if text: 948 sock.send(text.encode()) 949 950 elif key.fileobj == sock: 951 resp = sock.recv(2048) 952 if resp: 953 print(resp.decode()) 954