#! /usr/bin/env python3

# Copyright (c) 2017 Linaro Limited.
# Copyright (c) 2017 Open Source Foundries Limited.
#
# SPDX-License-Identifier: Apache-2.0

"""Zephyr binary runner core interfaces

This provides the core ZephyrBinaryRunner class meant for public use,
as well as some other helpers for concrete runner classes.
"""

import abc
import argparse
import errno
import logging
import os
import platform
import shlex
import shutil
import signal
import subprocess
import re
from dataclasses import dataclass, field
from functools import partial
from enum import Enum
from inspect import isabstract
from typing import Dict, List, NamedTuple, NoReturn, Optional, Set, Type, \
    Union

# Turn on to enable just logging the commands that would be run (at
# info rather than debug level), without actually running them. This
# can break runners that are expecting output or if one command
# depends on another, so it's just for debugging.
_DRY_RUN = False

_logger = logging.getLogger('runners')


class _DebugDummyPopen:
    '''Stand-in for subprocess.Popen returned by popen_ignore_int()
    when _DRY_RUN is set; terminate() and wait() do nothing.'''

    def terminate(self):
        pass

    def wait(self):
        pass


MAX_PORT = 49151


class NetworkPortHelper:
    '''Helper class for dealing with local IP network ports.'''

    def get_unused_ports(self, starting_from):
        '''Find unused network ports, starting at given values.

        starting_from is an iterable of ports the caller would like to use.

        The return value is an iterable of ports, in the same order, using
        the given values if they were unused, or the next sequentially
        available unused port otherwise.

        Ports may be bound between this call's check and actual usage, so
        callers still need to handle errors involving returned ports.

        Raises ValueError if the search for a replacement port goes
        past MAX_PORT.'''
        start = list(starting_from)
        used = self._used_now()
        ret = []

        for desired in start:
            port = desired
            while port in used:
                port += 1
                if port > MAX_PORT:
                    msg = "ports above {} are in use"
                    raise ValueError(msg.format(desired))
            # Reserve the port so that a later entry in starting_from
            # cannot be handed the same one.
            used.add(port)
            ret.append(port)

        return ret

    def _used_now(self):
        '''Return the set of TCP ports in use, as reported by the
        platform-specific tool.

        Raises KeyError if platform.system() is not one of the
        platforms handled below.'''
        handlers = {
            'Windows': self._used_now_windows,
            'Linux': self._used_now_linux,
            'Darwin': self._used_now_darwin,
        }
        handler = handlers[platform.system()]
        return handler()

    def _used_now_windows(self):
        cmd = ['netstat', '-a', '-n', '-p', 'tcp']
        return self._parser_windows(cmd)

    def _used_now_linux(self):
        cmd = ['ss', '-a', '-n', '-t']
        return self._parser_linux(cmd)

    def _used_now_darwin(self):
        cmd = ['netstat', '-a', '-n', '-p', 'tcp']
        return self._parser_darwin(cmd)

    @staticmethod
    def _parser_windows(cmd):
        '''Run cmd and return the set of port numbers taken from the
        second column of each output line starting with "  TCP".'''
        out = subprocess.check_output(cmd).split(b'\r\n')
        used_bytes = [x.split()[1].rsplit(b':', 1)[1] for x in out
                      if x.startswith(b'  TCP')]
        return {int(b) for b in used_bytes}

    @staticmethod
    def _parser_linux(cmd):
        '''Run cmd and return the set of port numbers taken from the
        fourth column of every output line after the first one.'''
        out = subprocess.check_output(cmd).splitlines()[1:]
        used_bytes = [s.split()[3].rsplit(b':', 1)[1] for s in out]
        return {int(b) for b in used_bytes}

    @staticmethod
    def _parser_darwin(cmd):
        '''Run cmd and return the set of port numbers taken from the
        fourth column of each output line starting with "tcp".'''
        out = subprocess.check_output(cmd).split(b'\n')
        used_bytes = [x.split()[3].rsplit(b':', 1)[1] for x in out
                      if x.startswith(b'tcp')]
        return {int(b) for b in used_bytes}


class BuildConfiguration:
    '''This helper class provides access to build-time configuration.

    Configuration options can be read as if the object were a dict,
    either object['CONFIG_FOO'] or object.get('CONFIG_FOO').

    Kconfig configuration values are available (parsed from .config).'''

    def __init__(self, build_dir: str):
        self.build_dir = build_dir
        self.options: Dict[str, Union[str, int]] = {}
        self.path = os.path.join(self.build_dir, 'zephyr', '.config')
        self._parse()

    def __contains__(self, item):
        return item in self.options

    def __getitem__(self, item):
        return self.options[item]

    def get(self, option, *args):
        return self.options.get(option, *args)

    def getboolean(self, option):
        '''If a boolean option is explicitly set to y or n,
        returns its value. Otherwise, falls back to False.
        '''
        return self.options.get(option, False)

    def _parse(self):
        '''Read self.path and fill self.options.

        Values are stored as: the unquoted text for double-quoted
        strings, True for 'y', an int for decimal or 0x-prefixed
        values, and the raw text for anything else. Options marked
        '# CONFIG_FOO is not set' are stored as False.'''
        filename = self.path
        opt_value = re.compile('^(?P<option>CONFIG_[A-Za-z0-9_]+)=(?P<value>.*)$')
        not_set = re.compile('^# (?P<option>CONFIG_[A-Za-z0-9_]+) is not set$')

        # Decode explicitly as UTF-8 rather than relying on the locale's
        # default encoding, which is not UTF-8 on every host and would
        # mis-decode (or fail on) non-ASCII string option values.
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                match = opt_value.match(line)
                if match:
                    value = match.group('value').rstrip()
                    if value.startswith('"') and value.endswith('"'):
                        # A string literal should have the quotes stripped,
                        # but otherwise be left as is.
                        value = value[1:-1]
                    elif value == 'y':
                        # The character 'y' is a boolean option
                        # that is set to True.
                        value = True
                    else:
                        # Neither a string nor 'y', so try to parse it
                        # as an integer.
                        try:
                            base = 16 if value.startswith('0x') else 10
                            self.options[match.group('option')] = int(value, base=base)
                            continue
                        except ValueError:
                            pass

                    self.options[match.group('option')] = value
                    continue

                match = not_set.match(line)
                if match:
                    # '# CONFIG_FOO is not set' means a boolean option is false.
                    self.options[match.group('option')] = False


class MissingProgram(FileNotFoundError):
    '''FileNotFoundError subclass for missing program dependencies.

    No significant changes from the parent FileNotFoundError; this is
    useful for explicitly signaling that the file in question is a
    program that some class requires to proceed.

    The filename attribute contains the missing program.'''

    def __init__(self, program):
        super().__init__(errno.ENOENT, os.strerror(errno.ENOENT), program)


_RUNNERCAPS_COMMANDS = {'flash', 'debug', 'debugserver', 'attach'}


@dataclass
class RunnerCaps:
    '''This class represents a runner class's capabilities.

    Each capability is represented as an attribute with the same
    name. Flag attributes are True or False.

    Available capabilities:

    - commands: set of supported commands; default is {'flash',
      'debug', 'debugserver', 'attach'}.

    - dev_id: whether the runner supports device identifiers, in the form of an
      -i, --dev-id option. This is useful when the user has multiple debuggers
      connected to a single computer, in order to select which one will be used
      with the command provided.

    - flash_addr: whether the runner supports flashing to an
      arbitrary address. Default is False. If true, the runner
      must honor the --dt-flash option.

    - erase: whether the runner supports an --erase option, which
      does a mass-erase of the entire addressable flash on the target
      before flashing. On multi-core SoCs, this may only erase portions of
      flash specific to the actual target core. (This option can be useful for
      things like clearing out old settings values or other subsystem state
      that may affect the behavior of the zephyr image. It is also sometimes
      needed by SoCs which have flash-like areas that can't be sector
      erased by the underlying tool before flashing; UICR on nRF SoCs
      is one example.)

    - reset: whether the runner supports a --reset option, which
      resets the device after a flash operation is complete.

    - tool_opt: whether the runner supports a --tool-opt (-O) option, which
      can be given multiple times and is passed on to the underlying tool
      that the runner wraps.

    - file: whether the runner supports a --file option, which specifies
      exactly the file that should be used to flash, overriding any default
      discovered in the build directory.
    '''

    commands: Set[str] = field(default_factory=lambda: set(_RUNNERCAPS_COMMANDS))
    dev_id: bool = False
    flash_addr: bool = False
    erase: bool = False
    reset: bool = False
    tool_opt: bool = False
    file: bool = False

    def __post_init__(self):
        # Reject any command name outside the known set.
        if not self.commands.issubset(_RUNNERCAPS_COMMANDS):
            raise ValueError(f'{self.commands=} contains invalid command')


def _missing_cap(cls: Type['ZephyrBinaryRunner'], option: str) -> NoReturn:
    # Helper function that's called when an option was given on the
    # command line that corresponds to a missing capability in the
    # runner class cls.

    raise ValueError(f"{cls.name()} doesn't support {option} option")


class FileType(Enum):
    '''Kinds of binary file; used as the type of RunnerConfig.file_type.'''
    OTHER = 0
    HEX = 1
    BIN = 2
    ELF = 3


class RunnerConfig(NamedTuple):
    '''Runner execution-time configuration.

    This is a common object shared by all runners. Individual runners
    can register specific configuration options using their
    do_add_parser() hooks.
    '''
    build_dir: str                  # application build directory
    board_dir: str                  # board definition directory
    elf_file: Optional[str]         # zephyr.elf path, or None
    exe_file: Optional[str]         # zephyr.exe path, or None
    hex_file: Optional[str]         # zephyr.hex path, or None
    bin_file: Optional[str]         # zephyr.bin path, or None
    uf2_file: Optional[str]         # zephyr.uf2 path, or None
    file: Optional[str]             # binary file path (provided by the user), or None
    file_type: Optional[FileType] = FileType.OTHER  # binary file type
    gdb: Optional[str] = None       # path to a usable gdb
    openocd: Optional[str] = None   # path to a usable openocd
    # NOTE: this default list object is shared by every instance that
    # does not pass openocd_search explicitly; treat it as read-only.
    openocd_search: List[str] = []  # add these paths to the openocd search path


_YN_CHOICES = ['Y', 'y', 'N', 'n', 'yes', 'no', 'YES', 'NO']


class _DTFlashAction(argparse.Action):
    '''Sets namespace.dt_flash to True when the option value starts
    with 'y' (case-insensitively), and to False otherwise.'''

    def __call__(self, parser, namespace, values, option_string=None):
        if values.lower().startswith('y'):
            namespace.dt_flash = True
        else:
            namespace.dt_flash = False


class _ToggleAction(argparse.Action):
    '''Stores True for an option spelled --foo and False for one
    spelled --no-foo.'''

    def __call__(self, parser, args, ignored, option):
        setattr(args, self.dest, not option.startswith('--no-'))


class DeprecatedAction(argparse.Action):
    '''Stores the option value like the default action does, after
    logging a deprecation warning that names the runner and the
    replacement option when they are known.'''

    # Class-level defaults so the action also works when it is passed
    # to add_argument() directly instead of being built by
    # depr_action(), which sets both attributes on the instance.
    _cls = None
    _replacement = None

    def __call__(self, parser, namespace, values, option_string=None):
        msg = f'Argument {self.option_strings[0]} is deprecated'
        if self._cls is not None:
            msg += f' for your runner {self._cls.name()}'
        if self._replacement is not None:
            msg += f', use {self._replacement} instead.'
        else:
            # No replacement known: don't tell the user to "use None".
            msg += '.'
        _logger.warning(msg)
        setattr(namespace, self.dest, values)


def depr_action(*args, cls=None, replacement=None, **kwargs):
    '''Build a DeprecatedAction from the given argparse arguments and
    attach the runner class and replacement option used in its
    warning message.'''
    action = DeprecatedAction(*args, **kwargs)
    setattr(action, '_cls', cls)
    setattr(action, '_replacement', replacement)
    return action


class ZephyrBinaryRunner(abc.ABC):
    '''Abstract superclass for binary runners (flashers, debuggers).

    **Note**: this class's API has changed relatively rarely since it
    was added, but it is not considered a stable Zephyr API, and may change
    without notice.

    With some exceptions, boards supported by Zephyr must provide
    generic means to be flashed (have a Zephyr firmware binary
    permanently installed on the device for running) and debugged
    (have a breakpoint debugger and program loader on a host
    workstation attached to a running target).

    This is supported by four top-level commands managed by the
    Zephyr build system:

    - 'flash': flash a previously configured binary to the board,
      start execution on the target, then return.

    - 'debug': connect to the board via a debugging protocol, program
      the flash, then drop the user into a debugger interface with
      symbol tables loaded from the current binary, and block until it
      exits.

    - 'debugserver': connect via a board-specific debugging protocol,
      then reset and halt the target. Ensure the user is now able to
      connect to a debug server with symbol tables loaded from the
      binary.

    - 'attach': connect to the board via a debugging protocol, then drop
      the user into a debugger interface with symbol tables loaded from
      the current binary, and block until it exits. Unlike 'debug', this
      command does not program the flash.

    This class provides an API for these commands. Every subclass is
    called a 'runner' for short. Each runner has a name (like
    'pyocd'), and declares commands it can handle (like
    'flash'). Boards (like 'nrf52dk_nrf52832') declare which runner(s)
    are compatible with them to the Zephyr build system, along with
    information on how to configure the runner to work with the board.

    The build system will then place enough information in the build
    directory to create and use runners with this class's create()
    method, which provides a command line argument parsing API. You
    can also create runners by instantiating subclasses directly.

    In order to define your own runner, you need to:

    1. Define a ZephyrBinaryRunner subclass, and implement its
       abstract methods. You may need to override capabilities().

    2. Make sure the Python module defining your runner class is
       imported, e.g. by editing this package's __init__.py (otherwise,
       get_runners() won't work).

    3. Give your runner's name to the Zephyr build system in your
       board's board.cmake.

    Additional advice:

    - If you need to import any non-standard-library modules, make sure
      to catch ImportError and defer complaints about it to a RuntimeError
      if one is missing. This avoids affecting users that don't require your
      runner, while still making it clear what went wrong to users that do
      require it that don't have the necessary modules installed.

    - If you need to ask the user something (e.g. using input()), do it
      in your create() classmethod, not do_run(). That ensures your
      __init__() really has everything it needs to call do_run(), and also
      avoids calling input() when not instantiating within a command line
      application.

    - Use self.logger to log messages using the standard library's
      logging API; your logger is named "runners.<your-runner-name()>"

    For command-line invocation from the Zephyr build system, runners
    define their own argparse-based interface through the common
    add_parser() (and runner-specific do_add_parser() it delegates
    to), and provide a way to create instances of themselves from
    a RunnerConfig and parsed runner-specific arguments via create().

    Runners use a variety of host tools and configuration values, the
    user interface to which is abstracted by this class. Each runner
    subclass should take any values it needs to execute one of these
    commands in its constructor. The actual command execution is
    handled in the run() method.'''

    def __init__(self, cfg: RunnerConfig):
        '''Initialize core runner state.'''

        self.cfg = cfg
        '''RunnerConfig for this instance.'''

        self.logger = logging.getLogger('runners.{}'.format(self.name()))
        '''logging.Logger for this instance.'''

    @staticmethod
    def get_runners() -> List[Type['ZephyrBinaryRunner']]:
        '''Get all currently defined runner classes.

        Walks the subclass tree below ZephyrBinaryRunner and collects
        every subclass that is not abstract. Note that the collection
        returned by the current implementation is a set, not a list.'''
        def inheritors(klass):
            subclasses = set()
            work = [klass]
            while work:
                parent = work.pop()
                for child in parent.__subclasses__():
                    if child not in subclasses:
                        # Abstract classes are not reported, but their
                        # own subclasses are still visited.
                        if not isabstract(child):
                            subclasses.add(child)
                        work.append(child)
            return subclasses

        return inheritors(ZephyrBinaryRunner)

    @classmethod
    @abc.abstractmethod
    def name(cls) -> str:
        '''Return this runner's user-visible name.

        When choosing a name, pick something short and lowercase,
        based on the name of the tool (like openocd, jlink, etc.) or
        the target architecture/board (like xtensa etc.).'''

    @classmethod
    def capabilities(cls) -> RunnerCaps:
        '''Returns a RunnerCaps representing this runner's capabilities.

        This implementation returns the default capabilities.

        Subclasses should override appropriately if needed.'''
        return RunnerCaps()

    @classmethod
    def add_parser(cls, parser):
        '''Adds a sub-command parser for this runner.

        The given object, parser, is a sub-command parser from the
        argparse module. For more details, refer to the documentation
        for argparse.ArgumentParser.add_subparsers().

        The common options added here are:

        * -i/--dev-id
        * --dt-flash
        * -f/--file and -t/--file-type
        * --elf-file, --hex-file and --bin-file (handled as deprecated
          options when the runner capabilities include file)
        * --erase/--no-erase
        * --reset/--no-reset
        * -O/--tool-opt

        Options whose capability the runner lacks are still added,
        with their help text suppressed.

        Runner-specific options are added through the do_add_parser()
        hook.'''
        # Unfortunately, the parser argument's type is not documented
        # in typeshed, so we can't type annotate much here.

        # Common options that depend on runner capabilities. If a
        # capability is not supported, the option string or strings
        # are added anyway, to prevent an individual runner class from
        # using them to mean something else.
        caps = cls.capabilities()

        if caps.dev_id:
            parser.add_argument('-i', '--dev-id',
                                dest='dev_id',
                                help=cls.dev_id_help())
        else:
            parser.add_argument('-i', '--dev-id', help=argparse.SUPPRESS)

        if caps.flash_addr:
            parser.add_argument('--dt-flash', default='n', choices=_YN_CHOICES,
                                action=_DTFlashAction,
                                help='''If 'yes', try to use flash address
                                information from devicetree when flash
                                addresses are unknown (e.g. when flashing a .bin)''')
        else:
            parser.add_argument('--dt-flash', help=argparse.SUPPRESS)

        if caps.file:
            parser.add_argument('-f', '--file',
                                dest='file',
                                help="path to binary file")
            parser.add_argument('-t', '--file-type',
                                dest='file_type',
                                help="type of binary file")
        else:
            parser.add_argument('-f', '--file', help=argparse.SUPPRESS)
            parser.add_argument('-t', '--file-type', help=argparse.SUPPRESS)

        # With the file capability, the per-format options still work
        # but log a deprecation warning pointing at -f/--file.
        parser.add_argument('--elf-file',
                            metavar='FILE',
                            action=(partial(depr_action, cls=cls, replacement='-f/--file') if caps.file else None),
                            help='path to zephyr.elf' if not caps.file else 'Deprecated, use -f/--file instead.')
        parser.add_argument('--hex-file',
                            metavar='FILE',
                            action=(partial(depr_action, cls=cls, replacement='-f/--file') if caps.file else None),
                            help='path to zephyr.hex' if not caps.file else 'Deprecated, use -f/--file instead.')
        parser.add_argument('--bin-file',
                            metavar='FILE',
                            action=(partial(depr_action, cls=cls, replacement='-f/--file') if caps.file else None),
                            help='path to zephyr.bin' if not caps.file else 'Deprecated, use -f/--file instead.')

        parser.add_argument('--erase', '--no-erase', nargs=0,
                            action=_ToggleAction,
                            help=("mass erase flash before loading, or don't. "
                                  "Default action depends on each specific runner."
                                  if caps.erase else argparse.SUPPRESS))

        parser.add_argument('--reset', '--no-reset', nargs=0,
                            action=_ToggleAction,
                            help=("reset device after flashing, or don't. "
                                  "Default action depends on each specific runner."
                                  if caps.reset else argparse.SUPPRESS))

        parser.add_argument('-O', '--tool-opt', dest='tool_opt',
                            default=[], action='append',
                            help=(cls.tool_opt_help() if caps.tool_opt
                                  else argparse.SUPPRESS))

        # Runner-specific options.
        cls.do_add_parser(parser)

    @classmethod
    @abc.abstractmethod
    def do_add_parser(cls, parser):
        '''Hook for adding runner-specific options.'''

    @classmethod
    def create(cls, cfg: RunnerConfig,
               args: argparse.Namespace) -> 'ZephyrBinaryRunner':
        '''Create an instance from command-line arguments.

        - ``cfg``: runner configuration (pass to superclass __init__)
        - ``args``: arguments parsed from execution environment, as
          specified by ``add_parser()``.

        Raises ValueError if an option was given whose capability this
        runner lacks, or if --file-type was given without --file.'''
        caps = cls.capabilities()
        if args.dev_id and not caps.dev_id:
            _missing_cap(cls, '--dev-id')
        if args.dt_flash and not caps.flash_addr:
            _missing_cap(cls, '--dt-flash')
        if args.erase and not caps.erase:
            _missing_cap(cls, '--erase')
        if args.reset and not caps.reset:
            _missing_cap(cls, '--reset')
        if args.tool_opt and not caps.tool_opt:
            _missing_cap(cls, '--tool-opt')
        if args.file and not caps.file:
            _missing_cap(cls, '--file')
        if args.file_type and not args.file:
            raise ValueError("--file-type requires --file")
        if args.file_type and not caps.file:
            _missing_cap(cls, '--file-type')

        ret = cls.do_create(cfg, args)
        if args.erase:
            ret.logger.info('mass erase requested')
        if args.reset:
            ret.logger.info('reset after flashing requested')
        return ret

    @classmethod
    @abc.abstractmethod
    def do_create(cls, cfg: RunnerConfig,
                  args: argparse.Namespace) -> 'ZephyrBinaryRunner':
        '''Hook for instance creation from command line arguments.'''

    @staticmethod
    def get_flash_address(args: argparse.Namespace,
                          build_conf: BuildConfiguration,
                          default: int = 0x0) -> int:
        '''Helper method for extracting a flash address.

        If args.dt_flash is true, returns the address obtained from
        ZephyrBinaryRunner.flash_address_from_build_conf(build_conf).

        Otherwise (when args.dt_flash is False), the default value is
        returned.'''
        if args.dt_flash:
            return ZephyrBinaryRunner.flash_address_from_build_conf(build_conf)
        else:
            return default

    @staticmethod
    def flash_address_from_build_conf(build_conf: BuildConfiguration):
        '''If CONFIG_HAS_FLASH_LOAD_OFFSET is n in build_conf,
        return the CONFIG_FLASH_BASE_ADDRESS value. Otherwise, return
        CONFIG_FLASH_BASE_ADDRESS + CONFIG_FLASH_LOAD_OFFSET.
        '''
        if build_conf.getboolean('CONFIG_HAS_FLASH_LOAD_OFFSET'):
            return (build_conf['CONFIG_FLASH_BASE_ADDRESS'] +
                    build_conf['CONFIG_FLASH_LOAD_OFFSET'])
        else:
            return build_conf['CONFIG_FLASH_BASE_ADDRESS']

    def run(self, command: str, **kwargs):
        '''Runs command ('flash', 'debug', 'debugserver', 'attach').

        This is the main entry point to this runner.

        Raises ValueError if command is not among this runner's
        capabilities().commands.'''
        caps = self.capabilities()
        if command not in caps.commands:
            raise ValueError('runner {} does not implement command {}'.format(
                self.name(), command))
        self.do_run(command, **kwargs)

    @abc.abstractmethod
    def do_run(self, command: str, **kwargs):
        '''Concrete runner; run() delegates to this. Implement in subclasses.

        In case of an unsupported command, raise a ValueError.'''

    @property
    def build_conf(self) -> BuildConfiguration:
        '''Get a BuildConfiguration for the build directory.'''
        # Created on first access, then reused.
        if not hasattr(self, '_build_conf'):
            self._build_conf = BuildConfiguration(self.cfg.build_dir)
        return self._build_conf

    @property
    def thread_info_enabled(self) -> bool:
        '''Returns True if self.build_conf has
        CONFIG_DEBUG_THREAD_INFO enabled.
        '''
        return self.build_conf.getboolean('CONFIG_DEBUG_THREAD_INFO')

    @classmethod
    def dev_id_help(cls) -> str:
        ''' Get the ArgParse help text for the --dev-id option.'''
        return '''Device identifier. Use it to select
                  which debugger, device, node or instance to
                  target when multiple ones are available or
                  connected.'''

    @classmethod
    def tool_opt_help(cls) -> str:
        ''' Get the ArgParse help text for the --tool-opt option.'''
        return '''Option to pass on to the underlying tool used
                  by this runner. This can be given multiple times;
                  the resulting arguments will be given to the tool
                  in the order they appear on the command line.'''

    @staticmethod
    def require(program: str, path: Optional[str] = None) -> str:
        '''Require that a program is installed before proceeding.

        :param program: name of the program that is required,
                        or path to a program binary.
        :param path:    PATH where to search for the program binary.
                        By default check on the system PATH.

        If ``program`` is an absolute path to an existing program
        binary, this call succeeds. Otherwise, try to find the program
        by name on the system PATH or in the given PATH, if provided.

        If the program can be found, its path is returned.
        Otherwise, raises MissingProgram.'''
        ret = shutil.which(program, path=path)
        if ret is None:
            raise MissingProgram(program)
        return ret

    def run_server_and_client(self, server, client, **kwargs):
        '''Run a server that ignores SIGINT, and a client that handles it.

        This routine portably:

        - creates a Popen object for the ``server`` command which ignores
          SIGINT
        - runs ``client`` in a subprocess while temporarily ignoring SIGINT
        - cleans up the server after the client exits.
        - the keyword arguments, if any, will be passed down to both server and
          client subprocess calls

        It's useful to e.g. open a GDB server and client.'''
        server_proc = self.popen_ignore_int(server, **kwargs)
        try:
            self.run_client(client, **kwargs)
        finally:
            # Always shut the server down, even if the client failed.
            server_proc.terminate()
            server_proc.wait()

    def run_client(self, client, **kwargs):
        '''Run a client that handles SIGINT.'''
        # SIGINT is ignored in this process while the client runs; the
        # previous handler is restored afterwards.
        previous = signal.signal(signal.SIGINT, signal.SIG_IGN)
        try:
            self.check_call(client, **kwargs)
        finally:
            signal.signal(signal.SIGINT, previous)

    def _log_cmd(self, cmd: List[str]):
        '''Log cmd as a shell-quoted string: at debug level normally,
        at info level when _DRY_RUN is set.'''
        escaped = ' '.join(shlex.quote(s) for s in cmd)
        if not _DRY_RUN:
            self.logger.debug(escaped)
        else:
            self.logger.info(escaped)

    def call(self, cmd: List[str], **kwargs) -> int:
        '''Subclass subprocess.call() wrapper.

        Subclasses should use this method to run command in a
        subprocess and get its return code, rather than
        using subprocess directly, to keep accurate debug logs.

        When _DRY_RUN is set, the command is only logged and 0 is
        returned.
        '''
        self._log_cmd(cmd)
        if _DRY_RUN:
            return 0
        return subprocess.call(cmd, **kwargs)

    def check_call(self, cmd: List[str], **kwargs):
        '''Subclass subprocess.check_call() wrapper.

        Subclasses should use this method to run command in a
        subprocess and check that it executed correctly, rather than
        using subprocess directly, to keep accurate debug logs.

        When _DRY_RUN is set, the command is only logged.
        '''
        self._log_cmd(cmd)
        if _DRY_RUN:
            return
        subprocess.check_call(cmd, **kwargs)

    def check_output(self, cmd: List[str], **kwargs) -> bytes:
        '''Subclass subprocess.check_output() wrapper.

        Subclasses should use this method to run command in a
        subprocess and check that it executed correctly, rather than
        using subprocess directly, to keep accurate debug logs.

        When _DRY_RUN is set, the command is only logged and b'' is
        returned.
        '''
        self._log_cmd(cmd)
        if _DRY_RUN:
            return b''
        return subprocess.check_output(cmd, **kwargs)

    def popen_ignore_int(self, cmd: List[str], **kwargs) -> subprocess.Popen:
        '''Spawn a child command, ensuring it ignores SIGINT.

        The returned subprocess.Popen object must be manually terminated.

        When _DRY_RUN is set, the command is only logged and a dummy
        object with no-op terminate() and wait() methods is returned.'''
        cflags = 0
        preexec = None
        system = platform.system()

        if system == 'Windows':
            # We can't type check this line on Unix operating systems:
            # mypy thinks the subprocess module has no such attribute.
            cflags |= subprocess.CREATE_NEW_PROCESS_GROUP  # type: ignore
        elif system in {'Linux', 'Darwin'}:
            # We can't type check this on Windows for the same reason.
            preexec = os.setsid  # type: ignore

        self._log_cmd(cmd)
        if _DRY_RUN:
            return _DebugDummyPopen()  # type: ignore

        return subprocess.Popen(cmd, creationflags=cflags, preexec_fn=preexec, **kwargs)

    def ensure_output(self, output_type: str) -> None:
        '''Ensure self.cfg has a particular output artifact.

        For example, ensure_output('bin') ensures that self.cfg.bin_file
        refers to an existing file. Errors out if it's missing or undefined.

        :param output_type: string naming the output type

        Raises RuntimeError if the file location is unknown or the
        file does not exist.
        '''
        output_file = getattr(self.cfg, f'{output_type}_file', None)

        if output_file is None:
            err = f'{output_type} file location is unknown.'
        elif not os.path.isfile(output_file):
            err = f'{output_file} does not exist.'
        else:
            return

        if output_type in ('elf', 'hex', 'bin', 'uf2'):
            err += f' Try enabling CONFIG_BUILD_OUTPUT_{output_type.upper()}.'

        # RuntimeError avoids a stack trace saved in run_common.
        raise RuntimeError(err)