1# Copyright (c) 2023 Nordic Semiconductor ASA.
2#
3# SPDX-License-Identifier: Apache-2.0
4
5'''Runner for flashing with nrfutil.'''
6
7import json
8import subprocess
9import sys
10from pathlib import Path
11
12from runners.core import _DRY_RUN
13from runners.nrf_common import NrfBinaryRunner
14
15
16class NrfUtilBinaryRunner(NrfBinaryRunner):
17    '''Runner front-end for nrfutil.'''
18
19    def __init__(self, cfg, family, softreset, pinreset, dev_id, erase=False,
20                 reset=True, tool_opt=None, force=False, recover=False,
21                 suit_starter=False, ext_mem_config_file=None):
22
23        super().__init__(cfg, family, softreset, pinreset, dev_id, erase, reset,
24                         tool_opt, force, recover)
25
26        self.suit_starter = suit_starter
27        self.ext_mem_config_file = ext_mem_config_file
28
29        self._ops = []
30        self._op_id = 1
31
32    @classmethod
33    def name(cls):
34        return 'nrfutil'
35
36    @classmethod
37    def capabilities(cls):
38        return NrfBinaryRunner._capabilities(mult_dev_ids=True)
39
40    @classmethod
41    def dev_id_help(cls) -> str:
42        return NrfBinaryRunner._dev_id_help() + \
43               '''.\n This option can be specified multiple times'''
44
45    @classmethod
46    def tool_opt_help(cls) -> str:
47        return 'Additional options for nrfutil, e.g. "--log-level"'
48
49    @classmethod
50    def do_create(cls, cfg, args):
51        return NrfUtilBinaryRunner(cfg, args.nrf_family, args.softreset,
52                                   args.pinreset, args.dev_id, erase=args.erase,
53                                   reset=args.reset,
54                                   tool_opt=args.tool_opt, force=args.force,
55                                   recover=args.recover,
56                                   suit_starter=args.suit_manifest_starter,
57                                   ext_mem_config_file=args.ext_mem_config_file)
58
59    @classmethod
60    def do_add_parser(cls, parser):
61        super().do_add_parser(parser)
62        parser.add_argument('--suit-manifest-starter', required=False,
63                            action='store_true',
64                            help='Use the SUIT manifest starter file')
65        parser.add_argument('--ext-mem-config-file', required=False,
66                            dest='ext_mem_config_file',
67                            help='path to an JSON file with external memory configuration')
68
69
70    def _exec(self, args):
71        jout_all = []
72
73        cmd = ['nrfutil', '--json', 'device'] + args
74        self._log_cmd(cmd)
75
76        if _DRY_RUN:
77            return {}
78
79        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as p:
80            for line in iter(p.stdout.readline, b''):
81                # https://github.com/ndjson/ndjson-spec
82                jout = json.loads(line.decode(sys.getdefaultencoding()))
83                jout_all.append(jout)
84
85                if 'x-execute-batch' in args:
86                    if jout['type'] == 'batch_update':
87                        pld = jout['data']['data']
88                        if (
89                            pld['type'] == 'task_progress' and
90                            pld['data']['progress']['progressPercentage'] == 0
91                        ):
92                            self.logger.info(pld['data']['progress']['description'])
93                    elif jout['type'] == 'batch_end' and jout['data']['error']:
94                        raise subprocess.CalledProcessError(
95                            jout['data']['error']['code'], cmd
96                        )
97
98        return jout_all
99
100    def do_get_boards(self):
101        out = self._exec(['list'])
102        devs = []
103        for o in out:
104            if o['type'] == 'task_end':
105                devs = o['data']['data']['devices']
106        snrs = [dev['serialNumber'] for dev in devs if dev['traits']['jlink']]
107
108        self.logger.debug(f'Found boards: {snrs}')
109        return snrs
110
111    def do_require(self):
112        self.require('nrfutil')
113
114    def _insert_op(self, op):
115        op['operationId'] = f'{self._op_id}'
116        self._op_id += 1
117        self._ops.append(op)
118
119    def _format_dev_ids(self):
120        if isinstance(self.dev_id, list):
121            return ','.join(self.dev_id)
122        else:
123            return self.dev_id
124
125    def _append_batch(self, op, json_file):
126        _op = op['operation']
127        op_type = _op['type']
128
129        cmd = [f'{op_type}']
130
131        if op_type == 'program':
132            cmd += ['--firmware', _op['firmware']['file']]
133            opts = _op['options']
134            # populate the options
135            cmd.append('--options')
136            cli_opts = f"chip_erase_mode={opts['chip_erase_mode']}"
137            if opts.get('ext_mem_erase_mode'):
138                cli_opts += f",ext_mem_erase_mode={opts['ext_mem_erase_mode']}"
139            if opts.get('verify'):
140                cli_opts += f",verify={opts['verify']}"
141            cmd.append(cli_opts)
142        elif op_type == 'reset':
143            cmd += ['--reset-kind', _op['kind']]
144        elif op_type == 'erase':
145            cmd.append(f'--{_op["kind"]}')
146
147        cmd += ['--core', op['core']] if op.get('core') else []
148        cmd += ['--x-family', f'{self.family}']
149        cmd += ['--x-append-batch', f'{json_file}']
150        self._exec(cmd)
151
152    def _exec_batch(self):
153        # Use x-append-batch to get the JSON from nrfutil itself
154        json_file = Path(self.hex_).parent / 'generated_nrfutil_batch.json'
155        json_file.unlink(missing_ok=True)
156        for op in self._ops:
157            self._append_batch(op, json_file)
158
159        # reset first in case an exception is thrown
160        self._ops = []
161        self._op_id = 1
162        self.logger.debug(f'Executing batch in: {json_file}')
163        precmd = []
164        if self.ext_mem_config_file:
165            # This needs to be prepended, as it's a global option
166            precmd = ['--x-ext-mem-config-file', self.ext_mem_config_file]
167
168        self._exec(precmd + ['x-execute-batch', '--batch-path', f'{json_file}',
169                             '--serial-number', self._format_dev_ids()])
170
171    def do_exec_op(self, op, force=False):
172        self.logger.debug(f'Executing op: {op}')
173        if force:
174            if len(self._ops) != 0:
175                raise RuntimeError(f'Forced exec with {len(self._ops)} ops')
176            self._insert_op(op)
177            self._exec_batch()
178            return True
179        # Defer by default
180        return False
181
182    def flush_ops(self, force=True):
183        if not force:
184            return
185        while self.ops:
186            self._insert_op(self.ops.popleft())
187        self._exec_batch()
188