1# Copyright (c) 2023 Nordic Semiconductor ASA.
2#
3# SPDX-License-Identifier: Apache-2.0
4
5'''Runner for flashing with nrfutil.'''
6
7import json
8import os
9import subprocess
10import sys
11from pathlib import Path
12
13from runners.core import _DRY_RUN
14from runners.nrf_common import NrfBinaryRunner
15
16
class NrfUtilBinaryRunner(NrfBinaryRunner):
    '''Runner front-end for nrfutil.

    Operations are queued in ``self._ops`` and executed together by
    writing them to a JSON batch file which is then passed to
    ``nrfutil device x-execute-batch``.
    '''

    def __init__(self, cfg, family, softreset, dev_id, erase=False,
                 reset=True, tool_opt=None, force=False, recover=False,
                 suit_starter=False):
        '''Initialize the runner.

        All arguments except ``suit_starter`` are forwarded unchanged to
        the ``NrfBinaryRunner`` constructor; ``suit_starter`` is stored
        on the instance as ``self.suit_starter``.
        '''
        super().__init__(cfg, family, softreset, dev_id, erase, reset,
                         tool_opt, force, recover)

        self.suit_starter = suit_starter

        # Operations queued for the next batch, and the ID that will be
        # assigned to the next queued operation.
        self._ops = []
        self._op_id = 1

    @classmethod
    def name(cls):
        '''Return the name of this runner.'''
        return 'nrfutil'

    @classmethod
    def tool_opt_help(cls) -> str:
        '''Return the help text for the tool options argument.'''
        return 'Additional options for nrfutil, e.g. "--log-level"'

    @classmethod
    def do_create(cls, cfg, args):
        '''Create a runner instance from the parsed command line ``args``.'''
        return NrfUtilBinaryRunner(cfg, args.nrf_family, args.softreset,
                                   args.dev_id, erase=args.erase,
                                   reset=args.reset,
                                   tool_opt=args.tool_opt, force=args.force,
                                   recover=args.recover,
                                   suit_starter=args.suit_manifest_starter)

    @classmethod
    def do_add_parser(cls, parser):
        '''Add the parent class arguments plus ``--suit-manifest-starter``.'''
        super().do_add_parser(parser)
        parser.add_argument('--suit-manifest-starter', required=False,
                            action='store_true',
                            help='Use the SUIT manifest starter file')

    def _exec(self, args):
        '''Run ``nrfutil --json device <args>`` and collect its JSON output.

        The output is parsed as newline-delimited JSON. When ``args``
        contains ``x-execute-batch``, the description of every task whose
        progress percentage is 0 is logged at info level.

        :param args: list of arguments appended to the base command
        :returns: list with one decoded JSON object per non-blank output
            line; an empty dict in dry-run mode, where nothing is run
        :raises subprocess.CalledProcessError: if a batch ends with an
            error, or if nrfutil exits with a non-zero status
        '''
        cmd = ['nrfutil', '--json', 'device'] + args
        self._log_cmd(cmd)

        if _DRY_RUN:
            return {}

        jout_all = []
        is_batch = 'x-execute-batch' in args

        with subprocess.Popen(cmd, stdout=subprocess.PIPE) as p:
            for line in iter(p.stdout.readline, b''):
                # https://github.com/ndjson/ndjson-spec
                text = line.decode(sys.getdefaultencoding())
                if not text.strip():
                    # A blank line carries no JSON document; feeding it
                    # to json.loads() would raise JSONDecodeError.
                    continue

                jout = json.loads(text)
                jout_all.append(jout)

                if not is_batch:
                    continue

                if jout['type'] == 'batch_update':
                    pld = jout['data']['data']
                    if (
                        pld['type'] == 'task_progress' and
                        pld['data']['progress']['progressPercentage'] == 0
                    ):
                        self.logger.info(pld['data']['progress']['description'])
                elif jout['type'] == 'batch_end' and jout['data']['error']:
                    raise subprocess.CalledProcessError(
                        jout['data']['error']['code'], cmd
                    )

        # Leaving the 'with' block waits for the process, so returncode
        # is set here. Without this check a failure that produced no
        # 'batch_end' error record would be reported as success.
        if p.returncode != 0:
            raise subprocess.CalledProcessError(p.returncode, cmd)

        return jout_all

    def do_get_boards(self):
        '''Return the serial numbers of the listed devices with a J-Link trait.

        Runs ``nrfutil device list`` and reads the device list from the
        last ``task_end`` record in its output.
        '''
        out = self._exec(['list'])
        devs = []
        for o in out:
            if o['type'] == 'task_end':
                devs = o['data']['data']['devices']
        snrs = [dev['serialNumber'] for dev in devs if dev['traits']['jlink']]

        self.logger.debug(f'Found boards: {snrs}')
        return snrs

    def do_require(self):
        '''Require the ``nrfutil`` executable.'''
        self.require('nrfutil')

    def _insert_op(self, op):
        '''Tag ``op`` with the next operation ID and queue it.'''
        op['operationId'] = f'{self._op_id}'
        self._op_id += 1
        self._ops.append(op)

    def _exec_batch(self):
        '''Write the queued operations to a batch file and execute it.

        The batch file ``generated_nrfutil_batch.json`` is created in the
        directory containing ``self.hex_``. The queue and the operation
        ID counter are reset before nrfutil is invoked.
        '''
        # prepare the dictionary and convert to JSON
        batch = json.dumps({'family': f'{self.family}',
                            'operations': list(self._ops)},
                           indent=4) + '\n'

        hex_dir = Path(self.hex_).parent
        json_file = os.fspath(hex_dir / 'generated_nrfutil_batch.json')

        with open(json_file, "w") as f:
            f.write(batch)

        # reset first in case an exception is thrown
        self._ops = []
        self._op_id = 1
        self.logger.debug(f'Executing batch in: {json_file}')
        self._exec(['x-execute-batch', '--batch-path', f'{json_file}',
                    '--serial-number', f'{self.dev_id}'])

    def do_exec_op(self, op, force=False):
        '''Execute ``op`` immediately if ``force`` is set, else defer it.

        :returns: True if the operation was executed, False if it was
            deferred
        :raises RuntimeError: if ``force`` is set while operations are
            already queued in ``self._ops``
        '''
        self.logger.debug(f'Executing op: {op}')
        if not force:
            # Defer by default
            return False

        if self._ops:
            raise RuntimeError(f'Forced exec with {len(self._ops)} ops')
        self._insert_op(op)
        self._exec_batch()
        return True

    def flush_ops(self, force=True):
        '''Move all operations from ``self.ops`` into one batch and run it.

        Does nothing unless ``force`` is true.
        '''
        if not force:
            return
        while self.ops:
            self._insert_op(self.ops.popleft())
        self._exec_batch()
141