# Copyright (c) 2023 Nordic Semiconductor ASA.
#
# SPDX-License-Identifier: Apache-2.0

'''Runner for flashing with nrfutil.'''

import json
import subprocess
import sys
from pathlib import Path

from runners.core import _DRY_RUN
from runners.nrf_common import NrfBinaryRunner


class NrfUtilBinaryRunner(NrfBinaryRunner):
    '''Runner front-end for nrfutil.

    Operations are queued with _insert_op(), converted one by one into an
    nrfutil batch file with "x-append-batch" and then run in a single
    "x-execute-batch" invocation.
    '''

    def __init__(self, cfg, family, softreset, pinreset, dev_id, erase=False,
                 reset=True, tool_opt=None, force=False, recover=False,
                 suit_starter=False, ext_mem_config_file=None):
        '''Store the nrfutil-specific settings on top of the base runner's.

        All arguments except suit_starter and ext_mem_config_file are
        forwarded unchanged to NrfBinaryRunner.__init__().
        '''
        super().__init__(cfg, family, softreset, pinreset, dev_id, erase, reset,
                         tool_opt, force, recover)

        self.suit_starter = suit_starter
        self.ext_mem_config_file = ext_mem_config_file

        # Operations waiting to be written to the next batch file.
        self._ops = []
        # Value used as 'operationId' for the next queued operation.
        self._op_id = 1

    @classmethod
    def name(cls):
        '''Return the name of this runner.'''
        return 'nrfutil'

    @classmethod
    def capabilities(cls):
        '''Return the common nrf capabilities with multiple dev ids enabled.'''
        return NrfBinaryRunner._capabilities(mult_dev_ids=True)

    @classmethod
    def dev_id_help(cls) -> str:
        '''Return the common dev-id help text plus a multiple-use note.'''
        return NrfBinaryRunner._dev_id_help() + \
            '''.\n This option can be specified multiple times'''

    @classmethod
    def tool_opt_help(cls) -> str:
        '''Return the help text for the tool options argument.'''
        return 'Additional options for nrfutil, e.g. "--log-level"'

    @classmethod
    def do_create(cls, cfg, args):
        '''Build a runner instance from the parsed command line arguments.'''
        return NrfUtilBinaryRunner(cfg, args.nrf_family, args.softreset,
                                   args.pinreset, args.dev_id, erase=args.erase,
                                   reset=args.reset,
                                   tool_opt=args.tool_opt, force=args.force,
                                   recover=args.recover,
                                   suit_starter=args.suit_manifest_starter,
                                   ext_mem_config_file=args.ext_mem_config_file)

    @classmethod
    def do_add_parser(cls, parser):
        '''Add the nrfutil-specific options after the inherited ones.'''
        super().do_add_parser(parser)
        parser.add_argument('--suit-manifest-starter', required=False,
                            action='store_true',
                            help='Use the SUIT manifest starter file')
        parser.add_argument('--ext-mem-config-file', required=False,
                            dest='ext_mem_config_file',
                            help='path to an JSON file with external memory configuration')

    def _exec(self, args):
        '''Run "nrfutil --json device <args>" and collect its JSON output.

        Every non-blank line written to stdout is parsed as one JSON
        document. While running "x-execute-batch", the description of each
        task is logged when its progress is reported as 0 percent.

        :param args: list of arguments appended to "nrfutil --json device"
        :return: list with the parsed JSON objects in output order; an
                 empty list in dry-run mode, where nothing is executed
        :raises subprocess.CalledProcessError: when "x-execute-batch"
                 reports a "batch_end" message carrying an error
        '''
        cmd = ['nrfutil', '--json', 'device'] + args
        self._log_cmd(cmd)

        if _DRY_RUN:
            # Same type as a real run, so callers can always iterate.
            return []

        jout_all = []
        is_batch = 'x-execute-batch' in args

        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as p:
            for line in iter(p.stdout.readline, b''):
                # https://github.com/ndjson/ndjson-spec
                # Blank lines carry no document; feeding them to the JSON
                # parser would raise instead of being ignored.
                if not line.strip():
                    continue

                jout = json.loads(line.decode(sys.getdefaultencoding()))
                jout_all.append(jout)

                if not is_batch:
                    continue

                if jout['type'] == 'batch_update':
                    pld = jout['data']['data']
                    if (
                        pld['type'] == 'task_progress' and
                        pld['data']['progress']['progressPercentage'] == 0
                    ):
                        self.logger.info(pld['data']['progress']['description'])
                elif jout['type'] == 'batch_end' and jout['data']['error']:
                    raise subprocess.CalledProcessError(
                        jout['data']['error']['code'], cmd
                    )

        return jout_all

    def do_get_boards(self):
        '''Return the serial numbers of the listed devices with a J-Link trait.

        The devices are taken from the "task_end" message of
        "nrfutil device list".
        '''
        out = self._exec(['list'])
        devs = []
        for o in out:
            if o['type'] == 'task_end':
                devs = o['data']['data']['devices']
        snrs = [dev['serialNumber'] for dev in devs if dev['traits']['jlink']]

        self.logger.debug(f'Found boards: {snrs}')
        return snrs

    def do_require(self):
        '''Require the nrfutil executable.'''
        self.require('nrfutil')

    def _insert_op(self, op):
        '''Tag op with the next operation id and queue it for the batch.'''
        op['operationId'] = f'{self._op_id}'
        self._op_id += 1
        self._ops.append(op)

    def _format_dev_ids(self):
        '''Return the device id(s) as a single comma-separated value.

        A list is joined with commas; any other value is returned as is.
        '''
        if isinstance(self.dev_id, list):
            return ','.join(self.dev_id)
        return self.dev_id

    def _append_batch(self, op, json_file):
        '''Translate one queued operation into an "x-append-batch" call.

        :param op: operation dictionary; its 'operation' entry holds the
                   type and the type-specific settings
        :param json_file: path of the batch file nrfutil appends to
        '''
        _op = op['operation']
        op_type = _op['type']

        cmd = [f'{op_type}']

        if op_type == 'program':
            cmd += ['--firmware', _op['firmware']['file']]
            opts = _op['options']
            # populate the options
            cmd.append('--options')
            cli_opts = f"chip_erase_mode={opts['chip_erase_mode']}"
            if opts.get('ext_mem_erase_mode'):
                cli_opts += f",ext_mem_erase_mode={opts['ext_mem_erase_mode']}"
            if opts.get('verify'):
                cli_opts += f",verify={opts['verify']}"
            cmd.append(cli_opts)
        elif op_type == 'reset':
            cmd += ['--reset-kind', _op['kind']]
        elif op_type == 'erase':
            cmd.append(f'--{_op["kind"]}')

        # The core is optional and only passed on when the operation names one.
        if op.get('core'):
            cmd += ['--core', op['core']]
        cmd += ['--x-family', f'{self.family}']
        cmd += ['--x-append-batch', f'{json_file}']
        self._exec(cmd)

    def _exec_batch(self):
        '''Write all queued operations to a batch file and execute it.

        The batch file is named 'generated_nrfutil_batch.json' and placed
        in the directory of self.hex_; a file left there by a previous run
        is removed first.
        '''
        # reset first in case an exception is thrown
        ops, self._ops = self._ops, []
        self._op_id = 1

        # Use x-append-batch to get the JSON from nrfutil itself
        json_file = Path(self.hex_).parent / 'generated_nrfutil_batch.json'
        json_file.unlink(missing_ok=True)
        for op in ops:
            self._append_batch(op, json_file)

        self.logger.debug(f'Executing batch in: {json_file}')
        precmd = []
        if self.ext_mem_config_file:
            # This needs to be prepended, as it's a global option
            precmd = ['--x-ext-mem-config-file', self.ext_mem_config_file]

        self._exec(precmd + ['x-execute-batch', '--batch-path', f'{json_file}',
                             '--serial-number', self._format_dev_ids()])

    def do_exec_op(self, op, force=False):
        '''Execute op immediately when forced, otherwise defer it.

        :param op: operation dictionary to execute
        :param force: run the operation right away as a batch of its own
        :return: True if the operation was executed, False if deferred
        :raises RuntimeError: if forced while other operations are queued
        '''
        self.logger.debug(f'Executing op: {op}')
        if force:
            if self._ops:
                raise RuntimeError(f'Forced exec with {len(self._ops)} ops')
            self._insert_op(op)
            self._exec_batch()
            return True
        # Defer by default
        return False

    def flush_ops(self, force=True):
        '''Queue every operation left in self.ops and run them as one batch.

        Nothing is done when force is false.
        '''
        if not force:
            return
        while self.ops:
            self._insert_op(self.ops.popleft())
        self._exec_batch()