1# Copyright (c) 2020 Vestas Wind Systems A/S
2#
3# SPDX-License-Identifier: Apache-2.0
4
5'''Runner for performing program download over CANopen (DSP 302-3).'''
6
7import argparse
8import os
9import time
10
11from runners.core import ZephyrBinaryRunner, RunnerCaps
12
13try:
14    import canopen
15    from progress.bar import Bar
16    MISSING_REQUIREMENTS = False
17except ImportError:
18    MISSING_REQUIREMENTS = True
19
20# Default Python-CAN context to use, see python-can documentation for details
21DEFAULT_CAN_CONTEXT = 'default'
22
23# Default program number
24DEFAULT_PROGRAM_NUMBER = 1
25
26# Program download buffer size in bytes
27PROGRAM_DOWNLOAD_BUFFER_SIZE = 1024
28
29# Program download chunk size in bytes
30PROGRAM_DOWNLOAD_CHUNK_SIZE = PROGRAM_DOWNLOAD_BUFFER_SIZE // 2
31
32# Default timeouts and retries
33DEFAULT_TIMEOUT = 10.0 # seconds
34DEFAULT_SDO_TIMEOUT = 1 # seconds
35DEFAULT_SDO_RETRIES = 1
36
37# Object dictionary indexes
38H1F50_PROGRAM_DATA = 0x1F50
39H1F51_PROGRAM_CTRL = 0x1F51
40H1F56_PROGRAM_SWID = 0x1F56
41H1F57_FLASH_STATUS = 0x1F57
42
43# Program control commands
44PROGRAM_CTRL_STOP = 0x00
45PROGRAM_CTRL_START = 0x01
46PROGRAM_CTRL_RESET = 0x02
47PROGRAM_CTRL_CLEAR = 0x03
48PROGRAM_CTRL_ZEPHYR_CONFIRM = 0x80
49
50class ToggleAction(argparse.Action):
51    '''Toggle argument parser'''
52    def __call__(self, parser, namespace, values, option_string=None):
53        setattr(namespace, self.dest, not option_string.startswith('--no-'))
54
55class CANopenBinaryRunner(ZephyrBinaryRunner):
56    '''Runner front-end for CANopen.'''
57    def __init__(self, cfg, dev_id, can_context=DEFAULT_CAN_CONTEXT,
58                 program_number=DEFAULT_PROGRAM_NUMBER, confirm=True,
59                 confirm_only=True, timeout=DEFAULT_TIMEOUT,
60                 sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT,
61                 block_transfer=False):
62        if MISSING_REQUIREMENTS:
63            raise RuntimeError('one or more Python dependencies were missing; '
64                               "see the getting started guide for details on "
65                               "how to fix")
66
67        super().__init__(cfg)
68        self.dev_id = dev_id # Only use for error checking in do_run()
69        self.bin_file = cfg.bin_file
70        self.confirm = confirm
71        self.confirm_only = confirm_only
72        self.timeout = timeout
73        self.downloader = CANopenProgramDownloader(logger=self.logger,
74                                                   node_id=dev_id,
75                                                   can_context=can_context,
76                                                   program_number=program_number,
77                                                   sdo_retries=sdo_retries,
78                                                   sdo_timeout=sdo_timeout,
79                                                   block_transfer=block_transfer)
80
81    @classmethod
82    def name(cls):
83        return 'canopen'
84
85    @classmethod
86    def capabilities(cls):
87        return RunnerCaps(commands={'flash'}, dev_id=True, flash_addr=False)
88
89    @classmethod
90    def dev_id_help(cls) -> str:
91        return 'CANopen Node ID.'
92
93    @classmethod
94    def do_add_parser(cls, parser):
95        # Optional:
96        parser.add_argument('--node-id', dest='dev_id',
97                            help=cls.dev_id_help())
98
99        parser.add_argument('--can-context', default=DEFAULT_CAN_CONTEXT,
100                            help=f'Python-CAN context to use (default: {DEFAULT_CAN_CONTEXT})')
101        parser.add_argument('--program-number', type=int, default=DEFAULT_PROGRAM_NUMBER,
102                            help=f'program number (default: {DEFAULT_PROGRAM_NUMBER})')
103        parser.add_argument('--confirm', '--no-confirm',
104                            dest='confirm', nargs=0,
105                            action=ToggleAction,
106                            help='confirm after starting? (default: yes)')
107        parser.add_argument('--confirm-only', default=False, action='store_true',
108                            help='confirm only, no program download (default: no)')
109        parser.add_argument('--timeout', type=float, default=DEFAULT_TIMEOUT,
110                            help=f'Timeout in seconds (default: {DEFAULT_TIMEOUT})')
111        parser.add_argument('--sdo-retries', type=int, default=DEFAULT_SDO_RETRIES,
112                            help=f'CANopen SDO request retries (default: {DEFAULT_SDO_RETRIES})')
113        parser.add_argument('--sdo-timeout', type=float, default=DEFAULT_SDO_TIMEOUT,
114                            help=f'''CANopen SDO response timeout in seconds
115                            (default: {DEFAULT_SDO_TIMEOUT})''')
116        parser.add_argument('--block-transfer', default=False, action='store_true',
117                            help='Use SDO block transfers (experimental, default: no)')
118
119        parser.set_defaults(confirm=True)
120
121    @classmethod
122    def do_create(cls, cfg, args):
123        return CANopenBinaryRunner(cfg, int(args.dev_id),
124                                   can_context=args.can_context,
125                                   program_number=args.program_number,
126                                   confirm=args.confirm,
127                                   confirm_only=args.confirm_only,
128                                   timeout=args.timeout,
129                                   sdo_retries=args.sdo_retries,
130                                   sdo_timeout=args.sdo_timeout,
131                                   block_transfer=args.block_transfer)
132
133    def do_run(self, command, **kwargs):
134        if not self.dev_id:
135            raise RuntimeError('Please specify a CANopen node ID with the '
136                               '-i/--dev-id or --node-id command-line switch.')
137        if command == 'flash':
138            self.flash(**kwargs)
139
140    def flash(self, **kwargs):
141        '''Download program to flash over CANopen'''
142        self.ensure_output('bin')
143        self.logger.info('Using Node ID %d, program number %d',
144                         self.downloader.node_id,
145                         self.downloader.program_number)
146
147        self.downloader.connect()
148        status = self.downloader.wait_for_flash_status_ok(self.timeout)
149        if status == 0:
150            self.downloader.swid()
151        else:
152            self.logger.warning('Flash status 0x{:02x}, '
153                                'skipping software identification'.format(status))
154
155        self.downloader.enter_pre_operational()
156
157        if self.confirm_only:
158            self.downloader.zephyr_confirm_program()
159            self.downloader.disconnect()
160            return
161
162        if self.bin_file is None:
163            raise ValueError('Cannot download program; bin_file is missing')
164
165        self.downloader.stop_program()
166        self.downloader.clear_program()
167        self.downloader.wait_for_flash_status_ok(self.timeout)
168        self.downloader.download(self.bin_file)
169
170        status = self.downloader.wait_for_flash_status_ok(self.timeout)
171        if status != 0:
172            raise ValueError('Program download failed: '
173                             'flash status 0x{:02x}'.format(status))
174
175        self.downloader.swid()
176        self.downloader.start_program()
177        self.downloader.wait_for_bootup(self.timeout)
178        self.downloader.swid()
179
180        if self.confirm:
181            self.downloader.enter_pre_operational()
182            self.downloader.zephyr_confirm_program()
183
184        self.downloader.disconnect()
185
186class CANopenProgramDownloader(object):
187    '''CANopen program downloader'''
188    def __init__(self, logger, node_id, can_context=DEFAULT_CAN_CONTEXT,
189                 program_number=DEFAULT_PROGRAM_NUMBER,
190                 sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT,
191                 block_transfer=False):
192        super(CANopenProgramDownloader, self).__init__()
193        self.logger = logger
194        self.node_id = node_id
195        self.can_context = can_context
196        self.program_number = program_number
197        self.network = canopen.Network()
198        self.node = self.network.add_node(self.node_id,
199                                          self.create_object_dictionary())
200        self.data_sdo = self.node.sdo[H1F50_PROGRAM_DATA][self.program_number]
201        self.ctrl_sdo = self.node.sdo[H1F51_PROGRAM_CTRL][self.program_number]
202        self.swid_sdo = self.node.sdo[H1F56_PROGRAM_SWID][self.program_number]
203        self.flash_sdo = self.node.sdo[H1F57_FLASH_STATUS][self.program_number]
204
205        self.node.sdo.MAX_RETRIES = sdo_retries
206        self.node.sdo.RESPONSE_TIMEOUT = sdo_timeout
207
208        self.block_transfer = block_transfer
209
210    def connect(self):
211        '''Connect to CAN network'''
212        try:
213            self.network.connect(context=self.can_context)
214        except:
215            raise ValueError('Unable to connect to CAN network')
216
217    def disconnect(self):
218        '''Disconnect from CAN network'''
219        self.network.disconnect()
220
221    def enter_pre_operational(self):
222        '''Enter pre-operational NMT state'''
223        self.logger.info("Entering pre-operational mode")
224        try:
225            self.node.nmt.state = 'PRE-OPERATIONAL'
226        except:
227            raise ValueError('Failed to enter pre-operational mode')
228
229    def _ctrl_program(self, cmd):
230        '''Write program control command to CANopen object dictionary (0x1f51)'''
231        try:
232            self.ctrl_sdo.raw = cmd
233        except:
234            raise ValueError('Unable to write control command 0x{:02x}'.format(cmd))
235
236    def stop_program(self):
237        '''Write stop control command to CANopen object dictionary (0x1f51)'''
238        self.logger.info('Stopping program')
239        self._ctrl_program(PROGRAM_CTRL_STOP)
240
241    def start_program(self):
242        '''Write start control command to CANopen object dictionary (0x1f51)'''
243        self.logger.info('Starting program')
244        self._ctrl_program(PROGRAM_CTRL_START)
245
246    def clear_program(self):
247        '''Write clear control command to CANopen object dictionary (0x1f51)'''
248        self.logger.info('Clearing program')
249        self._ctrl_program(PROGRAM_CTRL_CLEAR)
250
251    def zephyr_confirm_program(self):
252        '''Write confirm control command to CANopen object dictionary (0x1f51)'''
253        self.logger.info('Confirming program')
254        self._ctrl_program(PROGRAM_CTRL_ZEPHYR_CONFIRM)
255
256    def swid(self):
257        '''Read software identification from CANopen object dictionary (0x1f56)'''
258        try:
259            swid = self.swid_sdo.raw
260        except:
261            raise ValueError('Failed to read software identification')
262        self.logger.info('Program software identification: 0x{:08x}'.format(swid))
263        return swid
264
265    def flash_status(self):
266        '''Read flash status identification'''
267        try:
268            status = self.flash_sdo.raw
269        except:
270            raise ValueError('Failed to read flash status identification')
271        return status
272
273    def download(self, bin_file):
274        '''Download program to CANopen object dictionary (0x1f50)'''
275        self.logger.info('Downloading program: %s', bin_file)
276        try:
277            size = os.path.getsize(bin_file)
278            infile = open(bin_file, 'rb')
279            outfile = self.data_sdo.open('wb', buffering=PROGRAM_DOWNLOAD_BUFFER_SIZE,
280                                         size=size, block_transfer=self.block_transfer)
281
282            progress = Bar('%(percent)d%%', max=size, suffix='%(index)d/%(max)dB')
283            while True:
284                chunk = infile.read(PROGRAM_DOWNLOAD_CHUNK_SIZE)
285                if not chunk:
286                    break
287                outfile.write(chunk)
288                progress.next(n=len(chunk))
289        except:
290            raise ValueError('Failed to download program')
291        finally:
292            progress.finish()
293            infile.close()
294            outfile.close()
295
296    def wait_for_bootup(self, timeout=DEFAULT_TIMEOUT):
297        '''Wait for boot-up message reception'''
298        self.logger.info('Waiting for boot-up message...')
299        try:
300            self.node.nmt.wait_for_bootup(timeout=timeout)
301        except:
302            raise ValueError('Timeout waiting for boot-up message')
303
304    def wait_for_flash_status_ok(self, timeout=DEFAULT_TIMEOUT):
305        '''Wait for flash status ok'''
306        self.logger.info('Waiting for flash status ok')
307        end_time = time.time() + timeout
308        while True:
309            now = time.time()
310            status = self.flash_status()
311            if status == 0:
312                break
313
314            if now > end_time:
315                return status
316
317        return status
318
319    @staticmethod
320    def create_object_dictionary():
321        '''Create a synthetic CANopen object dictionary for program download'''
322        objdict = canopen.objectdictionary.ObjectDictionary()
323
324        array = canopen.objectdictionary.Array('Program data', 0x1f50)
325        member = canopen.objectdictionary.Variable('', 0x1f50, subindex=1)
326        member.data_type = canopen.objectdictionary.DOMAIN
327        array.add_member(member)
328        objdict.add_object(array)
329
330        array = canopen.objectdictionary.Array('Program control', 0x1f51)
331        member = canopen.objectdictionary.Variable('', 0x1f51, subindex=1)
332        member.data_type = canopen.objectdictionary.UNSIGNED8
333        array.add_member(member)
334        objdict.add_object(array)
335
336        array = canopen.objectdictionary.Array('Program software ID', 0x1f56)
337        member = canopen.objectdictionary.Variable('', 0x1f56, subindex=1)
338        member.data_type = canopen.objectdictionary.UNSIGNED32
339        array.add_member(member)
340        objdict.add_object(array)
341
342        array = canopen.objectdictionary.Array('Flash error ID', 0x1f57)
343        member = canopen.objectdictionary.Variable('', 0x1f57, subindex=1)
344        member.data_type = canopen.objectdictionary.UNSIGNED32
345        array.add_member(member)
346        objdict.add_object(array)
347
348        return objdict
349