1# Copyright (c) 2022 Nordic Semiconductor ASA
2#
3# SPDX-License-Identifier: Apache-2.0
4
5import argparse
6import os
7import re
8import shutil
9import sys
10import textwrap
11from pathlib import Path
12from urllib.parse import urlparse
13
14from west.commands import WestCommand
15from zephyr_ext_common import ZEPHYR_BASE
16
17sys.path.append(os.fspath(Path(__file__).parent.parent))
18import zephyr_module
19
20
21class Blobs(WestCommand):
22    DEFAULT_LIST_FMT = '{module} {status} {path} {type} {abspath}'
23
24    def __init__(self):
25        super().__init__(
26            'blobs',
27            # Keep this in sync with the string in west-commands.yml.
28            'work with binary blobs',
29            'Work with binary blobs',
30            accepts_unknown_args=False,
31        )
32
33    def do_add_parser(self, parser_adder):
34        parser = parser_adder.add_parser(
35            self.name,
36            help=self.help,
37            formatter_class=argparse.RawDescriptionHelpFormatter,
38            description=self.description,
39            epilog=textwrap.dedent(f'''\
40            FORMAT STRINGS
41            --------------
42
43            Blobs are listed using a Python 3 format string. Arguments
44            to the format string are accessed by name.
45
46            The default format string is:
47
48            "{self.DEFAULT_LIST_FMT}"
49
50            The following arguments are available:
51
52            - module: name of the module that contains this blob
53            - abspath: blob absolute path
54            - status: short status (A: present, M: hash failure, D: not present)
55            - path: blob local path from <module>/zephyr/blobs/
56            - sha256: blob SHA256 hash in hex
57            - type: type of blob
58            - version: version string
59            - license_path: path to the license file for the blob
60            - license-abspath: absolute path to the license file for the blob
61            - click-through: need license click-through or not
62            - uri: URI to the remote location of the blob
63            - description: blob text description
64            - doc-url: URL to the documentation for this blob
65            '''),
66        )
67
68        # Remember to update west-completion.bash if you add or remove
69        # flags
70        parser.add_argument(
71            'subcmd', nargs=1, choices=['list', 'fetch', 'clean'], help='sub-command to execute'
72        )
73
74        parser.add_argument(
75            'modules',
76            metavar='MODULE',
77            nargs='*',
78            help='''zephyr modules to operate on;
79                    all modules will be used if not given''',
80        )
81
82        group = parser.add_argument_group('west blob list options')
83        group.add_argument(
84            '-f',
85            '--format',
86            help='''format string to use to list each blob;
87                    see FORMAT STRINGS below''',
88        )
89
90        group = parser.add_argument_group('west blobs fetch options')
91        group.add_argument(
92            '-l',
93            '--allow-regex',
94            help='''Regex pattern to apply to the blob local path.
95                    Only local paths matching this regex will be fetched.
96                    Note that local paths are relative to the module directory''',
97        )
98        group.add_argument(
99            '-a',
100            '--auto-accept',
101            action='store_true',
102            help='''auto accept license if the fetching needs click-through''',
103        )
104        group.add_argument(
105            '--cache-dirs',
106            help='''Semicolon-separated list of directories to search for cached
107                    blobs before downloading. Cache files may use the original
108                    filename or be suffixed with `.<sha256>`.''',
109        )
110        group.add_argument(
111            '--auto-cache',
112            help='''Path to a directory that is automatically populated when a blob
113                    is downloaded. Cached blobs are stored using the original
114                    filename suffixed with `.<sha256>`.''',
115        )
116
117        return parser
118
119    def get_blobs(self, args):
120        blobs = []
121        modules = args.modules
122        all_modules = zephyr_module.parse_modules(ZEPHYR_BASE, self.manifest)
123        all_names = [m.meta.get('name', None) for m in all_modules]
124
125        unknown = set(modules) - set(all_names)
126
127        if len(unknown):
128            self.die(f'Unknown module(s): {unknown}')
129
130        for module in all_modules:
131            # Filter by module
132            module_name = module.meta.get('name', None)
133            if len(modules) and module_name not in modules:
134                continue
135
136            blobs += zephyr_module.process_blobs(module.project, module.meta)
137
138        return blobs
139
140    def list(self, args):
141        blobs = self.get_blobs(args)
142        fmt = args.format or self.DEFAULT_LIST_FMT
143        for blob in blobs:
144            self.inf(fmt.format(**blob))
145
146    def ensure_folder(self, path):
147        path.parent.mkdir(parents=True, exist_ok=True)
148
149    def handle_auto_cache(self, blob, auto_cache_dir) -> Path:
150        """
151        This function guarantees that a given blob exists in the auto-cache.
152        It first checks whether the blob is already present. If so, it
153        returns the path of this cached blob. If the blob is not yet cached,
154        the blob is downloaded into the auto-cache directory and the path of
155        the freshly cached blob is returned.
156        """
157        cached_blob = self.get_cached_blob(blob, [auto_cache_dir])
158        if cached_blob:
159            return cached_blob
160        name = Path(blob['path']).name
161        sha256 = blob['sha256']
162        self.download_blob(blob, auto_cache_dir / f'{name}.{sha256}')
163        cached_blob = self.get_cached_blob(blob, [auto_cache_dir])
164        assert cached_blob, f'Blob {name} still not cached in auto-cache.'
165        return cached_blob
166
167    def get_cached_blob(self, blob, cache_dirs: list) -> Path | None:
168        """
169        Look for a cached blob in the provided cache directories.
170        A blob may be stored using either its original name or suffixed with
171        its SHA256 hash (e.g. "<name>.<sha256>").
172        Return the first matching path, or None if not found.
173        """
174        name = Path(blob['path']).name
175        sha256 = blob["sha256"]
176        candidate_names = [
177            f"{name}.{sha256}",  # suffixed version
178            name,  # original blob name
179        ]
180
181        for cache_dir in cache_dirs:
182            if not cache_dir.exists():
183                continue
184            for name in candidate_names:
185                candidate_path = cache_dir / name
186                if (
187                    zephyr_module.get_blob_status(candidate_path, sha256)
188                    == zephyr_module.BLOB_PRESENT
189                ):
190                    return candidate_path
191        return None
192
193    def download_blob(self, blob, path):
194        '''Download a blob from its url to a given path.'''
195        url = blob['url']
196        scheme = urlparse(url).scheme
197        self.dbg(f'Fetching blob from url {url} with {scheme} to path: {path}')
198        import fetchers
199
200        fetcher = fetchers.get_fetcher_cls(scheme)
201        self.dbg(f'Found fetcher: {fetcher}')
202        inst = fetcher()
203        self.ensure_folder(path)
204        inst.fetch(url, path)
205
206    def fetch_blob(self, args, blob):
207        """
208        Ensures that the specified blob is available at its path.
209        If caching is enabled and the blob exists in the cache, it is copied
210        from there. Otherwise, the blob is downloaded from its URL and placed
211        at the target path.
212        """
213        path = Path(blob['abspath'])
214
215        # collect existing cache dirs specified as args, otherwise from west config
216        cache_dirs = args.cache_dirs
217        auto_cache_dir = args.auto_cache
218        if self.has_config:
219            if cache_dirs is None:
220                cache_dirs = self.config.get('blobs.cache-dirs')
221            if auto_cache_dir is None:
222                auto_cache_dir = self.config.get('blobs.auto-cache')
223
224        # expand user home for each cache directory
225        if auto_cache_dir is not None:
226            auto_cache_dir = Path(auto_cache_dir).expanduser()
227        if cache_dirs is not None:
228            cache_dirs = [Path(p).expanduser() for p in cache_dirs.split(';') if p]
229
230        # search for cached blob in the cache directories
231        cached_blob = self.get_cached_blob(blob, cache_dirs or [])
232
233        # If blob is not found in cache directories: Use auto-cache if enabled
234        if not cached_blob and auto_cache_dir:
235            cached_blob = self.handle_auto_cache(blob, auto_cache_dir)
236
237        # Copy blob if it is cached, otherwise download it
238        if cached_blob:
239            self.dbg(f'Copy cached blob: {cached_blob}')
240            self.ensure_folder(path)
241            shutil.copy(cached_blob, path)
242        else:
243            self.download_blob(blob, path)
244
245    # Compare the checksum of a file we've just downloaded
246    # to the digest in blob metadata, warn user if they differ.
247    def verify_blob(self, blob) -> bool:
248        self.dbg(f"Verifying blob {blob['module']}: {blob['abspath']}")
249
250        status = zephyr_module.get_blob_status(blob['abspath'], blob['sha256'])
251        if status == zephyr_module.BLOB_OUTDATED:
252            self.err(
253                textwrap.dedent(
254                    f'''\
255                The checksum of the downloaded file does not match that
256                in the blob metadata:
257                - if it is not certain that the download was successful,
258                  try running 'west blobs fetch {blob['module']}'
259                  to re-download the file
260                - if the error persists, please consider contacting
261                  the maintainers of the module so that they can check
262                  the corresponding blob metadata
263
264                Module: {blob['module']}
265                Blob:   {blob['path']}
266                URL:    {blob['url']}
267                Info:   {blob['description']}'''
268                )
269            )
270            return False
271        return True
272
273    def fetch(self, args):
274        bad_checksum_count = 0
275        blobs = self.get_blobs(args)
276        for blob in blobs:
277            if blob['status'] == zephyr_module.BLOB_PRESENT:
278                self.dbg(f"Blob {blob['module']}: {blob['abspath']} is up to date")
279                continue
280
281            # if args.allow_regex is set, use it to filter the blob by path
282            if args.allow_regex and not re.match(args.allow_regex, blob['path']):
283                self.dbg(
284                    f"Blob {blob['module']}: {blob['abspath']} does not match regex "
285                    f"'{args.allow_regex}', skipping"
286                )
287                continue
288            self.inf(f"Fetching blob {blob['module']}: {blob['abspath']}")
289
290            if blob['click-through'] and not args.auto_accept:
291                while True:
292                    user_input = input(
293                        "For this blob, need to read and accept "
294                        "license to continue. Read it?\n"
295                        "Please type 'y' or 'n' and press enter to confirm: "
296                    )
297                    if user_input.upper() == "Y" or user_input.upper() == "N":
298                        break
299
300                if user_input.upper() != "Y":
301                    self.wrn('Skip fetching this blob.')
302                    continue
303
304                with open(blob['license-abspath'], encoding="utf-8") as license_file:
305                    license_content = license_file.read()
306                    print(license_content)
307
308                while True:
309                    user_input = input(
310                        "Accept license to continue?\n"
311                        "Please type 'y' or 'n' and press enter to confirm: "
312                    )
313                    if user_input.upper() == "Y" or user_input.upper() == "N":
314                        break
315
316                if user_input.upper() != "Y":
317                    self.wrn('Skip fetching this blob.')
318                    continue
319
320            self.fetch_blob(args, blob)
321            if not self.verify_blob(blob):
322                bad_checksum_count += 1
323
324        if bad_checksum_count:
325            self.err(f"{bad_checksum_count} blobs have bad checksums")
326            sys.exit(os.EX_DATAERR)
327
328    def clean(self, args):
329        blobs = self.get_blobs(args)
330        for blob in blobs:
331            if blob['status'] == zephyr_module.BLOB_NOT_PRESENT:
332                self.dbg(f"Blob {blob['module']}: {blob['abspath']} not in filesystem")
333                continue
334            self.inf(f"Deleting blob {blob['module']}: {blob['status']} {blob['abspath']}")
335            blob['abspath'].unlink()
336
337    def do_run(self, args, _):
338        self.dbg(f"subcmd: '{args.subcmd[0]}' modules: {args.modules}")
339
340        subcmd = getattr(self, args.subcmd[0])
341
342        if args.subcmd[0] != 'list' and args.format is not None:
343            self.die('unexpected --format argument; this is a "west blobs list" option')
344
345        subcmd(args)
346