# Copyright (c) 2023 Nordic Semiconductor ASA.
#
# SPDX-License-Identifier: Apache-2.0

'''Runner for flashing with nrfutil.'''

import json
import os
import subprocess
import sys
from pathlib import Path

from runners.core import _DRY_RUN
from runners.nrf_common import NrfBinaryRunner


class NrfUtilBinaryRunner(NrfBinaryRunner):
    '''Runner front-end for nrfutil.

    Operations are queued in ``self._ops`` and written to a JSON batch file
    that is handed to ``nrfutil device x-execute-batch``.
    '''

    def __init__(self, cfg, family, softreset, dev_id, erase=False,
                 reset=True, tool_opt=None, force=False, recover=False,
                 suit_starter=False):
        '''Initialize the runner.

        All arguments except ``suit_starter`` are forwarded unchanged to the
        ``NrfBinaryRunner`` constructor. ``suit_starter`` is stored on the
        instance as ``self.suit_starter``.
        '''
        super().__init__(cfg, family, softreset, dev_id, erase, reset,
                         tool_opt, force, recover)

        self.suit_starter = suit_starter

        # Operations queued for the next batch, and the ID to assign to the
        # next queued operation (IDs restart at 1 for every batch).
        self._ops = []
        self._op_id = 1

    @classmethod
    def name(cls):
        '''Return the name of this runner.'''
        return 'nrfutil'

    @classmethod
    def tool_opt_help(cls) -> str:
        '''Return the help text for the tool options argument.'''
        return 'Additional options for nrfutil, e.g. "--log-level"'

    @classmethod
    def do_create(cls, cfg, args):
        '''Build a runner instance from parsed command line arguments.'''
        return NrfUtilBinaryRunner(cfg, args.nrf_family, args.softreset,
                                   args.dev_id, erase=args.erase,
                                   reset=args.reset,
                                   tool_opt=args.tool_opt, force=args.force,
                                   recover=args.recover,
                                   suit_starter=args.suit_manifest_starter)

    @classmethod
    def do_add_parser(cls, parser):
        '''Add the parent class arguments plus ``--suit-manifest-starter``.'''
        super().do_add_parser(parser)
        parser.add_argument('--suit-manifest-starter', required=False,
                            action='store_true',
                            help='Use the SUIT manifest starter file')

    def _exec(self, args):
        '''Run ``nrfutil --json device <args>`` and collect its output.

        The tool's standard output is read line by line and every non-blank
        line is parsed as one JSON document.

        When ``args`` contains ``'x-execute-batch'``, the description of each
        task is logged when its progress record reports 0 percent, and a
        ``batch_end`` record carrying an error aborts the run.

        :param args: list of arguments appended after ``nrfutil --json device``
        :return: list of decoded JSON objects, in the order they were
                 emitted; an empty list on a dry run
        :raises subprocess.CalledProcessError: if a ``batch_end`` record
                 reports an error, or if the process exits with a non-zero
                 status
        '''
        cmd = ['nrfutil', '--json', 'device'] + args
        self._log_cmd(cmd)

        if _DRY_RUN:
            # Same (empty, iterable) shape as a real run that printed nothing.
            return []

        # Loop invariant: whether batch progress/error records are expected.
        is_batch = 'x-execute-batch' in args
        jout_all = []

        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as p:
            for line in iter(p.stdout.readline, b''):
                # https://github.com/ndjson/ndjson-spec
                # The spec allows parsers to silently ignore empty lines;
                # json.loads() would raise on them instead.
                if not line.strip():
                    continue

                jout = json.loads(line.decode(sys.getdefaultencoding()))
                jout_all.append(jout)

                if not is_batch:
                    continue

                if jout['type'] == 'batch_update':
                    pld = jout['data']['data']
                    if (
                        pld['type'] == 'task_progress' and
                        pld['data']['progress']['progressPercentage'] == 0
                    ):
                        # Log each task once, when it reports 0% progress.
                        self.logger.info(
                            pld['data']['progress']['description'])
                elif jout['type'] == 'batch_end' and jout['data']['error']:
                    raise subprocess.CalledProcessError(
                        jout['data']['error']['code'], cmd
                    )

        # Leaving the 'with' block waits for the process, so returncode is
        # set here. Do not treat a failed invocation as a success just
        # because it produced no error record on stdout.
        if p.returncode != 0:
            raise subprocess.CalledProcessError(p.returncode, cmd)

        return jout_all

    def do_get_boards(self):
        '''Return the serial numbers of the devices reported by nrfutil.

        Runs ``nrfutil device list``, takes the device list from the last
        ``task_end`` record and keeps the devices whose ``jlink`` trait is
        set.

        :return: list of serial numbers (empty if no device list was found)
        '''
        out = self._exec(['list'])
        devs = []
        for o in out:
            if o['type'] == 'task_end':
                devs = o['data']['data']['devices']
        snrs = [dev['serialNumber'] for dev in devs if dev['traits']['jlink']]

        self.logger.debug(f'Found boards: {snrs}')
        return snrs

    def do_require(self):
        '''Check that the ``nrfutil`` executable is available.'''
        self.require('nrfutil')

    def _insert_op(self, op):
        '''Assign the next operation ID to ``op`` and queue it.

        ``op`` is modified in place: its ``'operationId'`` key is set to the
        current ID, formatted as a string.
        '''
        op['operationId'] = f'{self._op_id}'
        self._op_id += 1
        self._ops.append(op)

    def _exec_batch(self):
        '''Write the queued operations to a batch file and execute it.

        The batch is written to ``generated_nrfutil_batch.json`` in the
        directory that contains ``self.hex_``. The queue and the operation
        ID counter are reset before nrfutil is invoked.
        '''
        # prepare the dictionary and convert to JSON
        batch = json.dumps({'family': f'{self.family}',
                            'operations': list(self._ops)},
                           indent=4) + '\n'

        hex_dir = Path(self.hex_).parent
        json_file = os.fspath(hex_dir / 'generated_nrfutil_batch.json')

        with open(json_file, 'w', encoding='utf-8') as f:
            f.write(batch)

        # reset first in case an exception is thrown
        self._ops = []
        self._op_id = 1
        self.logger.debug(f'Executing batch in: {json_file}')
        self._exec(['x-execute-batch', '--batch-path', f'{json_file}',
                    '--serial-number', f'{self.dev_id}'])

    def do_exec_op(self, op, force=False):
        '''Execute ``op`` immediately if ``force`` is set, else defer it.

        :param op: operation dictionary
        :param force: when true, run ``op`` on its own in a one-operation
                      batch
        :return: True if the operation was executed, False if it was deferred
        :raises RuntimeError: if ``force`` is set while operations are
                              already queued for a batch
        '''
        self.logger.debug(f'Executing op: {op}')
        if force:
            if len(self._ops) != 0:
                raise RuntimeError(f'Forced exec with {len(self._ops)} ops')
            self._insert_op(op)
            self._exec_batch()
            return True
        # Defer by default
        return False

    def flush_ops(self, force=True):
        '''Move all pending operations into a batch and execute it.

        Does nothing unless ``force`` is true. ``self.ops`` is not defined in
        this class; it is presumably the pending-operation queue maintained
        by the parent class.
        '''
        if not force:
            return
        while self.ops:
            self._insert_op(self.ops.popleft())
        self._exec_batch()