1# Copyright (c) 2022 Nordic Semiconductor ASA 2# 3# SPDX-License-Identifier: Apache-2.0 4 5import argparse 6import os 7import re 8import shutil 9import sys 10import textwrap 11from pathlib import Path 12from urllib.parse import urlparse 13 14from west.commands import WestCommand 15from zephyr_ext_common import ZEPHYR_BASE 16 17sys.path.append(os.fspath(Path(__file__).parent.parent)) 18import zephyr_module 19 20 21class Blobs(WestCommand): 22 DEFAULT_LIST_FMT = '{module} {status} {path} {type} {abspath}' 23 24 def __init__(self): 25 super().__init__( 26 'blobs', 27 # Keep this in sync with the string in west-commands.yml. 28 'work with binary blobs', 29 'Work with binary blobs', 30 accepts_unknown_args=False, 31 ) 32 33 def do_add_parser(self, parser_adder): 34 parser = parser_adder.add_parser( 35 self.name, 36 help=self.help, 37 formatter_class=argparse.RawDescriptionHelpFormatter, 38 description=self.description, 39 epilog=textwrap.dedent(f'''\ 40 FORMAT STRINGS 41 -------------- 42 43 Blobs are listed using a Python 3 format string. Arguments 44 to the format string are accessed by name. 45 46 The default format string is: 47 48 "{self.DEFAULT_LIST_FMT}" 49 50 The following arguments are available: 51 52 - module: name of the module that contains this blob 53 - abspath: blob absolute path 54 - status: short status (A: present, M: hash failure, D: not present) 55 - path: blob local path from <module>/zephyr/blobs/ 56 - sha256: blob SHA256 hash in hex 57 - type: type of blob 58 - version: version string 59 - license_path: path to the license file for the blob 60 - license-abspath: absolute path to the license file for the blob 61 - click-through: need license click-through or not 62 - uri: URI to the remote location of the blob 63 - description: blob text description 64 - doc-url: URL to the documentation for this blob 65 '''), 66 ) 67 68 # Remember to update west-completion.bash if you add or remove 69 # flags 70 parser.add_argument( 71 'subcmd', nargs=1, choices=['list', 'fetch', 'clean'], help='sub-command to execute' 72 ) 73 74 parser.add_argument( 75 'modules', 76 metavar='MODULE', 77 nargs='*', 78 help='''zephyr modules to operate on; 79 all modules will be used if not given''', 80 ) 81 82 group = parser.add_argument_group('west blob list options') 83 group.add_argument( 84 '-f', 85 '--format', 86 help='''format string to use to list each blob; 87 see FORMAT STRINGS below''', 88 ) 89 90 group = parser.add_argument_group('west blobs fetch options') 91 group.add_argument( 92 '-l', 93 '--allow-regex', 94 help='''Regex pattern to apply to the blob local path. 95 Only local paths matching this regex will be fetched. 96 Note that local paths are relative to the module directory''', 97 ) 98 group.add_argument( 99 '-a', 100 '--auto-accept', 101 action='store_true', 102 help='''auto accept license if the fetching needs click-through''', 103 ) 104 group.add_argument( 105 '--cache-dirs', 106 help='''Semicolon-separated list of directories to search for cached 107 blobs before downloading. Cache files may use the original 108 filename or be suffixed with `.<sha256>`.''', 109 ) 110 group.add_argument( 111 '--auto-cache', 112 help='''Path to a directory that is automatically populated when a blob 113 is downloaded. Cached blobs are stored using the original 114 filename suffixed with `.<sha256>`.''', 115 ) 116 117 return parser 118 119 def get_blobs(self, args): 120 blobs = [] 121 modules = args.modules 122 all_modules = zephyr_module.parse_modules(ZEPHYR_BASE, self.manifest) 123 all_names = [m.meta.get('name', None) for m in all_modules] 124 125 unknown = set(modules) - set(all_names) 126 127 if len(unknown): 128 self.die(f'Unknown module(s): {unknown}') 129 130 for module in all_modules: 131 # Filter by module 132 module_name = module.meta.get('name', None) 133 if len(modules) and module_name not in modules: 134 continue 135 136 blobs += zephyr_module.process_blobs(module.project, module.meta) 137 138 return blobs 139 140 def list(self, args): 141 blobs = self.get_blobs(args) 142 fmt = args.format or self.DEFAULT_LIST_FMT 143 for blob in blobs: 144 self.inf(fmt.format(**blob)) 145 146 def ensure_folder(self, path): 147 path.parent.mkdir(parents=True, exist_ok=True) 148 149 def handle_auto_cache(self, blob, auto_cache_dir) -> Path: 150 """ 151 This function guarantees that a given blob exists in the auto-cache. 152 It first checks whether the blob is already present. If so, it 153 returns the path of this cached blob. If the blob is not yet cached, 154 the blob is downloaded into the auto-cache directory and the path of 155 the freshly cached blob is returned. 156 """ 157 cached_blob = self.get_cached_blob(blob, [auto_cache_dir]) 158 if cached_blob: 159 return cached_blob 160 name = Path(blob['path']).name 161 sha256 = blob['sha256'] 162 self.download_blob(blob, auto_cache_dir / f'{name}.{sha256}') 163 cached_blob = self.get_cached_blob(blob, [auto_cache_dir]) 164 assert cached_blob, f'Blob {name} still not cached in auto-cache.' 165 return cached_blob 166 167 def get_cached_blob(self, blob, cache_dirs: list) -> Path | None: 168 """ 169 Look for a cached blob in the provided cache directories. 170 A blob may be stored using either its original name or suffixed with 171 its SHA256 hash (e.g. "<name>.<sha256>"). 172 Return the first matching path, or None if not found. 173 """ 174 name = Path(blob['path']).name 175 sha256 = blob["sha256"] 176 candidate_names = [ 177 f"{name}.{sha256}", # suffixed version 178 name, # original blob name 179 ] 180 181 for cache_dir in cache_dirs: 182 if not cache_dir.exists(): 183 continue 184 for name in candidate_names: 185 candidate_path = cache_dir / name 186 if ( 187 zephyr_module.get_blob_status(candidate_path, sha256) 188 == zephyr_module.BLOB_PRESENT 189 ): 190 return candidate_path 191 return None 192 193 def download_blob(self, blob, path): 194 '''Download a blob from its url to a given path.''' 195 url = blob['url'] 196 scheme = urlparse(url).scheme 197 self.dbg(f'Fetching blob from url {url} with {scheme} to path: {path}') 198 import fetchers 199 200 fetcher = fetchers.get_fetcher_cls(scheme) 201 self.dbg(f'Found fetcher: {fetcher}') 202 inst = fetcher() 203 self.ensure_folder(path) 204 inst.fetch(url, path) 205 206 def fetch_blob(self, args, blob): 207 """ 208 Ensures that the specified blob is available at its path. 209 If caching is enabled and the blob exists in the cache, it is copied 210 from there. Otherwise, the blob is downloaded from its URL and placed 211 at the target path. 212 """ 213 path = Path(blob['abspath']) 214 215 # collect existing cache dirs specified as args, otherwise from west config 216 cache_dirs = args.cache_dirs 217 auto_cache_dir = args.auto_cache 218 if self.has_config: 219 if cache_dirs is None: 220 cache_dirs = self.config.get('blobs.cache-dirs') 221 if auto_cache_dir is None: 222 auto_cache_dir = self.config.get('blobs.auto-cache') 223 224 # expand user home for each cache directory 225 if auto_cache_dir is not None: 226 auto_cache_dir = Path(auto_cache_dir).expanduser() 227 if cache_dirs is not None: 228 cache_dirs = [Path(p).expanduser() for p in cache_dirs.split(';') if p] 229 230 # search for cached blob in the cache directories 231 cached_blob = self.get_cached_blob(blob, cache_dirs or []) 232 233 # If blob is not found in cache directories: Use auto-cache if enabled 234 if not cached_blob and auto_cache_dir: 235 cached_blob = self.handle_auto_cache(blob, auto_cache_dir) 236 237 # Copy blob if it is cached, otherwise download it 238 if cached_blob: 239 self.dbg(f'Copy cached blob: {cached_blob}') 240 self.ensure_folder(path) 241 shutil.copy(cached_blob, path) 242 else: 243 self.download_blob(blob, path) 244 245 # Compare the checksum of a file we've just downloaded 246 # to the digest in blob metadata, warn user if they differ. 247 def verify_blob(self, blob) -> bool: 248 self.dbg(f"Verifying blob {blob['module']}: {blob['abspath']}") 249 250 status = zephyr_module.get_blob_status(blob['abspath'], blob['sha256']) 251 if status == zephyr_module.BLOB_OUTDATED: 252 self.err( 253 textwrap.dedent( 254 f'''\ 255 The checksum of the downloaded file does not match that 256 in the blob metadata: 257 - if it is not certain that the download was successful, 258 try running 'west blobs fetch {blob['module']}' 259 to re-download the file 260 - if the error persists, please consider contacting 261 the maintainers of the module so that they can check 262 the corresponding blob metadata 263 264 Module: {blob['module']} 265 Blob: {blob['path']} 266 URL: {blob['url']} 267 Info: {blob['description']}''' 268 ) 269 ) 270 return False 271 return True 272 273 def fetch(self, args): 274 bad_checksum_count = 0 275 blobs = self.get_blobs(args) 276 for blob in blobs: 277 if blob['status'] == zephyr_module.BLOB_PRESENT: 278 self.dbg(f"Blob {blob['module']}: {blob['abspath']} is up to date") 279 continue 280 281 # if args.allow_regex is set, use it to filter the blob by path 282 if args.allow_regex and not re.match(args.allow_regex, blob['path']): 283 self.dbg( 284 f"Blob {blob['module']}: {blob['abspath']} does not match regex " 285 f"'{args.allow_regex}', skipping" 286 ) 287 continue 288 self.inf(f"Fetching blob {blob['module']}: {blob['abspath']}") 289 290 if blob['click-through'] and not args.auto_accept: 291 while True: 292 user_input = input( 293 "For this blob, need to read and accept " 294 "license to continue. Read it?\n" 295 "Please type 'y' or 'n' and press enter to confirm: " 296 ) 297 if user_input.upper() == "Y" or user_input.upper() == "N": 298 break 299 300 if user_input.upper() != "Y": 301 self.wrn('Skip fetching this blob.') 302 continue 303 304 with open(blob['license-abspath'], encoding="utf-8") as license_file: 305 license_content = license_file.read() 306 print(license_content) 307 308 while True: 309 user_input = input( 310 "Accept license to continue?\n" 311 "Please type 'y' or 'n' and press enter to confirm: " 312 ) 313 if user_input.upper() == "Y" or user_input.upper() == "N": 314 break 315 316 if user_input.upper() != "Y": 317 self.wrn('Skip fetching this blob.') 318 continue 319 320 self.fetch_blob(args, blob) 321 if not self.verify_blob(blob): 322 bad_checksum_count += 1 323 324 if bad_checksum_count: 325 self.err(f"{bad_checksum_count} blobs have bad checksums") 326 sys.exit(os.EX_DATAERR) 327 328 def clean(self, args): 329 blobs = self.get_blobs(args) 330 for blob in blobs: 331 if blob['status'] == zephyr_module.BLOB_NOT_PRESENT: 332 self.dbg(f"Blob {blob['module']}: {blob['abspath']} not in filesystem") 333 continue 334 self.inf(f"Deleting blob {blob['module']}: {blob['status']} {blob['abspath']}") 335 blob['abspath'].unlink() 336 337 def do_run(self, args, _): 338 self.dbg(f"subcmd: '{args.subcmd[0]}' modules: {args.modules}") 339 340 subcmd = getattr(self, args.subcmd[0]) 341 342 if args.subcmd[0] != 'list' and args.format is not None: 343 self.die('unexpected --format argument; this is a "west blobs list" option') 344 345 subcmd(args) 346