1# Copyright (c) 2022 Nordic Semiconductor ASA
2#
3# SPDX-License-Identifier: Apache-2.0
4
5import argparse
6import os
7from pathlib import Path
8import sys
9import textwrap
10from urllib.parse import urlparse
11
12from west.commands import WestCommand
13
14from zephyr_ext_common import ZEPHYR_BASE
15
16sys.path.append(os.fspath(Path(__file__).parent.parent))
17import zephyr_module
18
19class Blobs(WestCommand):
20
21    DEFAULT_LIST_FMT = '{module} {status} {path} {type} {abspath}'
22
23    def __init__(self):
24        super().__init__(
25            'blobs',
26            # Keep this in sync with the string in west-commands.yml.
27            'work with binary blobs',
28            'Work with binary blobs',
29            accepts_unknown_args=False)
30
31    def do_add_parser(self, parser_adder):
32        parser = parser_adder.add_parser(
33            self.name,
34            help=self.help,
35            formatter_class=argparse.RawDescriptionHelpFormatter,
36            description=self.description,
37            epilog=textwrap.dedent(f'''\
38            FORMAT STRINGS
39            --------------
40
41            Blobs are listed using a Python 3 format string. Arguments
42            to the format string are accessed by name.
43
44            The default format string is:
45
46            "{self.DEFAULT_LIST_FMT}"
47
48            The following arguments are available:
49
50            - module: name of the module that contains this blob
51            - abspath: blob absolute path
52            - status: short status (A: present, M: hash failure, D: not present)
53            - path: blob local path from <module>/zephyr/blobs/
54            - sha256: blob SHA256 hash in hex
55            - type: type of blob
56            - version: version string
57            - license_path: path to the license file for the blob
58            - uri: URI to the remote location of the blob
59            - description: blob text description
60            - doc-url: URL to the documentation for this blob
61            '''))
62
63        # Remember to update west-completion.bash if you add or remove
64        # flags
65        parser.add_argument('subcmd', nargs=1,
66                            choices=['list', 'fetch', 'clean'],
67                            help='sub-command to execute')
68
69        parser.add_argument('modules', metavar='MODULE', nargs='*',
70                            help='''zephyr modules to operate on;
71                            all modules will be used if not given''')
72
73        group = parser.add_argument_group('west blob list options')
74        group.add_argument('-f', '--format',
75                            help='''format string to use to list each blob;
76                                    see FORMAT STRINGS below''')
77
78        return parser
79
80    def get_blobs(self, args):
81        blobs = []
82        modules = args.modules
83        all_modules = zephyr_module.parse_modules(ZEPHYR_BASE, self.manifest)
84        all_names = [m.meta.get('name', None) for m in all_modules]
85
86        unknown = set(modules) - set(all_names)
87
88        if len(unknown):
89            self.die(f'Unknown module(s): {unknown}')
90
91        for module in all_modules:
92            # Filter by module
93            module_name = module.meta.get('name', None)
94            if len(modules) and module_name not in modules:
95                continue
96
97            blobs += zephyr_module.process_blobs(module.project, module.meta)
98
99        return blobs
100
101    def list(self, args):
102        blobs = self.get_blobs(args)
103        fmt = args.format or self.DEFAULT_LIST_FMT
104        for blob in blobs:
105            self.inf(fmt.format(**blob))
106
107    def ensure_folder(self, path):
108        path.parent.mkdir(parents=True, exist_ok=True)
109
110    def fetch_blob(self, url, path):
111        scheme = urlparse(url).scheme
112        self.dbg(f'Fetching {path} with {scheme}')
113        import fetchers
114        fetcher = fetchers.get_fetcher_cls(scheme)
115
116        self.dbg(f'Found fetcher: {fetcher}')
117        inst = fetcher()
118        self.ensure_folder(path)
119        inst.fetch(url, path)
120
121    # Compare the checksum of a file we've just downloaded
122    # to the digest in blob metadata, warn user if they differ.
123    def verify_blob(self, blob) -> bool:
124        self.dbg('Verifying blob {module}: {abspath}'.format(**blob))
125
126        status = zephyr_module.get_blob_status(blob['abspath'], blob['sha256'])
127        if status == zephyr_module.BLOB_OUTDATED:
128            self.err(textwrap.dedent(
129                f'''\
130                The checksum of the downloaded file does not match that
131                in the blob metadata:
132                - if it is not certain that the download was successful,
133                  try running 'west blobs fetch {blob['module']}'
134                  to re-download the file
135                - if the error persists, please consider contacting
136                  the maintainers of the module so that they can check
137                  the corresponding blob metadata
138
139                Module: {blob['module']}
140                Blob:   {blob['path']}
141                URL:    {blob['url']}
142                Info:   {blob['description']}'''))
143            return False
144        return True
145
146    def fetch(self, args):
147        bad_checksum_count = 0
148        blobs = self.get_blobs(args)
149        for blob in blobs:
150            if blob['status'] == zephyr_module.BLOB_PRESENT:
151                self.dbg('Blob {module}: {abspath} is up to date'.format(**blob))
152                continue
153            self.inf('Fetching blob {module}: {abspath}'.format(**blob))
154            self.fetch_blob(blob['url'], blob['abspath'])
155            if not self.verify_blob(blob):
156                bad_checksum_count += 1
157
158        if bad_checksum_count:
159            self.err(f"{bad_checksum_count} blobs have bad checksums")
160            sys.exit(os.EX_DATAERR)
161
162    def clean(self, args):
163        blobs = self.get_blobs(args)
164        for blob in blobs:
165            if blob['status'] == zephyr_module.BLOB_NOT_PRESENT:
166                self.dbg('Blob {module}: {abspath} not in filesystem'.format(**blob))
167                continue
168            self.inf('Deleting blob {module}: {status} {abspath}'.format(**blob))
169            blob['abspath'].unlink()
170
171    def do_run(self, args, _):
172        self.dbg(f'subcmd: \'{args.subcmd[0]}\' modules: {args.modules}')
173
174        subcmd = getattr(self, args.subcmd[0])
175
176        if args.subcmd[0] != 'list' and args.format is not None:
177            self.die(f'unexpected --format argument; this is a "west blobs list" option')
178
179        subcmd(args)
180