1# Copyright (c) 2022-2024 Intel Corporation
2#
3# SPDX-License-Identifier: Apache-2.0
4
5'''Runner for flashing with the Intel ADSP boards.'''
6
7import argparse
8import hashlib
9import os
10import random
11import re
12import shutil
13import sys
14
15from zephyr_ext_common import ZEPHYR_BASE
16
17from runners.core import RunnerCaps, ZephyrBinaryRunner
18
19DEFAULT_CAVSTOOL='soc/intel/intel_adsp/tools/cavstool_client.py'
20
21class SignParamError(argparse.Action):
22    'User-friendly feedback when trying to sign with west flash'
23    def __call__(self, parser, namespace, values, option_string=None):
24        parser.error(f'Cannot use "west flash {option_string} ..." any more. ' +
25                     '"west sign" is now called from CMake, see "west sign -h"')
26
27class IntelAdspBinaryRunner(ZephyrBinaryRunner):
28    '''Runner front-end for the intel ADSP boards.'''
29
30    def __init__(self,
31                 cfg,
32                 remote_host,
33                 pty,
34                 tool_opt,
35                 ):
36        super().__init__(cfg)
37
38        self.remote_host = remote_host
39        self.bin_fw = os.path.join(cfg.build_dir, 'zephyr', 'zephyr.ri')
40
41        self.cavstool = os.path.join(ZEPHYR_BASE, DEFAULT_CAVSTOOL)
42        self.platform = os.path.basename(cfg.board_dir)
43        self.pty = pty
44
45        self.tool_opt_args = tool_opt
46
47    @classmethod
48    def name(cls):
49        return 'intel_adsp'
50
51    @classmethod
52    def capabilities(cls):
53        return RunnerCaps(commands={'flash'}, tool_opt=True)
54
55    @classmethod
56    def do_add_parser(cls, parser):
57        parser.add_argument('--remote-host',
58                            help='hostname of the remote targeting ADSP board')
59        parser.add_argument('--pty', nargs='?', const="remote-host", type=str,
60                            help=''''Capture the output of cavstool.py running on --remote-host \
61                            and stream it remotely to west's standard output.''')
62
63        for old_sign_param in [ '--rimage-tool', '--config-dir', '--default-key', '--key']:
64            parser.add_argument(old_sign_param, action=SignParamError,
65                                help='''do not use, "west sign" is now called from CMake,
66                                see "west sign -h"''')
67
68    @classmethod
69    def tool_opt_help(cls) -> str:
70        return """Additional options for run/request service tool,
71        e.g. '--lock' """
72
73    @classmethod
74    def do_create(cls, cfg, args):
75        return IntelAdspBinaryRunner(cfg,
76                                    remote_host=args.remote_host,
77                                    pty=args.pty,
78                                    tool_opt=args.tool_opt,
79                                    )
80
81    def do_run(self, command, **kwargs):
82        self.logger.info('Starting Intel ADSP runner')
83
84        if re.search("adsp", self.platform):
85            self.require(self.cavstool)
86            self.flash(**kwargs)
87        else:
88            self.logger.error("No suitable platform for running")
89            sys.exit(1)
90
91    def flash(self, **kwargs):
92        'Generate a hash string for appending to the sending ri file'
93        hash_object = hashlib.md5(self.bin_fw.encode())
94        random_str = f"{random.getrandbits(64)}".encode()
95        hash_object.update(random_str)
96        send_bin_fw = str(self.bin_fw + "." + hash_object.hexdigest())
97        shutil.copy(self.bin_fw, send_bin_fw)
98
99        # Copy the zephyr to target remote ADSP host and run
100        self.run_cmd = ([f'{self.cavstool}','-s', f'{self.remote_host}', f'{send_bin_fw}'])
101
102        # Add the extra tool options to run/request service tool
103        if self.tool_opt_args:
104            self.run_cmd = self.run_cmd + self.tool_opt_args
105
106        self.logger.debug(f"rcmd: {self.run_cmd}")
107
108        self.check_call(self.run_cmd)
109
110        # If the self.pty is assigned, the log will output to stdout
111        # directly. That means you don't have to execute the command:
112        #
113        #   cavstool_client.py -s {host}:{port} -l
114        #
115        # to get the result later separately.
116        if self.pty is not None:
117            if self.pty == 'remote-host':
118                self.log_cmd = ([f'{self.cavstool}','-s', f'{self.remote_host}', '-l'])
119            else:
120                self.log_cmd = ([f'{self.cavstool}','-s', f'{self.pty}', '-l'])
121
122            self.logger.debug(f"rcmd: {self.log_cmd}")
123
124            self.check_call(self.log_cmd)
125