1 # Copyright (c) 2024 TOKITA Hiroshi
2 #
3 # SPDX-License-Identifier: Apache-2.0
4 
5 import argparse
6 import hashlib
7 import io
8 import os
9 import patoolib
10 import platform
11 import re
12 import requests
13 import semver
14 import shutil
15 import subprocess
16 import tempfile
17 import textwrap
18 import tqdm
19 import zcmake
20 from pathlib import Path
21 
22 from west.commands import WestCommand
23 
24 
25 class Sdk(WestCommand):
26     def __init__(self):
27         super().__init__(
28             "sdk",
29             "manage Zephyr SDK",
30             "List and Install Zephyr SDK",
31         )
32 
33     def do_add_parser(self, parser_adder):
34         parser = parser_adder.add_parser(
35             self.name,
36             help=self.help,
37             description=self.description,
38             formatter_class=argparse.RawDescriptionHelpFormatter,
39             epilog=textwrap.dedent(
40                 """
41             Listing SDKs:
42 
43                 Run 'west sdk' or 'west sdk list' to list installed SDKs.
44                 See 'west sdk list --help' for details.
45 
46 
47             Installing SDK:
48 
49                 Run 'west sdk install' to install Zephyr SDK.
50                 See 'west sdk install --help' for details.
51             """
52             ),
53         )
54 
55         subparsers_gen = parser.add_subparsers(
56             metavar="<subcommand>",
57             dest="subcommand",
58             help="select a subcommand. If omitted, treat it as the 'list' selected.",
59         )
60 
61         subparsers_gen.add_parser(
62             "list",
63             help="list installed Zephyr SDKs",
64             formatter_class=argparse.RawDescriptionHelpFormatter,
65             epilog=textwrap.dedent(
66                 """
67             Listing SDKs:
68 
69                 Run 'west sdk' or 'west sdk list' command information about available SDKs is displayed.
70             """
71             ),
72         )
73 
74         install_args_parser = subparsers_gen.add_parser(
75             "install",
76             help="install Zephyr SDK",
77             formatter_class=argparse.RawDescriptionHelpFormatter,
78             epilog=textwrap.dedent(
79                 """
80             Installing SDK:
81 
82                 Run 'west sdk install' to install Zephyr SDK.
83 
84                 Set --version option to install a specific version of the SDK.
85                 If not specified, the install version is detected from "${ZEPHYR_BASE}/SDK_VERSION file.
86                 SDKs older than 0.14.1 are not supported.
87 
88                 You can specify the installation directory with --install-dir or --install-base.
89                 If the specified version of the SDK is already installed,
90                 the already installed SDK will be used regardless of the settings of
91                 --install-dir and --install-base.
92 
93                 Typically, Zephyr SDK archives contain only one directory named zephyr-sdk-<version>
94                 at the top level.
95                 The SDK archive is extracted to the home directory if both --install-dir and --install-base
96                 are not specified.
97                 In this case, SDK will install into ${HOME}/zephyr-sdk-<version>.
98                 If --install-base is specified, the archive will be extracted under the specified path.
99                 In this case, SDK will install into <BASE>/zephyr-sdk-<version> .
100                 If --install-dir is specified, the directory contained in the archive will be renamed
101                 and placed to the specified location.
102 
103                 --interactive, --toolchains, --no-toolchains and --no-hosttools options
104                 specify the behavior of the installer. Please see the description of each option.
105 
106                 --personal-access-token specifies the GitHub personal access token.
107                 This helps to relax the limits on the number of REST API calls.
108 
109                 --api-url specifies the REST API endpoint for GitHub releases information
110                 when installing the SDK from a different GitHub repository.
111             """
112             ),
113         )
114 
115         install_args_parser.add_argument(
116             "--version",
117             default=None,
118             nargs="?",
119             metavar="SDK_VER",
120             help="version of the Zephyr SDK to install. "
121             "If not specified, the install version is detected from "
122             "${ZEPHYR_BASE}/SDK_VERSION file.",
123         )
124         install_args_parser.add_argument(
125             "-b",
126             "--install-base",
127             default=None,
128             metavar="BASE",
129             help="Base directory to SDK install. "
130             "The subdirectory created by extracting the archive in <BASE> will be the SDK installation directory. "
131             "For example, -b /foo/bar will install the SDK in `/foo/bar/zephyr-sdk-<version>'."
132         )
133         install_args_parser.add_argument(
134             "-d",
135             "--install-dir",
136             default=None,
137             metavar="DIR",
138             help="SDK install destination directory. "
139             "The SDK will be installed on the specified path. "
140             "The directory contained in the archive will be renamed and installed for the specified directory. "
141             "For example, if you specify -b /foo/bar/baz, The archive's zephyr-sdk-<version> directory will be renamed baz and placed under /foo/bar. "
142             "If this option is specified, the --install-base option is ignored. "
143         )
144         install_args_parser.add_argument(
145             "-i",
146             "--interactive",
147             action="store_true",
148             help="launches installer in interactive mode. "
149             "--toolchains, --no-toolchains and --no-hosttools will be ignored if this option is enabled.",
150         )
151         install_args_parser.add_argument(
152             "-t",
153             "--toolchains",
154             metavar="toolchain_name",
155             nargs="+",
156             help="toolchain(s) to install (e.g. 'arm-zephyr-eabi'). "
157             "If this option is not given, toolchains for all architectures will be installed.",
158         )
159         install_args_parser.add_argument(
160             "-T",
161             "--no-toolchains",
162             action="store_true",
163             help="do not install toolchains. "
164             "--toolchains will be ignored if this option is enabled.",
165         )
166         install_args_parser.add_argument(
167             "-H",
168             "--no-hosttools",
169             action="store_true",
170             help="do not install host-tools.",
171         )
172         install_args_parser.add_argument(
173             "--personal-access-token", help="GitHub personal access token."
174         )
175         install_args_parser.add_argument(
176             "--api-url",
177             default="https://api.github.com/repos/zephyrproject-rtos/sdk-ng/releases",
178             help="GitHub releases API endpoint used to look for Zephyr SDKs.",
179         )
180 
181         return parser
182 
183     def os_arch_name(self):
184         system = platform.system()
185         machine = platform.machine()
186 
187         if system == "Linux":
188             osname = "linux"
189         elif system == "Darwin":
190             osname = "macos"
191         elif system == "Windows":
192             osname = "windows"
193         else:
194             self.die(f"Unsupported system: {system}")
195 
196         if machine in ["aarch64", "arm64"]:
197             arch = "aarch64"
198         elif machine in ["x86_64", "AMD64"]:
199             arch = "x86_64"
200         else:
201             self.die(f"Unsupported machine: {machine}")
202 
203         return (osname, arch)
204 
205     def detect_version(self, args):
206         if args.version:
207             version = args.version
208         else:
209             if os.environ["ZEPHYR_BASE"]:
210                 zephyr_base = Path(os.environ["ZEPHYR_BASE"])
211             else:
212                 zephyr_base = Path(__file__).parents[2]
213 
214             sdk_version_file = zephyr_base / "SDK_VERSION"
215 
216             if not sdk_version_file.exists():
217                 self.die(f"{str(sdk_version_file)} does not exist.")
218 
219             with open(sdk_version_file) as f:
220                 version = f.readlines()[0].strip()
221                 self.inf(
222                     f"Found '{str(sdk_version_file)}', installing version {version}."
223                 )
224 
225         try:
226             semver.Version.parse(version)
227         except Exception:
228             self.die(f"Invalid version format: {version}")
229 
230         if semver.compare(version, "0.14.1") < 0:
231             self.die(f"Versions older than v0.14.1 are not supported.")
232 
233         return version
234 
235     def fetch_releases(self, url, req_headers):
236         """fetch releases data via GitHub REST API"""
237 
238         releases = []
239         page = 1
240 
241         while True:
242             params = {"page": page, "per_page": 100}
243             resp = requests.get(url, headers=req_headers, params=params)
244             if resp.status_code != 200:
245                 raise Exception(f"Failed to fetch: {resp.status_code}, {resp.text}")
246 
247             data = resp.json()
248             if not data:
249                 break
250 
251             releases.extend(data)
252             page += 1
253 
254         return releases
255 
256     def minimal_sdk_filename(self, release):
257         (osname, arch) = self.os_arch_name()
258         version = re.sub(r"^v", "", release["tag_name"])
259 
260         if semver.compare(version, "0.16.0") < 0:
261             if osname == "windows":
262                 ext = ".zip"
263             else:
264                 ext = ".tar.gz"
265         else:
266             if osname == "windows":
267                 ext = ".7z"
268             else:
269                 ext = ".tar.xz"
270 
271         return f"zephyr-sdk-{version}_{osname}-{arch}_minimal{ext}"
272 
273     def minimal_sdk_sha256(self, sha256_list, release):
274         name = self.minimal_sdk_filename(release)
275         tuples = [(re.split(r"\s+", t)) for t in sha256_list.splitlines()]
276         hashtable = {t[1]: t[0] for t in tuples}
277 
278         return hashtable[name]
279 
280     def minimal_sdk_url(self, release):
281         name = self.minimal_sdk_filename(release)
282         assets = release.get("assets", [])
283         minimal_sdk_asset = next(filter(lambda x: x["name"] == name, assets))
284 
285         return minimal_sdk_asset["browser_download_url"]
286 
287     def sha256_sum_url(self, release):
288         assets = release.get("assets", [])
289         minimal_sdk_asset = next(filter(lambda x: x["name"] == "sha256.sum", assets))
290 
291         return minimal_sdk_asset["browser_download_url"]
292 
293     def download_and_extract(self, base_dir, dir_name, target_release, req_headers):
294         self.inf("Fetching sha256...")
295         sha256_url = self.sha256_sum_url(target_release)
296         resp = requests.get(sha256_url, headers=req_headers, stream=True)
297         if resp.status_code != 200:
298             raise Exception(f"Failed to download {sha256_url}: {resp.status_code}")
299 
300         sha256 = self.minimal_sdk_sha256(resp.content.decode("UTF-8"), target_release)
301 
302         archive_url = self.minimal_sdk_url(target_release)
303         self.inf(f"Downloading {archive_url}...")
304         resp = requests.get(archive_url, headers=req_headers, stream=True)
305         if resp.status_code != 200:
306             raise Exception(f"Failed to download {archive_url}: {resp.status_code}")
307 
308         try:
309             Path(base_dir).mkdir(parents=True, exist_ok=True)
310 
311             with tempfile.TemporaryDirectory(dir=base_dir) as tempdir:
312                 # download archive file
313                 filename = Path(tempdir) / re.sub(r"^.*/", "", archive_url)
314                 file = open(filename, mode="wb")
315                 total_length = int(resp.headers["Content-Length"])
316                 count = 0
317 
318                 class WestLogAdapter(io.StringIO):
319                     buf = []
320                     ctx = None
321 
322                     def __init__(self, ctx):
323                         super(__class__, self).__init__()
324                         self.ctx = ctx
325 
326                     def write(self, buf):
327                         try:
328                             self.ctx.inf(buf, colorize=False, end='')
329                         except TypeError:
330                             # West v1.1.0 and earlier versions do not support the end parameter.
331                             self.ctx.inf(buf, colorize=False)
332 
333                     def flush(self):
334                         pass
335 
336                 tbar = tqdm.tqdm(
337                     total=total_length,
338                     file=WestLogAdapter(self),
339                     desc=Path(file.name).name,
340                     unit_scale=True,
341                     ncols=shutil.get_terminal_size().columns,
342                     unit="",
343                     bar_format="{desc}: {percentage:3.0f}%|{bar}|   {n_fmt} {rate_fmt}   [{elapsed}]        ",
344                 )
345 
346                 for chunk in resp.iter_content(chunk_size=8192):
347                     tbar.update(len(chunk))
348                     file.write(chunk)
349                     count = count + len(chunk)
350 
351                 tbar.close()
352 
353                 self.inf()
354                 self.inf(f"Downloaded: {file.name}")
355                 file.close()
356 
357                 # check sha256 hash
358                 with open(file.name, "rb") as sha256file:
359                     digest = hashlib.sha256(sha256file.read()).hexdigest()
360                     if sha256 != digest:
361                         raise Exception(f"sha256 mismatched: {sha256}:{digest}")
362 
363                 # extract archive file
364                 self.inf(f"Extract: {file.name}")
365                 patoolib.extract_archive(file.name, outdir=tempdir)
366 
367                 # We expect that only the zephyr-sdk-x.y.z directory will be present in the archive.
368                 extracted_dirs = [d for d in Path(tempdir).iterdir() if d.is_dir()]
369                 if len(extracted_dirs) != 1:
370                     raise Exception("Unexpected archive format")
371 
372                 # move to destination dir
373                 if dir_name:
374                     dest_dir = Path(base_dir / dir_name)
375                 else:
376                     dest_dir = Path(base_dir / extracted_dirs[0].name)
377 
378                 Path(dest_dir).parent.mkdir(parents=True, exist_ok=True)
379 
380                 self.inf(f"Move: {str(extracted_dirs[0])} to {dest_dir}.")
381                 shutil.move(extracted_dirs[0], dest_dir)
382 
383                 return dest_dir
384         except PermissionError as pe:
385             self.die(pe)
386 
387     def run_setup(self, args, sdk_dir):
388         if "Windows" == platform.system():
389             setup = Path(sdk_dir) / "setup.cmd"
390             optsep = "/"
391         else:
392             setup = Path(sdk_dir) / "setup.sh"
393             optsep = "-"
394 
395         # Associate installed SDK so that it can be found.
396         cmds_cmake_pkg = [str(setup), f"{optsep}c"]
397         self.dbg("Run: ", cmds_cmake_pkg)
398         result = subprocess.run(cmds_cmake_pkg)
399         if result.returncode != 0:
400             self.die(f"command \"{' '.join(cmds_cmake_pkg)}\" failed")
401 
402         cmds = [str(setup)]
403 
404         if not args.interactive and not args.no_toolchains:
405             if not args.toolchains:
406                 cmds.extend([f"{optsep}t", "all"])
407             else:
408                 for tc in args.toolchains:
409                     cmds.extend([f"{optsep}t", tc])
410 
411         if not args.interactive and not args.no_hosttools:
412             cmds.extend([f"{optsep}h"])
413 
414         if args.interactive or len(cmds) != 1:
415             self.dbg("Run: ", cmds)
416             result = subprocess.run(cmds)
417             if result.returncode != 0:
418                 self.die(f"command \"{' '.join(cmds)}\" failed")
419 
420     def install_sdk(self, args, user_args):
421         version = self.detect_version(args)
422         (osname, arch) = self.os_arch_name()
423 
424         if args.personal_access_token:
425             req_headers = {
426                 "Authorization": f"Bearer {args.personal_access_token}",
427             }
428         else:
429             req_headers = {}
430 
431         self.inf("Fetching Zephyr SDK list...")
432         releases = self.fetch_releases(args.api_url, req_headers)
433         self.dbg("releases: ", "\n".join([x["tag_name"] for x in releases]))
434 
435         # checking version
436         def check_semver(version):
437             try:
438                 semver.Version.parse(version)
439                 return True
440             except Exception:
441                 return False
442 
443         available_versions = [
444             re.sub(r"^v", "", x["tag_name"])
445             for x in releases
446             if check_semver(re.sub(r"^v", "", x["tag_name"]))
447         ]
448 
449         if not version in available_versions:
450             self.die(
451                 f"Unavailable SDK version: {version}."
452                 + "Please select from the list below:\n"
453                 + "\n".join(available_versions)
454             )
455 
456         target_release = [x for x in releases if x["tag_name"] == f"v{version}"][0]
457 
458         # checking toolchains parameters
459         assets = target_release["assets"]
460         self.dbg("assets: ", "\n".join([x["browser_download_url"] for x in assets]))
461 
462         prefix = f"toolchain_{osname}-{arch}_"
463         available_toolchains = [
464             re.sub(r"\..*", "", x["name"].replace(prefix, ""))
465             for x in assets
466             if x["name"].startswith(prefix)
467         ]
468 
469         if args.toolchains:
470             for tc in args.toolchains:
471                 if not tc in available_toolchains:
472                     self.die(
473                         f"toolchain {tc} is not available.\n"
474                         + "Please select from the list below:\n"
475                         + "\n".join(available_toolchains)
476                     )
477 
478         installed_info = [v for (k, v) in self.fetch_sdk_info().items() if k == version]
479         if len(installed_info) == 0:
480             if args.install_dir:
481                 base_dir = Path(args.install_dir).parent
482                 dir_name = Path(args.install_dir).name
483             elif args.install_base:
484                 base_dir = Path(args.install_base)
485                 dir_name = None
486             else:
487                 base_dir = Path("~").expanduser()
488                 dir_name = None
489 
490             sdk_dir = self.download_and_extract(
491                 base_dir, dir_name, target_release, req_headers
492             )
493         else:
494             sdk_dir = Path(installed_info[0]["path"])
495             self.inf(
496                 f"Zephyr SDK version {version} is already installed at {str(sdk_dir)}. Using it."
497             )
498 
499         self.run_setup(args, sdk_dir)
500 
501     def fetch_sdk_info(self):
502         sdk_lines = []
503         try:
504             cmds = [
505                 "-P",
506                 str(Path(__file__).parent / "sdk" / "listsdk.cmake"),
507             ]
508 
509             output = zcmake.run_cmake(cmds, capture_output=True)
510             if output:
511                 # remove '-- Zephyr-sdk,' leader
512                 sdk_lines = [l[15:] for l in output if l.startswith("-- Zephyr-sdk,")]
513             else:
514                 sdk_lines = []
515 
516         except Exception as e:
517             self.die(e)
518 
519         def parse_sdk_entry(line):
520             class SdkEntry:
521                 def __init__(self):
522                     self.version = None
523                     self.path = None
524 
525             info = SdkEntry()
526             for ent in line.split(","):
527                 kv = ent.split("=")
528                 if kv[0].strip() == "ver":
529                     info.version = kv[1].strip()
530                 elif kv[0].strip() == "dir":
531                     info.path = kv[1].strip()
532 
533             return info
534 
535         sdk_info = {}
536         for sdk_ent in [parse_sdk_entry(l) for l in reversed(sdk_lines)]:
537             entry = {}
538 
539             ver = None
540             sdk_path = Path(sdk_ent.path)
541             sdk_version_path = sdk_path / "sdk_version"
542             if sdk_version_path.exists():
543                 with open(str(sdk_version_path)) as f:
544                     ver = f.readline().strip()
545             else:
546                 continue
547 
548             entry["path"] = sdk_path
549 
550             if (sdk_path / "sysroots").exists():
551                 entry["hosttools"] = "installed"
552 
553             # Identify toolchain directory by the existence of <toolchain>/bin/<toolchain>-gcc
554             if "Windows" == platform.system():
555                 gcc_postfix = "-gcc.exe"
556             else:
557                 gcc_postfix = "-gcc"
558 
559             toolchains = [
560                 tc.name
561                 for tc in sdk_path.iterdir()
562                 if (sdk_path / tc / "bin" / (tc.name + gcc_postfix)).exists()
563             ]
564 
565             if len(toolchains) > 0:
566                 entry["toolchains"] = toolchains
567 
568             if ver:
569                 sdk_info[ver] = entry
570 
571         return sdk_info
572 
573     def list_sdk(self):
574         sdk_info = self.fetch_sdk_info()
575 
576         if len(sdk_info) == 0:
577             self.die("No Zephyr SDK installed.")
578 
579         for k, v in sdk_info.items():
580             self.inf(f"{k}:")
581             self.inf(f"  path: {v['path']}")
582             if "hosttools" in v:
583                 self.inf(f"  hosttools: {v['hosttools']}")
584             if "toolchains" in v:
585                 self.inf("  installed-toolchains:")
586                 for tc in v["toolchains"]:
587                     self.inf(f"    - {tc}")
588 
589                 # Since version 0.15.2, the sdk_toolchains file is included,
590                 # so we can get information about available toolchains from there.
591                 if (Path(v["path"]) / "sdk_toolchains").exists():
592                     with open(Path(v["path"]) / "sdk_toolchains") as f:
593                         all_tcs = [l.strip() for l in f.readlines()]
594 
595                     self.inf("  available-toolchains:")
596                     for tc in all_tcs:
597                         if tc not in v["toolchains"]:
598                             self.inf(f"    - {tc}")
599 
600             self.inf()
601 
602     def do_run(self, args, user_args):
603         self.dbg("args: ", args)
604         if args.subcommand == "install":
605             self.install_sdk(args, user_args)
606         elif args.subcommand == "list" or not args.subcommand:
607             self.list_sdk()
608