1# Copyright (c) 2024 TOKITA Hiroshi
2#
3# SPDX-License-Identifier: Apache-2.0
4
5import argparse
6import hashlib
7import io
8import os
9import patoolib
10import platform
11import re
12import requests
13import semver
14import shutil
15import subprocess
16import tempfile
17import textwrap
18import tqdm
19import zcmake
20from pathlib import Path
21
22from west.commands import WestCommand
23
24
25class Sdk(WestCommand):
26    def __init__(self):
27        super().__init__(
28            "sdk",
29            "manage Zephyr SDK",
30            "List and Install Zephyr SDK",
31        )
32
33    def do_add_parser(self, parser_adder):
34        parser = parser_adder.add_parser(
35            self.name,
36            help=self.help,
37            description=self.description,
38            formatter_class=argparse.RawDescriptionHelpFormatter,
39            epilog=textwrap.dedent(
40                """
41            Listing SDKs:
42
43                Run 'west sdk' or 'west sdk list' to list installed SDKs.
44                See 'west sdk list --help' for details.
45
46
47            Installing SDK:
48
49                Run 'west sdk install' to install Zephyr SDK.
50                See 'west sdk install --help' for details.
51            """
52            ),
53        )
54
55        subparsers_gen = parser.add_subparsers(
56            metavar="<subcommand>",
57            dest="subcommand",
58            help="select a subcommand. If omitted, treat it as the 'list' selected.",
59        )
60
61        subparsers_gen.add_parser(
62            "list",
63            help="list installed Zephyr SDKs",
64            formatter_class=argparse.RawDescriptionHelpFormatter,
65            epilog=textwrap.dedent(
66                """
67            Listing SDKs:
68
69                Run 'west sdk' or 'west sdk list' command information about available SDKs is displayed.
70            """
71            ),
72        )
73
74        install_args_parser = subparsers_gen.add_parser(
75            "install",
76            help="install Zephyr SDK",
77            formatter_class=argparse.RawDescriptionHelpFormatter,
78            epilog=textwrap.dedent(
79                """
80            Installing SDK:
81
82                Run 'west sdk install' to install Zephyr SDK.
83
84                Set --version option to install a specific version of the SDK.
85                If not specified, the install version is detected from "${ZEPHYR_BASE}/SDK_VERSION file.
86                SDKs older than 0.14.1 are not supported.
87
88                You can specify the installation directory with --install-dir or --install-base.
89                If the specified version of the SDK is already installed,
90                the already installed SDK will be used regardless of the settings of
91                --install-dir and --install-base.
92
93                Typically, Zephyr SDK archives contain only one directory named zephyr-sdk-<version>
94                at the top level.
95                The SDK archive is extracted to the home directory if both --install-dir and --install-base
96                are not specified.
97                In this case, SDK will install into ${HOME}/zephyr-sdk-<version>.
98                If --install-base is specified, the archive will be extracted under the specified path.
99                In this case, SDK will install into <BASE>/zephyr-sdk-<version> .
100                If --install-dir is specified, the directory contained in the archive will be renamed
101                and placed to the specified location.
102
103                --interactive, --toolchains, --no-toolchains and --no-hosttools options
104                specify the behavior of the installer. Please see the description of each option.
105
106                --personal-access-token specifies the GitHub personal access token.
107                This helps to relax the limits on the number of REST API calls.
108
109                --api-url specifies the REST API endpoint for GitHub releases information
110                when installing the SDK from a different GitHub repository.
111            """
112            ),
113        )
114
115        install_args_parser.add_argument(
116            "--version",
117            default=None,
118            nargs="?",
119            metavar="SDK_VER",
120            help="version of the Zephyr SDK to install. "
121            "If not specified, the install version is detected from "
122            "${ZEPHYR_BASE}/SDK_VERSION file.",
123        )
124        install_args_parser.add_argument(
125            "-b",
126            "--install-base",
127            default=None,
128            metavar="BASE",
129            help="Base directory to SDK install. "
130            "The subdirectory created by extracting the archive in <BASE> will be the SDK installation directory. "
131            "For example, -b /foo/bar will install the SDK in `/foo/bar/zephyr-sdk-<version>'."
132        )
133        install_args_parser.add_argument(
134            "-d",
135            "--install-dir",
136            default=None,
137            metavar="DIR",
138            help="SDK install destination directory. "
139            "The SDK will be installed on the specified path. "
140            "The directory contained in the archive will be renamed and installed for the specified directory. "
141            "For example, if you specify -b /foo/bar/baz, The archive's zephyr-sdk-<version> directory will be renamed baz and placed under /foo/bar. "
142            "If this option is specified, the --install-base option is ignored. "
143        )
144        install_args_parser.add_argument(
145            "-i",
146            "--interactive",
147            action="store_true",
148            help="launches installer in interactive mode. "
149            "--toolchains, --no-toolchains and --no-hosttools will be ignored if this option is enabled.",
150        )
151        install_args_parser.add_argument(
152            "-t",
153            "--toolchains",
154            metavar="toolchain_name",
155            nargs="+",
156            help="toolchain(s) to install (e.g. 'arm-zephyr-eabi'). "
157            "If this option is not given, toolchains for all architectures will be installed.",
158        )
159        install_args_parser.add_argument(
160            "-T",
161            "--no-toolchains",
162            action="store_true",
163            help="do not install toolchains. "
164            "--toolchains will be ignored if this option is enabled.",
165        )
166        install_args_parser.add_argument(
167            "-H",
168            "--no-hosttools",
169            action="store_true",
170            help="do not install host-tools.",
171        )
172        install_args_parser.add_argument(
173            "--personal-access-token", help="GitHub personal access token."
174        )
175        install_args_parser.add_argument(
176            "--api-url",
177            default="https://api.github.com/repos/zephyrproject-rtos/sdk-ng/releases",
178            help="GitHub releases API endpoint used to look for Zephyr SDKs.",
179        )
180
181        return parser
182
183    def os_arch_name(self):
184        system = platform.system()
185        machine = platform.machine()
186
187        if system == "Linux":
188            osname = "linux"
189        elif system == "Darwin":
190            osname = "macos"
191        elif system == "Windows":
192            osname = "windows"
193        else:
194            self.die(f"Unsupported system: {system}")
195
196        if machine in ["aarch64", "arm64"]:
197            arch = "aarch64"
198        elif machine in ["x86_64", "AMD64"]:
199            arch = "x86_64"
200        else:
201            self.die(f"Unsupported machine: {machine}")
202
203        return (osname, arch)
204
205    def detect_version(self, args):
206        if args.version:
207            version = args.version
208        else:
209            if os.environ["ZEPHYR_BASE"]:
210                zephyr_base = Path(os.environ["ZEPHYR_BASE"])
211            else:
212                zephyr_base = Path(__file__).parents[2]
213
214            sdk_version_file = zephyr_base / "SDK_VERSION"
215
216            if not sdk_version_file.exists():
217                self.die(f"{str(sdk_version_file)} does not exist.")
218
219            with open(sdk_version_file) as f:
220                version = f.readlines()[0].strip()
221                self.inf(
222                    f"Found '{str(sdk_version_file)}', installing version {version}."
223                )
224
225        try:
226            semver.Version.parse(version)
227        except Exception:
228            self.die(f"Invalid version format: {version}")
229
230        if semver.compare(version, "0.14.1") < 0:
231            self.die(f"Versions older than v0.14.1 are not supported.")
232
233        return version
234
235    def fetch_releases(self, url, req_headers):
236        """fetch releases data via GitHub REST API"""
237
238        releases = []
239        page = 1
240
241        while True:
242            params = {"page": page, "per_page": 100}
243            resp = requests.get(url, headers=req_headers, params=params)
244            if resp.status_code != 200:
245                raise Exception(f"Failed to fetch: {resp.status_code}, {resp.text}")
246
247            data = resp.json()
248            if not data:
249                break
250
251            releases.extend(data)
252            page += 1
253
254        return releases
255
256    def minimal_sdk_filename(self, release):
257        (osname, arch) = self.os_arch_name()
258        version = re.sub(r"^v", "", release["tag_name"])
259
260        if semver.compare(version, "0.16.0") < 0:
261            if osname == "windows":
262                ext = ".zip"
263            else:
264                ext = ".tar.gz"
265        else:
266            if osname == "windows":
267                ext = ".7z"
268            else:
269                ext = ".tar.xz"
270
271        return f"zephyr-sdk-{version}_{osname}-{arch}_minimal{ext}"
272
273    def minimal_sdk_sha256(self, sha256_list, release):
274        name = self.minimal_sdk_filename(release)
275        tuples = [(re.split(r"\s+", t)) for t in sha256_list.splitlines()]
276        hashtable = {t[1]: t[0] for t in tuples}
277
278        return hashtable[name]
279
280    def minimal_sdk_url(self, release):
281        name = self.minimal_sdk_filename(release)
282        assets = release.get("assets", [])
283        minimal_sdk_asset = next(filter(lambda x: x["name"] == name, assets))
284
285        return minimal_sdk_asset["browser_download_url"]
286
287    def sha256_sum_url(self, release):
288        assets = release.get("assets", [])
289        minimal_sdk_asset = next(filter(lambda x: x["name"] == "sha256.sum", assets))
290
291        return minimal_sdk_asset["browser_download_url"]
292
293    def download_and_extract(self, base_dir, dir_name, target_release, req_headers):
294        self.inf("Fetching sha256...")
295        sha256_url = self.sha256_sum_url(target_release)
296        resp = requests.get(sha256_url, headers=req_headers, stream=True)
297        if resp.status_code != 200:
298            raise Exception(f"Failed to download {sha256_url}: {resp.status_code}")
299
300        sha256 = self.minimal_sdk_sha256(resp.content.decode("UTF-8"), target_release)
301
302        archive_url = self.minimal_sdk_url(target_release)
303        self.inf(f"Downloading {archive_url}...")
304        resp = requests.get(archive_url, headers=req_headers, stream=True)
305        if resp.status_code != 200:
306            raise Exception(f"Failed to download {archive_url}: {resp.status_code}")
307
308        try:
309            Path(base_dir).mkdir(parents=True, exist_ok=True)
310
311            with tempfile.TemporaryDirectory(dir=base_dir) as tempdir:
312                # download archive file
313                filename = Path(tempdir) / re.sub(r"^.*/", "", archive_url)
314                file = open(filename, mode="wb")
315                total_length = int(resp.headers["Content-Length"])
316                count = 0
317
318                class WestLogAdapter(io.StringIO):
319                    buf = []
320                    ctx = None
321
322                    def __init__(self, ctx):
323                        super(__class__, self).__init__()
324                        self.ctx = ctx
325
326                    def write(self, buf):
327                        try:
328                            self.ctx.inf(buf, colorize=False, end='')
329                        except TypeError:
330                            # West v1.1.0 and earlier versions do not support the end parameter.
331                            self.ctx.inf(buf, colorize=False)
332
333                    def flush(self):
334                        pass
335
336                tbar = tqdm.tqdm(
337                    total=total_length,
338                    file=WestLogAdapter(self),
339                    desc=Path(file.name).name,
340                    unit_scale=True,
341                    ncols=shutil.get_terminal_size().columns,
342                    unit="",
343                    bar_format="{desc}: {percentage:3.0f}%|{bar}|   {n_fmt} {rate_fmt}   [{elapsed}]        ",
344                )
345
346                for chunk in resp.iter_content(chunk_size=8192):
347                    tbar.update(len(chunk))
348                    file.write(chunk)
349                    count = count + len(chunk)
350
351                tbar.close()
352
353                self.inf()
354                self.inf(f"Downloaded: {file.name}")
355                file.close()
356
357                # check sha256 hash
358                with open(file.name, "rb") as sha256file:
359                    digest = hashlib.sha256(sha256file.read()).hexdigest()
360                    if sha256 != digest:
361                        raise Exception(f"sha256 mismatched: {sha256}:{digest}")
362
363                # extract archive file
364                self.inf(f"Extract: {file.name}")
365                patoolib.extract_archive(file.name, outdir=tempdir)
366
367                # We expect that only the zephyr-sdk-x.y.z directory will be present in the archive.
368                extracted_dirs = [d for d in Path(tempdir).iterdir() if d.is_dir()]
369                if len(extracted_dirs) != 1:
370                    raise Exception("Unexpected archive format")
371
372                # move to destination dir
373                if dir_name:
374                    dest_dir = Path(base_dir / dir_name)
375                else:
376                    dest_dir = Path(base_dir / extracted_dirs[0].name)
377
378                Path(dest_dir).parent.mkdir(parents=True, exist_ok=True)
379
380                self.inf(f"Move: {str(extracted_dirs[0])} to {dest_dir}.")
381                shutil.move(extracted_dirs[0], dest_dir)
382
383                return dest_dir
384        except PermissionError as pe:
385            self.die(pe)
386
387    def run_setup(self, args, sdk_dir):
388        if "Windows" == platform.system():
389            setup = Path(sdk_dir) / "setup.cmd"
390            optsep = "/"
391        else:
392            setup = Path(sdk_dir) / "setup.sh"
393            optsep = "-"
394
395        # Associate installed SDK so that it can be found.
396        cmds_cmake_pkg = [str(setup), f"{optsep}c"]
397        self.dbg("Run: ", cmds_cmake_pkg)
398        result = subprocess.run(cmds_cmake_pkg)
399        if result.returncode != 0:
400            self.die(f"command \"{' '.join(cmds_cmake_pkg)}\" failed")
401
402        cmds = [str(setup)]
403
404        if not args.interactive and not args.no_toolchains:
405            if not args.toolchains:
406                cmds.extend([f"{optsep}t", "all"])
407            else:
408                for tc in args.toolchains:
409                    cmds.extend([f"{optsep}t", tc])
410
411        if not args.interactive and not args.no_hosttools:
412            cmds.extend([f"{optsep}h"])
413
414        if args.interactive or len(cmds) != 1:
415            self.dbg("Run: ", cmds)
416            result = subprocess.run(cmds)
417            if result.returncode != 0:
418                self.die(f"command \"{' '.join(cmds)}\" failed")
419
420    def install_sdk(self, args, user_args):
421        version = self.detect_version(args)
422        (osname, arch) = self.os_arch_name()
423
424        if args.personal_access_token:
425            req_headers = {
426                "Authorization": f"Bearer {args.personal_access_token}",
427            }
428        else:
429            req_headers = {}
430
431        self.inf("Fetching Zephyr SDK list...")
432        releases = self.fetch_releases(args.api_url, req_headers)
433        self.dbg("releases: ", "\n".join([x["tag_name"] for x in releases]))
434
435        # checking version
436        def check_semver(version):
437            try:
438                semver.Version.parse(version)
439                return True
440            except Exception:
441                return False
442
443        available_versions = [
444            re.sub(r"^v", "", x["tag_name"])
445            for x in releases
446            if check_semver(re.sub(r"^v", "", x["tag_name"]))
447        ]
448
449        if not version in available_versions:
450            self.die(
451                f"Unavailable SDK version: {version}."
452                + "Please select from the list below:\n"
453                + "\n".join(available_versions)
454            )
455
456        target_release = [x for x in releases if x["tag_name"] == f"v{version}"][0]
457
458        # checking toolchains parameters
459        assets = target_release["assets"]
460        self.dbg("assets: ", "\n".join([x["browser_download_url"] for x in assets]))
461
462        prefix = f"toolchain_{osname}-{arch}_"
463        available_toolchains = [
464            re.sub(r"\..*", "", x["name"].replace(prefix, ""))
465            for x in assets
466            if x["name"].startswith(prefix)
467        ]
468
469        if args.toolchains:
470            for tc in args.toolchains:
471                if not tc in available_toolchains:
472                    self.die(
473                        f"toolchain {tc} is not available.\n"
474                        + "Please select from the list below:\n"
475                        + "\n".join(available_toolchains)
476                    )
477
478        installed_info = [v for (k, v) in self.fetch_sdk_info().items() if k == version]
479        if len(installed_info) == 0:
480            if args.install_dir:
481                base_dir = Path(args.install_dir).parent
482                dir_name = Path(args.install_dir).name
483            elif args.install_base:
484                base_dir = Path(args.install_base)
485                dir_name = None
486            else:
487                base_dir = Path("~").expanduser()
488                dir_name = None
489
490            sdk_dir = self.download_and_extract(
491                base_dir, dir_name, target_release, req_headers
492            )
493        else:
494            sdk_dir = Path(installed_info[0]["path"])
495            self.inf(
496                f"Zephyr SDK version {version} is already installed at {str(sdk_dir)}. Using it."
497            )
498
499        self.run_setup(args, sdk_dir)
500
501    def fetch_sdk_info(self):
502        sdk_lines = []
503        try:
504            cmds = [
505                "-P",
506                str(Path(__file__).parent / "sdk" / "listsdk.cmake"),
507            ]
508
509            output = zcmake.run_cmake(cmds, capture_output=True)
510            if output:
511                # remove '-- Zephyr-sdk,' leader
512                sdk_lines = [l[15:] for l in output if l.startswith("-- Zephyr-sdk,")]
513            else:
514                sdk_lines = []
515
516        except Exception as e:
517            self.die(e)
518
519        def parse_sdk_entry(line):
520            class SdkEntry:
521                def __init__(self):
522                    self.version = None
523                    self.path = None
524
525            info = SdkEntry()
526            for ent in line.split(","):
527                kv = ent.split("=")
528                if kv[0].strip() == "ver":
529                    info.version = kv[1].strip()
530                elif kv[0].strip() == "dir":
531                    info.path = kv[1].strip()
532
533            return info
534
535        sdk_info = {}
536        for sdk_ent in [parse_sdk_entry(l) for l in reversed(sdk_lines)]:
537            entry = {}
538
539            ver = None
540            sdk_path = Path(sdk_ent.path)
541            sdk_version_path = sdk_path / "sdk_version"
542            if sdk_version_path.exists():
543                with open(str(sdk_version_path)) as f:
544                    ver = f.readline().strip()
545            else:
546                continue
547
548            entry["path"] = sdk_path
549
550            if (sdk_path / "sysroots").exists():
551                entry["hosttools"] = "installed"
552
553            # Identify toolchain directory by the existence of <toolchain>/bin/<toolchain>-gcc
554            if "Windows" == platform.system():
555                gcc_postfix = "-gcc.exe"
556            else:
557                gcc_postfix = "-gcc"
558
559            toolchains = [
560                tc.name
561                for tc in sdk_path.iterdir()
562                if (sdk_path / tc / "bin" / (tc.name + gcc_postfix)).exists()
563            ]
564
565            if len(toolchains) > 0:
566                entry["toolchains"] = toolchains
567
568            if ver:
569                sdk_info[ver] = entry
570
571        return sdk_info
572
573    def list_sdk(self):
574        sdk_info = self.fetch_sdk_info()
575
576        if len(sdk_info) == 0:
577            self.die("No Zephyr SDK installed.")
578
579        for k, v in sdk_info.items():
580            self.inf(f"{k}:")
581            self.inf(f"  path: {v['path']}")
582            if "hosttools" in v:
583                self.inf(f"  hosttools: {v['hosttools']}")
584            if "toolchains" in v:
585                self.inf("  installed-toolchains:")
586                for tc in v["toolchains"]:
587                    self.inf(f"    - {tc}")
588
589                # Since version 0.15.2, the sdk_toolchains file is included,
590                # so we can get information about available toolchains from there.
591                if (Path(v["path"]) / "sdk_toolchains").exists():
592                    with open(Path(v["path"]) / "sdk_toolchains") as f:
593                        all_tcs = [l.strip() for l in f.readlines()]
594
595                    self.inf("  available-toolchains:")
596                    for tc in all_tcs:
597                        if tc not in v["toolchains"]:
598                            self.inf(f"    - {tc}")
599
600            self.inf()
601
602    def do_run(self, args, user_args):
603        self.dbg("args: ", args)
604        if args.subcommand == "install":
605            self.install_sdk(args, user_args)
606        elif args.subcommand == "list" or not args.subcommand:
607            self.list_sdk()
608