# Copyright (c) 2022 Nordic Semiconductor ASA # # SPDX-License-Identifier: Apache-2.0 import argparse import os import re import shutil import sys import textwrap from pathlib import Path from urllib.parse import urlparse from west.commands import WestCommand from zephyr_ext_common import ZEPHYR_BASE sys.path.append(os.fspath(Path(__file__).parent.parent)) import zephyr_module class Blobs(WestCommand): DEFAULT_LIST_FMT = '{module} {status} {path} {type} {abspath}' def __init__(self): super().__init__( 'blobs', # Keep this in sync with the string in west-commands.yml. 'work with binary blobs', 'Work with binary blobs', accepts_unknown_args=False, ) def do_add_parser(self, parser_adder): parser = parser_adder.add_parser( self.name, help=self.help, formatter_class=argparse.RawDescriptionHelpFormatter, description=self.description, epilog=textwrap.dedent(f'''\ FORMAT STRINGS -------------- Blobs are listed using a Python 3 format string. Arguments to the format string are accessed by name. The default format string is: "{self.DEFAULT_LIST_FMT}" The following arguments are available: - module: name of the module that contains this blob - abspath: blob absolute path - status: short status (A: present, M: hash failure, D: not present) - path: blob local path from /zephyr/blobs/ - sha256: blob SHA256 hash in hex - type: type of blob - version: version string - license_path: path to the license file for the blob - license-abspath: absolute path to the license file for the blob - click-through: need license click-through or not - uri: URI to the remote location of the blob - description: blob text description - doc-url: URL to the documentation for this blob '''), ) # Remember to update west-completion.bash if you add or remove # flags parser.add_argument( 'subcmd', nargs=1, choices=['list', 'fetch', 'clean'], help='sub-command to execute' ) parser.add_argument( 'modules', metavar='MODULE', nargs='*', help='''zephyr modules to operate on; all modules will be used if not given''', ) group = parser.add_argument_group('west blob list options') group.add_argument( '-f', '--format', help='''format string to use to list each blob; see FORMAT STRINGS below''', ) group = parser.add_argument_group('west blobs fetch options') group.add_argument( '-l', '--allow-regex', help='''Regex pattern to apply to the blob local path. Only local paths matching this regex will be fetched. Note that local paths are relative to the module directory''', ) group.add_argument( '-a', '--auto-accept', action='store_true', help='''auto accept license if the fetching needs click-through''', ) group.add_argument( '--cache-dirs', help='''Semicolon-separated list of directories to search for cached blobs before downloading. Cache files may use the original filename or be suffixed with `.`.''', ) group.add_argument( '--auto-cache', help='''Path to a directory that is automatically populated when a blob is downloaded. Cached blobs are stored using the original filename suffixed with `.`.''', ) return parser def get_blobs(self, args): blobs = [] modules = args.modules all_modules = zephyr_module.parse_modules(ZEPHYR_BASE, self.manifest) all_names = [m.meta.get('name', None) for m in all_modules] unknown = set(modules) - set(all_names) if len(unknown): self.die(f'Unknown module(s): {unknown}') for module in all_modules: # Filter by module module_name = module.meta.get('name', None) if len(modules) and module_name not in modules: continue blobs += zephyr_module.process_blobs(module.project, module.meta) return blobs def list(self, args): blobs = self.get_blobs(args) fmt = args.format or self.DEFAULT_LIST_FMT for blob in blobs: self.inf(fmt.format(**blob)) def ensure_folder(self, path): path.parent.mkdir(parents=True, exist_ok=True) def handle_auto_cache(self, blob, auto_cache_dir) -> Path: """ This function guarantees that a given blob exists in the auto-cache. It first checks whether the blob is already present. If so, it returns the path of this cached blob. If the blob is not yet cached, the blob is downloaded into the auto-cache directory and the path of the freshly cached blob is returned. """ cached_blob = self.get_cached_blob(blob, [auto_cache_dir]) if cached_blob: return cached_blob name = Path(blob['path']).name sha256 = blob['sha256'] self.download_blob(blob, auto_cache_dir / f'{name}.{sha256}') cached_blob = self.get_cached_blob(blob, [auto_cache_dir]) assert cached_blob, f'Blob {name} still not cached in auto-cache.' return cached_blob def get_cached_blob(self, blob, cache_dirs: list) -> Path | None: """ Look for a cached blob in the provided cache directories. A blob may be stored using either its original name or suffixed with its SHA256 hash (e.g. "."). Return the first matching path, or None if not found. """ name = Path(blob['path']).name sha256 = blob["sha256"] candidate_names = [ f"{name}.{sha256}", # suffixed version name, # original blob name ] for cache_dir in cache_dirs: if not cache_dir.exists(): continue for name in candidate_names: candidate_path = cache_dir / name if ( zephyr_module.get_blob_status(candidate_path, sha256) == zephyr_module.BLOB_PRESENT ): return candidate_path return None def download_blob(self, blob, path): '''Download a blob from its url to a given path.''' url = blob['url'] scheme = urlparse(url).scheme self.dbg(f'Fetching blob from url {url} with {scheme} to path: {path}') import fetchers fetcher = fetchers.get_fetcher_cls(scheme) self.dbg(f'Found fetcher: {fetcher}') inst = fetcher() self.ensure_folder(path) inst.fetch(url, path) def fetch_blob(self, args, blob): """ Ensures that the specified blob is available at its path. If caching is enabled and the blob exists in the cache, it is copied from there. Otherwise, the blob is downloaded from its URL and placed at the target path. """ path = Path(blob['abspath']) # collect existing cache dirs specified as args, otherwise from west config cache_dirs = args.cache_dirs auto_cache_dir = args.auto_cache if self.has_config: if cache_dirs is None: cache_dirs = self.config.get('blobs.cache-dirs') if auto_cache_dir is None: auto_cache_dir = self.config.get('blobs.auto-cache') # expand user home for each cache directory if auto_cache_dir is not None: auto_cache_dir = Path(auto_cache_dir).expanduser() if cache_dirs is not None: cache_dirs = [Path(p).expanduser() for p in cache_dirs.split(';') if p] # search for cached blob in the cache directories cached_blob = self.get_cached_blob(blob, cache_dirs or []) # If blob is not found in cache directories: Use auto-cache if enabled if not cached_blob and auto_cache_dir: cached_blob = self.handle_auto_cache(blob, auto_cache_dir) # Copy blob if it is cached, otherwise download it if cached_blob: self.dbg(f'Copy cached blob: {cached_blob}') self.ensure_folder(path) shutil.copy(cached_blob, path) else: self.download_blob(blob, path) # Compare the checksum of a file we've just downloaded # to the digest in blob metadata, warn user if they differ. def verify_blob(self, blob) -> bool: self.dbg(f"Verifying blob {blob['module']}: {blob['abspath']}") status = zephyr_module.get_blob_status(blob['abspath'], blob['sha256']) if status == zephyr_module.BLOB_OUTDATED: self.err( textwrap.dedent( f'''\ The checksum of the downloaded file does not match that in the blob metadata: - if it is not certain that the download was successful, try running 'west blobs fetch {blob['module']}' to re-download the file - if the error persists, please consider contacting the maintainers of the module so that they can check the corresponding blob metadata Module: {blob['module']} Blob: {blob['path']} URL: {blob['url']} Info: {blob['description']}''' ) ) return False return True def fetch(self, args): bad_checksum_count = 0 blobs = self.get_blobs(args) for blob in blobs: if blob['status'] == zephyr_module.BLOB_PRESENT: self.dbg(f"Blob {blob['module']}: {blob['abspath']} is up to date") continue # if args.allow_regex is set, use it to filter the blob by path if args.allow_regex and not re.match(args.allow_regex, blob['path']): self.dbg( f"Blob {blob['module']}: {blob['abspath']} does not match regex " f"'{args.allow_regex}', skipping" ) continue self.inf(f"Fetching blob {blob['module']}: {blob['abspath']}") if blob['click-through'] and not args.auto_accept: while True: user_input = input( "For this blob, need to read and accept " "license to continue. Read it?\n" "Please type 'y' or 'n' and press enter to confirm: " ) if user_input.upper() == "Y" or user_input.upper() == "N": break if user_input.upper() != "Y": self.wrn('Skip fetching this blob.') continue with open(blob['license-abspath'], encoding="utf-8") as license_file: license_content = license_file.read() print(license_content) while True: user_input = input( "Accept license to continue?\n" "Please type 'y' or 'n' and press enter to confirm: " ) if user_input.upper() == "Y" or user_input.upper() == "N": break if user_input.upper() != "Y": self.wrn('Skip fetching this blob.') continue self.fetch_blob(args, blob) if not self.verify_blob(blob): bad_checksum_count += 1 if bad_checksum_count: self.err(f"{bad_checksum_count} blobs have bad checksums") sys.exit(os.EX_DATAERR) def clean(self, args): blobs = self.get_blobs(args) for blob in blobs: if blob['status'] == zephyr_module.BLOB_NOT_PRESENT: self.dbg(f"Blob {blob['module']}: {blob['abspath']} not in filesystem") continue self.inf(f"Deleting blob {blob['module']}: {blob['status']} {blob['abspath']}") blob['abspath'].unlink() def do_run(self, args, _): self.dbg(f"subcmd: '{args.subcmd[0]}' modules: {args.modules}") subcmd = getattr(self, args.subcmd[0]) if args.subcmd[0] != 'list' and args.format is not None: self.die('unexpected --format argument; this is a "west blobs list" option') subcmd(args)