1# Copyright (c) 2020 Vestas Wind Systems A/S
2#
3# SPDX-License-Identifier: Apache-2.0
4
5'''Runner for performing program download over CANopen (DSP 302-3).'''
6
7import argparse
8import os
9import time
10
11from runners.core import ZephyrBinaryRunner, RunnerCaps
12
13try:
14    import canopen
15    from progress.bar import Bar
16    MISSING_REQUIREMENTS = False
17except ImportError:
18    MISSING_REQUIREMENTS = True
19
# Default Python-CAN context to use, see python-can documentation for details.
# Passed as the ``context`` argument when connecting to the CAN network.
DEFAULT_CAN_CONTEXT = 'default'

# Default program number. Used as the sub-index when accessing the program
# objects (0x1F50, 0x1F51, 0x1F56 and 0x1F57) of the node.
DEFAULT_PROGRAM_NUMBER = 1

# Default timeouts and retries
# DEFAULT_TIMEOUT bounds the waits for flash status OK and for the boot-up
# message; the SDO values are applied to the node's SDO client
# (RESPONSE_TIMEOUT and MAX_RETRIES respectively).
DEFAULT_TIMEOUT = 10.0 # seconds
DEFAULT_SDO_TIMEOUT = 0.3 # seconds
DEFAULT_SDO_RETRIES = 1

# Object dictionary indexes
# Program data (written during download), program control (command writes),
# program software identification (read) and flash status (read/polled).
H1F50_PROGRAM_DATA = 0x1F50
H1F51_PROGRAM_CTRL = 0x1F51
H1F56_PROGRAM_SWID = 0x1F56
H1F57_FLASH_STATUS = 0x1F57

# Program control commands
# Values written to the program control object (0x1F51).
# NOTE: PROGRAM_CTRL_RESET is defined for completeness but is not used by any
# code in this file.
PROGRAM_CTRL_STOP = 0x00
PROGRAM_CTRL_START = 0x01
PROGRAM_CTRL_RESET = 0x02
PROGRAM_CTRL_CLEAR = 0x03
PROGRAM_CTRL_ZEPHYR_CONFIRM = 0x80
43
class ToggleAction(argparse.Action):
    '''Toggle argument parser

    Argparse action for paired ``--foo`` / ``--no-foo`` flags sharing one
    destination: the destination is set to False when the flag used on the
    command line starts with ``--no-`` and to True otherwise.
    '''
    def __call__(self, parser, namespace, values, option_string=None):
        '''Store the toggled boolean in ``namespace`` under ``self.dest``.

        ``parser`` and ``values`` are unused. ``option_string`` is the flag
        that triggered the action; it may be None (its declared default), in
        which case the toggle is treated as enabled instead of raising
        AttributeError on ``None.startswith``.
        '''
        negated = option_string is not None and option_string.startswith('--no-')
        setattr(namespace, self.dest, not negated)
48
class CANopenBinaryRunner(ZephyrBinaryRunner):
    '''Runner front-end for CANopen.

    Implements the ``flash`` command by driving a CANopenProgramDownloader
    through the program download sequence (stop, clear, download, start,
    optionally confirm).
    '''
    def __init__(self, cfg, node_id, can_context=DEFAULT_CAN_CONTEXT,
                 program_number=DEFAULT_PROGRAM_NUMBER, confirm=True,
                 confirm_only=True, timeout=DEFAULT_TIMEOUT,
                 sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT):
        '''Create the runner and its program downloader.

        cfg: runner configuration; ``cfg.bin_file`` is the image to download.
        node_id: CANopen node ID of the target.
        can_context: python-can context used when connecting.
        program_number: sub-index of the program objects to operate on.
        confirm: confirm the program after it has been started.
        confirm_only: only confirm the running program, skip the download.
        timeout: timeout in seconds for flash status and boot-up waits.
        sdo_retries / sdo_timeout: SDO client retry count and response
            timeout in seconds.

        Raises RuntimeError if the required Python packages are missing.

        NOTE(review): ``confirm_only`` defaults to True here while the
        ``--confirm-only`` command line option defaults to False; do_create()
        always passes the value explicitly. Default kept for compatibility.
        '''
        if MISSING_REQUIREMENTS:
            raise RuntimeError('one or more Python dependencies were missing; '
                               "see the getting started guide for details on "
                               "how to fix")

        super().__init__(cfg)
        self.bin_file = cfg.bin_file
        self.confirm = confirm
        self.confirm_only = confirm_only
        self.timeout = timeout
        self.downloader = CANopenProgramDownloader(logger=self.logger,
                                                   node_id=node_id,
                                                   can_context=can_context,
                                                   program_number=program_number,
                                                   sdo_retries=sdo_retries,
                                                   sdo_timeout=sdo_timeout)

    @classmethod
    def name(cls):
        '''Return the name of this runner.'''
        return 'canopen'

    @classmethod
    def capabilities(cls):
        '''Return the runner capabilities: only the 'flash' command.'''
        return RunnerCaps(commands={'flash'}, flash_addr=False)

    @classmethod
    def do_add_parser(cls, parser):
        '''Register the runner specific command line options on ``parser``.'''
        # Required:
        parser.add_argument('--node-id', required=True, help='Node ID')

        # Optional:
        parser.add_argument('--can-context', default=DEFAULT_CAN_CONTEXT,
                            help=f'Python-CAN context to use (default: {DEFAULT_CAN_CONTEXT})')
        parser.add_argument('--program-number', type=int, default=DEFAULT_PROGRAM_NUMBER,
                            help=f'program number (default: {DEFAULT_PROGRAM_NUMBER})')
        # --confirm and --no-confirm share one destination; ToggleAction
        # derives the boolean from the flag actually used.
        parser.add_argument('--confirm', '--no-confirm',
                            dest='confirm', nargs=0,
                            action=ToggleAction,
                            help='confirm after starting? (default: yes)')
        parser.add_argument('--confirm-only', default=False, action='store_true',
                            help='confirm only, no program download (default: no)')
        parser.add_argument('--timeout', type=float, default=DEFAULT_TIMEOUT,
                            help=f'Timeout in seconds (default: {DEFAULT_TIMEOUT})')
        parser.add_argument('--sdo-retries', type=int, default=DEFAULT_SDO_RETRIES,
                            help=f'CANopen SDO request retries (default: {DEFAULT_SDO_RETRIES})')
        parser.add_argument('--sdo-timeout', type=float, default=DEFAULT_SDO_TIMEOUT,
                            help=f'''CANopen SDO response timeout in seconds
                            (default: {DEFAULT_SDO_TIMEOUT})''')

        parser.set_defaults(confirm=True)

    @classmethod
    def do_create(cls, cfg, args):
        '''Create a runner instance from parsed command line arguments.'''
        return CANopenBinaryRunner(cfg, int(args.node_id),
                                   can_context=args.can_context,
                                   program_number=args.program_number,
                                   confirm=args.confirm,
                                   confirm_only=args.confirm_only,
                                   timeout=args.timeout,
                                   sdo_retries=args.sdo_retries,
                                   sdo_timeout=args.sdo_timeout)

    def do_run(self, command, **kwargs):
        '''Run ``command``; only 'flash' is acted upon.'''
        if command == 'flash':
            self.flash(**kwargs)

    def flash(self, **kwargs):
        '''Download program to flash over CANopen

        With ``confirm_only`` set, only the confirm command is sent. Otherwise
        the program is stopped, cleared, downloaded and started, and finally
        confirmed if ``confirm`` is set.

        Raises ValueError if ``bin_file`` is missing or if the flash status is
        non-zero after the download; errors raised by the downloader
        propagate. The CAN network is disconnected in all cases once the
        connection has been established.
        '''
        self.ensure_output('bin')
        self.logger.info('Using Node ID %d, program number %d',
                         self.downloader.node_id,
                         self.downloader.program_number)

        self.downloader.connect()
        # Everything after a successful connect is guarded so that the CAN
        # network is released even when one of the steps below fails.
        try:
            status = self.downloader.wait_for_flash_status_ok(self.timeout)
            if status == 0:
                self.downloader.swid()
            else:
                self.logger.warning('Flash status 0x%02x, '
                                    'skipping software identification', status)

            self.downloader.enter_pre_operational()

            if self.confirm_only:
                self.downloader.zephyr_confirm_program()
                return

            if self.bin_file is None:
                raise ValueError('Cannot download program; bin_file is missing')

            self.downloader.stop_program()
            self.downloader.clear_program()
            # Wait for the clear to finish; the resulting status is not
            # evaluated here, only the status after the download is.
            self.downloader.wait_for_flash_status_ok(self.timeout)
            self.downloader.download(self.bin_file)

            status = self.downloader.wait_for_flash_status_ok(self.timeout)
            if status != 0:
                raise ValueError('Program download failed: '
                                 'flash status 0x{:02x}'.format(status))

            # Log the software identification before and after starting the
            # newly downloaded program.
            self.downloader.swid()
            self.downloader.start_program()
            self.downloader.wait_for_bootup(self.timeout)
            self.downloader.swid()

            if self.confirm:
                self.downloader.enter_pre_operational()
                self.downloader.zephyr_confirm_program()
        finally:
            self.downloader.disconnect()
166
class CANopenProgramDownloader:
    '''CANopen program downloader

    Wraps a canopen network with a single remote node and exposes the
    operations needed for program download: program control commands
    (0x1F51), program data download (0x1F50), software identification
    (0x1F56) and flash status (0x1F57). Failures are reported as ValueError
    with the underlying exception chained as the cause.
    '''
    def __init__(self, logger, node_id, can_context=DEFAULT_CAN_CONTEXT,
                 program_number=DEFAULT_PROGRAM_NUMBER,
                 sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT):
        '''Set up the network, node and SDO accessors (does not connect).

        logger: logger used for progress messages.
        node_id: CANopen node ID of the target.
        can_context: python-can context used by connect().
        program_number: sub-index used for all program objects.
        sdo_retries / sdo_timeout: SDO client retry count and response
            timeout in seconds.
        '''
        super().__init__()
        self.logger = logger
        self.node_id = node_id
        self.can_context = can_context
        self.program_number = program_number
        self.network = canopen.Network()
        self.node = self.network.add_node(self.node_id,
                                          self.create_object_dictionary())
        self.data_sdo = self.node.sdo[H1F50_PROGRAM_DATA][self.program_number]
        self.ctrl_sdo = self.node.sdo[H1F51_PROGRAM_CTRL][self.program_number]
        self.swid_sdo = self.node.sdo[H1F56_PROGRAM_SWID][self.program_number]
        self.flash_sdo = self.node.sdo[H1F57_FLASH_STATUS][self.program_number]

        self.node.sdo.MAX_RETRIES = sdo_retries
        self.node.sdo.RESPONSE_TIMEOUT = sdo_timeout

    def connect(self):
        '''Connect to CAN network

        Raises ValueError if the connection cannot be established.
        '''
        try:
            self.network.connect(context=self.can_context)
        except Exception as err:
            raise ValueError('Unable to connect to CAN network') from err

    def disconnect(self):
        '''Disconnect from CAN network'''
        self.network.disconnect()

    def enter_pre_operational(self):
        '''Enter pre-operational NMT state

        Raises ValueError if the state change request fails.
        '''
        self.logger.info("Entering pre-operational mode")
        try:
            self.node.nmt.state = 'PRE-OPERATIONAL'
        except Exception as err:
            raise ValueError('Failed to enter pre-operational mode') from err

    def _ctrl_program(self, cmd):
        '''Write program control command to CANopen object dictionary (0x1f51)

        Raises ValueError if the write fails.
        '''
        try:
            self.ctrl_sdo.raw = cmd
        except Exception as err:
            raise ValueError('Unable to write control command 0x{:02x}'.format(cmd)) from err

    def stop_program(self):
        '''Write stop control command to CANopen object dictionary (0x1f51)'''
        self.logger.info('Stopping program')
        self._ctrl_program(PROGRAM_CTRL_STOP)

    def start_program(self):
        '''Write start control command to CANopen object dictionary (0x1f51)'''
        self.logger.info('Starting program')
        self._ctrl_program(PROGRAM_CTRL_START)

    def clear_program(self):
        '''Write clear control command to CANopen object dictionary (0x1f51)'''
        self.logger.info('Clearing program')
        self._ctrl_program(PROGRAM_CTRL_CLEAR)

    def zephyr_confirm_program(self):
        '''Write confirm control command to CANopen object dictionary (0x1f51)'''
        self.logger.info('Confirming program')
        self._ctrl_program(PROGRAM_CTRL_ZEPHYR_CONFIRM)

    def swid(self):
        '''Read software identification from CANopen object dictionary (0x1f56)

        Logs and returns the value. Raises ValueError if the read fails.
        '''
        try:
            swid = self.swid_sdo.raw
        except Exception as err:
            raise ValueError('Failed to read software identification') from err
        self.logger.info('Program software identification: 0x%08x', swid)
        return swid

    def flash_status(self):
        '''Read flash status identification

        Returns the raw value of object 0x1f57. Raises ValueError if the read
        fails.
        '''
        try:
            return self.flash_sdo.raw
        except Exception as err:
            raise ValueError('Failed to read flash status identification') from err

    def download(self, bin_file):
        '''Download program to CANopen object dictionary (0x1f50)

        Streams ``bin_file`` to the program data object in 1024 byte chunks
        while showing a progress bar. Raises ValueError if the file cannot be
        read or the transfer (including closing the SDO stream) fails.
        '''
        self.logger.info('Downloading program: %s', bin_file)
        # Created lazily; stays None if the download fails before the
        # progress bar exists so cleanup never touches an unbound name.
        progress = None
        try:
            size = os.path.getsize(bin_file)
            with open(bin_file, 'rb') as infile:
                outfile = self.data_sdo.open('wb', size=size)
                try:
                    progress = Bar('%(percent)d%%', max=size,
                                   suffix='%(index)d/%(max)dB')
                    for chunk in iter(lambda: infile.read(1024), b''):
                        outfile.write(chunk)
                        progress.next(n=len(chunk))
                finally:
                    # Closed inside the outer try so that a failure while
                    # finishing the transfer is reported as ValueError too.
                    outfile.close()
        except Exception as err:
            raise ValueError('Failed to download program') from err
        finally:
            if progress is not None:
                progress.finish()

    def wait_for_bootup(self, timeout=DEFAULT_TIMEOUT):
        '''Wait for boot-up message reception

        Raises ValueError if no boot-up message is seen within ``timeout``
        seconds (or the wait fails for another reason).
        '''
        self.logger.info('Waiting for boot-up message...')
        try:
            self.node.nmt.wait_for_bootup(timeout=timeout)
        except Exception as err:
            raise ValueError('Timeout waiting for boot-up message') from err

    def wait_for_flash_status_ok(self, timeout=DEFAULT_TIMEOUT):
        '''Wait for flash status ok

        Polls the flash status until it reads 0 or ``timeout`` seconds have
        elapsed. The status is always read at least once. Returns the last
        status read (0 on success); read failures raise ValueError.
        '''
        self.logger.info('Waiting for flash status ok')
        # Monotonic clock: the deadline must not move with wall-clock changes.
        deadline = time.monotonic() + timeout
        while True:
            # Sample the clock before the read so the read that follows an
            # expired deadline is still taken into account.
            now = time.monotonic()
            status = self.flash_status()
            if status == 0 or now > deadline:
                return status

    @staticmethod
    def create_object_dictionary():
        '''Create a synthetic CANopen object dictionary for program download

        Each program object is an array with a single member at sub-index 1
        carrying the data type used for SDO encoding.
        '''
        od = canopen.objectdictionary
        # (array name, index, member data type)
        # NOTE: 'Program sofware ID' is kept as spelled in the existing object
        # dictionary; the name is a runtime string.
        entries = (
            ('Program data', H1F50_PROGRAM_DATA, od.DOMAIN),
            ('Program control', H1F51_PROGRAM_CTRL, od.UNSIGNED8),
            ('Program sofware ID', H1F56_PROGRAM_SWID, od.UNSIGNED32),
            ('Flash error ID', H1F57_FLASH_STATUS, od.UNSIGNED32),
        )

        objdict = od.ObjectDictionary()
        for name, index, data_type in entries:
            array = od.Array(name, index)
            member = od.Variable('', index, subindex=1)
            member.data_type = data_type
            array.add_member(member)
            objdict.add_object(array)

        return objdict
326