#! /usr/bin/env python3 # Copyright (c) 2017 Linaro Limited. # Copyright (c) 2017 Open Source Foundries Limited. # # SPDX-License-Identifier: Apache-2.0 """Zephyr binary runner core interfaces This provides the core ZephyrBinaryRunner class meant for public use, as well as some other helpers for concrete runner classes. """ import abc import argparse import errno import logging import os import platform import re import selectors import shlex import shutil import signal import socket import subprocess import sys from dataclasses import dataclass, field from enum import Enum from functools import partial from inspect import isabstract from typing import NamedTuple, NoReturn try: from elftools.elf.elffile import ELFFile ELFTOOLS_MISSING = False except ImportError: ELFTOOLS_MISSING = True # Turn on to enable just logging the commands that would be run (at # info rather than debug level), without actually running them. This # can break runners that are expecting output or if one command # depends on another, so it's just for debugging. _DRY_RUN = False _logger = logging.getLogger('runners') # FIXME: I assume this code belongs somewhere else, but i couldn't figure out # a good location for it, so i put it here for now # We could potentially search for RTT blocks in hex or bin files as well, # but since the magic string is "SEGGER RTT", i thought it might be better # to avoid, at the risk of false positives. def find_rtt_block(elf_file: str) -> int | None: if ELFTOOLS_MISSING: raise RuntimeError('the Python dependency elftools was missing; ' 'see the getting started guide for details on ' 'how to fix') with open(elf_file, 'rb') as f: elffile = ELFFile(f) for sect in elffile.iter_sections('SHT_SYMTAB'): symbols = sect.get_symbol_by_name('_SEGGER_RTT') if symbols is None: continue for s in symbols: return s.entry.get('st_value') return None class _DebugDummyPopen: def terminate(self): pass def wait(self): pass MAX_PORT = 49151 class NetworkPortHelper: '''Helper class for dealing with local IP network ports.''' def get_unused_ports(self, starting_from): '''Find unused network ports, starting at given values. starting_from is an iterable of ports the caller would like to use. The return value is an iterable of ports, in the same order, using the given values if they were unused, or the next sequentially available unused port otherwise. Ports may be bound between this call's check and actual usage, so callers still need to handle errors involving returned ports.''' start = list(starting_from) used = self._used_now() ret = [] for desired in start: port = desired while port in used: port += 1 if port > MAX_PORT: msg = "ports above {} are in use" raise ValueError(msg.format(desired)) used.add(port) ret.append(port) return ret def _used_now(self): handlers = { 'Windows': self._used_now_windows, 'Linux': self._used_now_linux, 'Darwin': self._used_now_darwin, } handler = handlers[platform.system()] return handler() def _used_now_windows(self): cmd = ['netstat', '-a', '-n', '-p', 'tcp'] return self._parser_windows(cmd) def _used_now_linux(self): cmd = ['ss', '-a', '-n', '-t'] return self._parser_linux(cmd) def _used_now_darwin(self): cmd = ['netstat', '-a', '-n', '-p', 'tcp'] return self._parser_darwin(cmd) @staticmethod def _parser_windows(cmd): out = subprocess.check_output(cmd).split(b'\r\n') used_bytes = [x.split()[1].rsplit(b':', 1)[1] for x in out if x.startswith(b' TCP')] return {int(b) for b in used_bytes} @staticmethod def _parser_linux(cmd): out = subprocess.check_output(cmd).splitlines()[1:] used_bytes = [s.split()[3].rsplit(b':', 1)[1] for s in out] return {int(b) for b in used_bytes} @staticmethod def _parser_darwin(cmd): out = subprocess.check_output(cmd).split(b'\n') used_bytes = [x.split()[3].rsplit(b':', 1)[1] for x in out if x.startswith(b'tcp')] return {int(b) for b in used_bytes} class BuildConfiguration: '''This helper class provides access to build-time configuration. Configuration options can be read as if the object were a dict, either object['CONFIG_FOO'] or object.get('CONFIG_FOO'). Kconfig configuration values are available (parsed from .config).''' config_prefix = 'CONFIG' def __init__(self, build_dir: str): self.build_dir = build_dir self.options: dict[str, str | int] = {} self.path = os.path.join(self.build_dir, 'zephyr', '.config') self._parse() def __contains__(self, item): return item in self.options def __getitem__(self, item): return self.options[item] def get(self, option, *args): return self.options.get(option, *args) def getboolean(self, option): '''If a boolean option is explicitly set to y or n, returns its value. Otherwise, falls back to False. ''' return self.options.get(option, False) def _parse(self): filename = self.path opt_value = re.compile(f'^(?P