1# Copyright (c) 2024 TOKITA Hiroshi
2#
3# SPDX-License-Identifier: Apache-2.0
4
5import argparse
6import hashlib
7import os
8import patoolib
9import platform
10import re
11import requests
12import semver
13import shutil
14import subprocess
15import tempfile
16import textwrap
17import zcmake
18from pathlib import Path
19
20from west.commands import WestCommand
21
22
23class Sdk(WestCommand):
24    def __init__(self):
25        super().__init__(
26            "sdk",
27            "manage Zephyr SDK",
28            "List and Install Zephyr SDK",
29        )
30
31    def do_add_parser(self, parser_adder):
32        parser = parser_adder.add_parser(
33            self.name,
34            help=self.help,
35            description=self.description,
36            formatter_class=argparse.RawDescriptionHelpFormatter,
37            epilog=textwrap.dedent(
38                """
39            Listing SDKs:
40
41                Run 'west sdk' or 'west sdk list' to list installed SDKs.
42                See 'west sdk list --help' for details.
43
44
45            Installing SDK:
46
47                Run 'west sdk install' to install Zephyr SDK.
48                See 'west sdk install --help' for details.
49            """
50            ),
51        )
52
53        subparsers_gen = parser.add_subparsers(
54            metavar="<subcommand>",
55            dest="subcommand",
56            help="select a subcommand. If omitted, treat it as the 'list' selected.",
57        )
58
59        subparsers_gen.add_parser(
60            "list",
61            help="list installed Zephyr SDKs",
62            formatter_class=argparse.RawDescriptionHelpFormatter,
63            epilog=textwrap.dedent(
64                """
65            Listing SDKs:
66
67                Run 'west sdk' or 'west sdk list' command information about available SDKs is displayed.
68            """
69            ),
70        )
71
72        install_args_parser = subparsers_gen.add_parser(
73            "install",
74            help="install Zephyr SDK",
75            formatter_class=argparse.RawDescriptionHelpFormatter,
76            epilog=textwrap.dedent(
77                """
78            Installing SDK:
79
80                Run 'west sdk install' to install Zephyr SDK.
81
82                Set --version option to install a specific version of the SDK.
83                If not specified, the install version is detected from "${ZEPHYR_BASE}/SDK_VERSION file.
84                SDKs older than 0.14.1 are not supported.
85
86                You can specify the installation directory with --install-dir or --install-base.
87                If the specified version of the SDK is already installed,
88                the already installed SDK will be used regardless of the settings of
89                --install-dir and --install-base.
90
91                Typically, Zephyr SDK archives contain only one directory named zephyr-sdk-<version>
92                at the top level.
93                The SDK archive is extracted to the home directory if both --install-dir and --install-base
94                are not specified.
95                In this case, SDK will install into ${HOME}/zephyr-sdk-<version>.
96                If --install-base is specified, the archive will be extracted under the specified path.
97                In this case, SDK will install into <BASE>/zephyr-sdk-<version> .
98                If --install-dir is specified, the directory contained in the archive will be renamed
99                and placed to the specified location.
100
101                --interactive, --toolchains, --no-toolchains and --no-hosttools options
102                specify the behavior of the installer. Please see the description of each option.
103
104                --personal-access-token specifies the GitHub personal access token.
105                This helps to relax the limits on the number of REST API calls.
106
107                --api-url specifies the REST API endpoint for GitHub releases information
108                when installing the SDK from a different GitHub repository.
109            """
110            ),
111        )
112
113        install_args_parser.add_argument(
114            "--version",
115            default=None,
116            nargs="?",
117            metavar="SDK_VER",
118            help="version of the Zephyr SDK to install. "
119            "If not specified, the install version is detected from "
120            "${ZEPHYR_BASE}/SDK_VERSION file.",
121        )
122        install_args_parser.add_argument(
123            "-b",
124            "--install-base",
125            default=None,
126            metavar="BASE",
127            help="Base directory to SDK install. "
128            "The subdirectory created by extracting the archive in <BASE> will be the SDK installation directory. "
129            "For example, -b /foo/bar will install the SDK in `/foo/bar/zephyr-sdk-<version>'."
130        )
131        install_args_parser.add_argument(
132            "-d",
133            "--install-dir",
134            default=None,
135            metavar="DIR",
136            help="SDK install destination directory. "
137            "The SDK will be installed on the specified path. "
138            "The directory contained in the archive will be renamed and installed for the specified directory. "
139            "For example, if you specify -b /foo/bar/baz, The archive's zephyr-sdk-<version> directory will be renamed baz and placed under /foo/bar. "
140            "If this option is specified, the --install-base option is ignored. "
141        )
142        install_args_parser.add_argument(
143            "-i",
144            "--interactive",
145            action="store_true",
146            help="launches installer in interactive mode. "
147            "--toolchains, --no-toolchains and --no-hosttools will be ignored if this option is enabled.",
148        )
149        install_args_parser.add_argument(
150            "-t",
151            "--toolchains",
152            metavar="toolchain_name",
153            nargs="+",
154            help="toolchain(s) to install (e.g. 'arm-zephyr-eabi'). "
155            "If this option is not given, toolchains for all architectures will be installed.",
156        )
157        install_args_parser.add_argument(
158            "-T",
159            "--no-toolchains",
160            action="store_true",
161            help="do not install toolchains. "
162            "--toolchains will be ignored if this option is enabled.",
163        )
164        install_args_parser.add_argument(
165            "-H",
166            "--no-hosttools",
167            action="store_true",
168            help="do not install host-tools.",
169        )
170        install_args_parser.add_argument(
171            "--personal-access-token", help="GitHub personal access token."
172        )
173        install_args_parser.add_argument(
174            "--api-url",
175            default="https://api.github.com/repos/zephyrproject-rtos/sdk-ng/releases",
176            help="GitHub releases API endpoint used to look for Zephyr SDKs.",
177        )
178
179        return parser
180
181    def os_arch_name(self):
182        system = platform.system()
183        machine = platform.machine()
184
185        if system == "Linux":
186            osname = "linux"
187        elif system == "Darwin":
188            osname = "macos"
189        elif system == "Windows":
190            osname = "windows"
191        else:
192            self.die(f"Unsupported system: {system}")
193
194        if machine in ["aarch64", "arm64"]:
195            arch = "aarch64"
196        elif machine in ["x86_64", "AMD64"]:
197            arch = "x86_64"
198        else:
199            self.die(f"Unsupported machine: {machine}")
200
201        return (osname, arch)
202
203    def detect_version(self, args):
204        if args.version:
205            version = args.version
206        else:
207            if os.environ["ZEPHYR_BASE"]:
208                zephyr_base = Path(os.environ["ZEPHYR_BASE"])
209            else:
210                zephyr_base = Path(__file__).parents[2]
211
212            sdk_version_file = zephyr_base / "SDK_VERSION"
213
214            if not sdk_version_file.exists():
215                self.die(f"{str(sdk_version_file)} does not exist.")
216
217            with open(sdk_version_file) as f:
218                version = f.readlines()[0].strip()
219                self.inf(
220                    f"Found '{str(sdk_version_file)}', installing version {version}."
221                )
222
223        try:
224            semver.Version.parse(version)
225        except Exception:
226            self.die(f"Invalid version format: {version}")
227
228        if semver.compare(version, "0.14.1") < 0:
229            self.die(f"Versions older than v0.14.1 are not supported.")
230
231        return version
232
233    def fetch_releases(self, url, req_headers):
234        """fetch releases data via GitHub REST API"""
235
236        releases = []
237        page = 1
238
239        while True:
240            params = {"page": page, "per_page": 100}
241            resp = requests.get(url, headers=req_headers, params=params)
242            if resp.status_code != 200:
243                raise Exception(f"Failed to fetch: {resp.status_code}, {resp.text}")
244
245            data = resp.json()
246            if not data:
247                break
248
249            releases.extend(data)
250            page += 1
251
252        return releases
253
254    def minimal_sdk_filename(self, release):
255        (osname, arch) = self.os_arch_name()
256        version = re.sub(r"^v", "", release["tag_name"])
257
258        if semver.compare(version, "0.16.0") < 0:
259            if osname == "windows":
260                ext = ".zip"
261            else:
262                ext = ".tar.gz"
263        else:
264            if osname == "windows":
265                ext = ".7z"
266            else:
267                ext = ".tar.xz"
268
269        return f"zephyr-sdk-{version}_{osname}-{arch}_minimal{ext}"
270
271    def minimal_sdk_sha256(self, sha256_list, release):
272        name = self.minimal_sdk_filename(release)
273        tuples = [(re.split(r"\s+", t)) for t in sha256_list.splitlines()]
274        hashtable = {t[1]: t[0] for t in tuples}
275
276        return hashtable[name]
277
278    def minimal_sdk_url(self, release):
279        name = self.minimal_sdk_filename(release)
280        assets = release.get("assets", [])
281        minimal_sdk_asset = next(filter(lambda x: x["name"] == name, assets))
282
283        return minimal_sdk_asset["browser_download_url"]
284
285    def sha256_sum_url(self, release):
286        assets = release.get("assets", [])
287        minimal_sdk_asset = next(filter(lambda x: x["name"] == "sha256.sum", assets))
288
289        return minimal_sdk_asset["browser_download_url"]
290
291    def download_and_extract(self, base_dir, dir_name, target_release, req_headers):
292        self.inf("Fetching sha256...")
293        sha256_url = self.sha256_sum_url(target_release)
294        resp = requests.get(sha256_url, headers=req_headers, stream=True)
295        if resp.status_code != 200:
296            raise Exception(f"Failed to download {sha256_url}: {resp.status_code}")
297
298        sha256 = self.minimal_sdk_sha256(resp.content.decode("UTF-8"), target_release)
299
300        archive_url = self.minimal_sdk_url(target_release)
301        self.inf(f"Downloading {archive_url}...")
302        resp = requests.get(archive_url, headers=req_headers, stream=True)
303        if resp.status_code != 200:
304            raise Exception(f"Failed to download {archive_url}: {resp.status_code}")
305
306        try:
307            Path(base_dir).mkdir(parents=True, exist_ok=True)
308
309            with tempfile.TemporaryDirectory(dir=base_dir) as tempdir:
310                # download archive file
311                filename = Path(tempdir) / re.sub(r"^.*/", "", archive_url)
312                file = open(filename, mode="wb")
313                total_length = int(resp.headers["Content-Length"])
314                count = 0
315
316                for chunk in resp.iter_content(chunk_size=8192):
317                    file.write(chunk)
318                    count = count + len(chunk)
319                    self.inf(f"\r {count}/{total_length}", end="")
320                self.inf()
321                self.inf(f"Downloaded: {file.name}")
322                file.close()
323
324                # check sha256 hash
325                with open(file.name, "rb") as sha256file:
326                    digest = hashlib.sha256(sha256file.read()).hexdigest()
327                    if sha256 != digest:
328                        raise Exception(f"sha256 mismatched: {sha256}:{digest}")
329
330                # extract archive file
331                self.inf(f"Extract: {file.name}")
332                patoolib.extract_archive(file.name, outdir=tempdir)
333
334                # We expect that only the zephyr-sdk-x.y.z directory will be present in the archive.
335                extracted_dirs = [d for d in Path(tempdir).iterdir() if d.is_dir()]
336                if len(extracted_dirs) != 1:
337                    raise Exception("Unexpected archive format")
338
339                # move to destination dir
340                if dir_name:
341                    dest_dir = Path(base_dir / dir_name)
342                else:
343                    dest_dir = Path(base_dir / extracted_dirs[0].name)
344
345                Path(dest_dir).parent.mkdir(parents=True, exist_ok=True)
346
347                self.inf(f"Move: {str(extracted_dirs[0])} to {dest_dir}.")
348                shutil.move(extracted_dirs[0], dest_dir)
349
350                return dest_dir
351        except PermissionError as pe:
352            self.die(pe)
353
354    def run_setup(self, args, sdk_dir):
355        if "Windows" == platform.system():
356            setup = Path(sdk_dir) / "setup.cmd"
357            optsep = "/"
358        else:
359            setup = Path(sdk_dir) / "setup.sh"
360            optsep = "-"
361
362        # Associate installed SDK so that it can be found.
363        cmds_cmake_pkg = [str(setup), f"{optsep}c"]
364        self.dbg("Run: ", cmds_cmake_pkg)
365        result = subprocess.run(cmds_cmake_pkg)
366        if result.returncode != 0:
367            self.die(f"command \"{' '.join(cmds_cmake_pkg)}\" failed")
368
369        cmds = [str(setup)]
370
371        if not args.interactive and not args.no_toolchains:
372            if not args.toolchains:
373                cmds.extend([f"{optsep}t", "all"])
374            else:
375                for tc in args.toolchains:
376                    cmds.extend([f"{optsep}t", tc])
377
378        if not args.interactive and not args.no_hosttools:
379            cmds.extend([f"{optsep}h"])
380
381        if args.interactive or len(cmds) != 1:
382            self.dbg("Run: ", cmds)
383            result = subprocess.run(cmds)
384            if result.returncode != 0:
385                self.die(f"command \"{' '.join(cmds)}\" failed")
386
387    def install_sdk(self, args, user_args):
388        version = self.detect_version(args)
389        (osname, arch) = self.os_arch_name()
390
391        if args.personal_access_token:
392            req_headers = {
393                "Authorization": f"Bearer {args.personal_access_token}",
394            }
395        else:
396            req_headers = {}
397
398        self.inf("Fetching Zephyr SDK list...")
399        releases = self.fetch_releases(args.api_url, req_headers)
400        self.dbg("releases: ", "\n".join([x["tag_name"] for x in releases]))
401
402        # checking version
403        def check_semver(version):
404            try:
405                semver.Version.parse(version)
406                return True
407            except Exception:
408                return False
409
410        available_versions = [
411            re.sub(r"^v", "", x["tag_name"])
412            for x in releases
413            if check_semver(re.sub(r"^v", "", x["tag_name"]))
414        ]
415
416        if not version in available_versions:
417            self.die(
418                f"Unavailable SDK version: {version}."
419                + "Please select from the list below:\n"
420                + "\n".join(available_versions)
421            )
422
423        target_release = [x for x in releases if x["tag_name"] == f"v{version}"][0]
424
425        # checking toolchains parameters
426        assets = target_release["assets"]
427        self.dbg("assets: ", "\n".join([x["browser_download_url"] for x in assets]))
428
429        prefix = f"toolchain_{osname}-{arch}_"
430        available_toolchains = [
431            re.sub(r"\..*", "", x["name"].replace(prefix, ""))
432            for x in assets
433            if x["name"].startswith(prefix)
434        ]
435
436        if args.toolchains:
437            for tc in args.toolchains:
438                if not tc in available_toolchains:
439                    self.die(
440                        f"toolchain {tc} is not available.\n"
441                        + "Please select from the list below:\n"
442                        + "\n".join(available_toolchains)
443                    )
444
445        installed_info = [v for (k, v) in self.fetch_sdk_info().items() if k == version]
446        if len(installed_info) == 0:
447            if args.install_dir:
448                base_dir = Path(args.install_dir).parent
449                dir_name = Path(args.install_dir).name
450            elif args.install_base:
451                base_dir = Path(args.install_base)
452                dir_name = None
453            else:
454                base_dir = Path("~").expanduser()
455                dir_name = None
456
457            sdk_dir = self.download_and_extract(
458                base_dir, dir_name, target_release, req_headers
459            )
460        else:
461            sdk_dir = Path(installed_info[0]["path"])
462            self.inf(
463                f"Zephyr SDK version {version} is already installed at {str(sdk_dir)}. Using it."
464            )
465
466        self.run_setup(args, sdk_dir)
467
468    def fetch_sdk_info(self):
469        sdk_lines = []
470        try:
471            cmds = [
472                "-P",
473                str(Path(__file__).parent / "sdk" / "listsdk.cmake"),
474            ]
475
476            output = zcmake.run_cmake(cmds, capture_output=True)
477            if output:
478                # remove '-- Zephyr-sdk,' leader
479                sdk_lines = [l[15:] for l in output if l.startswith("-- Zephyr-sdk,")]
480            else:
481                sdk_lines = []
482
483        except Exception as e:
484            self.die(e)
485
486        def parse_sdk_entry(line):
487            class SdkEntry:
488                def __init__(self):
489                    self.version = None
490                    self.path = None
491
492            info = SdkEntry()
493            for ent in line.split(","):
494                kv = ent.split("=")
495                if kv[0].strip() == "ver":
496                    info.version = kv[1].strip()
497                elif kv[0].strip() == "dir":
498                    info.path = kv[1].strip()
499
500            return info
501
502        sdk_info = {}
503        for sdk_ent in [parse_sdk_entry(l) for l in reversed(sdk_lines)]:
504            entry = {}
505
506            ver = None
507            sdk_path = Path(sdk_ent.path)
508            sdk_version_path = sdk_path / "sdk_version"
509            if sdk_version_path.exists():
510                with open(str(sdk_version_path)) as f:
511                    ver = f.readline().strip()
512            else:
513                continue
514
515            entry["path"] = sdk_path
516
517            if (sdk_path / "sysroots").exists():
518                entry["hosttools"] = "installed"
519
520            # Identify toolchain directory by the existence of <toolchain>/bin/<toolchain>-gcc
521            if "Windows" == platform.system():
522                gcc_postfix = "-gcc.exe"
523            else:
524                gcc_postfix = "-gcc"
525
526            toolchains = [
527                tc.name
528                for tc in sdk_path.iterdir()
529                if (sdk_path / tc / "bin" / (tc.name + gcc_postfix)).exists()
530            ]
531
532            if len(toolchains) > 0:
533                entry["toolchains"] = toolchains
534
535            if ver:
536                sdk_info[ver] = entry
537
538        return sdk_info
539
540    def list_sdk(self):
541        sdk_info = self.fetch_sdk_info()
542
543        if len(sdk_info) == 0:
544            self.die("No Zephyr SDK installed.")
545
546        for k, v in sdk_info.items():
547            self.inf(f"{k}:")
548            self.inf(f"  path: {v['path']}")
549            if "hosttools" in v:
550                self.inf(f"  hosttools: {v['hosttools']}")
551            if "toolchains" in v:
552                self.inf("  installed-toolchains:")
553                for tc in v["toolchains"]:
554                    self.inf(f"    - {tc}")
555
556                # Since version 0.15.2, the sdk_toolchains file is included,
557                # so we can get information about available toolchains from there.
558                if (Path(v["path"]) / "sdk_toolchains").exists():
559                    with open(Path(v["path"]) / "sdk_toolchains") as f:
560                        all_tcs = [l.strip() for l in f.readlines()]
561
562                    self.inf("  available-toolchains:")
563                    for tc in all_tcs:
564                        if tc not in v["toolchains"]:
565                            self.inf(f"    - {tc}")
566
567            self.inf()
568
569    def do_run(self, args, user_args):
570        self.dbg("args: ", args)
571        if args.subcommand == "install":
572            self.install_sdk(args, user_args)
573        elif args.subcommand == "list" or not args.subcommand:
574            self.list_sdk()
575