1# Copyright (c) 2020 Vestas Wind Systems A/S
2#
3# SPDX-License-Identifier: Apache-2.0
4
5'''Runner for performing program download over CANopen (DSP 302-3).'''
6
7import argparse
8import os
9import time
10
11from runners.core import RunnerCaps, ZephyrBinaryRunner
12
13try:
14    import canopen
15    from tqdm import tqdm
16    MISSING_REQUIREMENTS = False
17except ImportError:
18    MISSING_REQUIREMENTS = True
19
20# Default Python-CAN context to use, see python-can documentation for details
21DEFAULT_CAN_CONTEXT = 'default'
22
23# Default program number
24DEFAULT_PROGRAM_NUMBER = 1
25
26# Program download buffer size in bytes
27DEFAULT_PROGRAM_DOWNLOAD_BUFFER_SIZE = 1024
28
29# Default timeouts and retries
30DEFAULT_TIMEOUT = 10.0 # seconds
31DEFAULT_SDO_TIMEOUT = 1 # seconds
32DEFAULT_SDO_RETRIES = 1
33
34# Object dictionary indexes
35H1F50_PROGRAM_DATA = 0x1F50
36H1F51_PROGRAM_CTRL = 0x1F51
37H1F56_PROGRAM_SWID = 0x1F56
38H1F57_FLASH_STATUS = 0x1F57
39
40# Program control commands
41PROGRAM_CTRL_STOP = 0x00
42PROGRAM_CTRL_START = 0x01
43PROGRAM_CTRL_RESET = 0x02
44PROGRAM_CTRL_CLEAR = 0x03
45PROGRAM_CTRL_ZEPHYR_CONFIRM = 0x80
46
47class CANopenBinaryRunner(ZephyrBinaryRunner):
48    '''Runner front-end for CANopen.'''
49    def __init__(self, cfg, dev_id, can_context=DEFAULT_CAN_CONTEXT,
50                 program_number=DEFAULT_PROGRAM_NUMBER, confirm=True,
51                 confirm_only=True, timeout=DEFAULT_TIMEOUT,
52                 sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT,
53                 download_buffer_size=DEFAULT_PROGRAM_DOWNLOAD_BUFFER_SIZE, block_transfer=False):
54        if MISSING_REQUIREMENTS:
55            raise RuntimeError('one or more Python dependencies were missing; '
56                               "see the getting started guide for details on "
57                               "how to fix")
58
59        super().__init__(cfg)
60        self.dev_id = dev_id # Only use for error checking in do_run()
61        self.bin_file = cfg.bin_file
62        self.confirm = confirm
63        self.confirm_only = confirm_only
64        self.timeout = timeout
65        self.downloader = CANopenProgramDownloader(logger=self.logger,
66                                                   node_id=dev_id,
67                                                   can_context=can_context,
68                                                   program_number=program_number,
69                                                   sdo_retries=sdo_retries,
70                                                   sdo_timeout=sdo_timeout,
71                                                   download_buffer_size=download_buffer_size,
72                                                   block_transfer=block_transfer,
73                                                   )
74
75    @classmethod
76    def name(cls):
77        return 'canopen'
78
79    @classmethod
80    def capabilities(cls):
81        return RunnerCaps(commands={'flash'}, dev_id=True, flash_addr=False)
82
83    @classmethod
84    def dev_id_help(cls) -> str:
85        return 'CANopen Node ID.'
86
87    @classmethod
88    def do_add_parser(cls, parser):
89        # Optional:
90        parser.add_argument('--node-id', dest='dev_id',
91                            help=cls.dev_id_help())
92
93        parser.add_argument('--can-context', default=DEFAULT_CAN_CONTEXT,
94                            help=f'Python-CAN context to use (default: {DEFAULT_CAN_CONTEXT})')
95        parser.add_argument('--program-number', type=int, default=DEFAULT_PROGRAM_NUMBER,
96                            help=f'program number (default: {DEFAULT_PROGRAM_NUMBER})')
97        parser.add_argument('--confirm', action=argparse.BooleanOptionalAction,
98                            help='confirm after starting? (default: yes)')
99        parser.add_argument('--confirm-only', default=False, action='store_true',
100                            help='confirm only, no program download (default: no)')
101        parser.add_argument('--timeout', type=float, default=DEFAULT_TIMEOUT,
102                            help=f'Timeout in seconds (default: {DEFAULT_TIMEOUT})')
103        parser.add_argument('--sdo-retries', type=int, default=DEFAULT_SDO_RETRIES,
104                            help=f'CANopen SDO request retries (default: {DEFAULT_SDO_RETRIES})')
105        parser.add_argument('--sdo-timeout', type=float, default=DEFAULT_SDO_TIMEOUT,
106                            help=f'''CANopen SDO response timeout in seconds
107                            (default: {DEFAULT_SDO_TIMEOUT})''')
108        parser.add_argument('--download-buffer-size', type=int,
109                            default=DEFAULT_PROGRAM_DOWNLOAD_BUFFER_SIZE,
110                            help=f'''Program download buffer size in bytes
111                            (default: {DEFAULT_PROGRAM_DOWNLOAD_BUFFER_SIZE})''')
112        parser.add_argument('--block-transfer', default=False, action='store_true',
113                            help='Use SDO block transfers (experimental, default: no)')
114
115        parser.set_defaults(confirm=True)
116
117    @classmethod
118    def do_create(cls, cfg, args):
119        return CANopenBinaryRunner(cfg, int(args.dev_id),
120                                   can_context=args.can_context,
121                                   program_number=args.program_number,
122                                   confirm=args.confirm,
123                                   confirm_only=args.confirm_only,
124                                   timeout=args.timeout,
125                                   sdo_retries=args.sdo_retries,
126                                   sdo_timeout=args.sdo_timeout,
127                                   download_buffer_size=args.download_buffer_size,
128                                   block_transfer=args.block_transfer)
129
130    def do_run(self, command, **kwargs):
131        if not self.dev_id:
132            raise RuntimeError('Please specify a CANopen node ID with the '
133                               '-i/--dev-id or --node-id command-line switch.')
134        if command == 'flash':
135            self.flash(**kwargs)
136
137    def flash(self, **kwargs):
138        '''Download program to flash over CANopen'''
139        self.ensure_output('bin')
140        self.logger.info('Using Node ID %d, program number %d',
141                         self.downloader.node_id,
142                         self.downloader.program_number)
143
144        self.downloader.connect()
145        status = self.downloader.wait_for_flash_status_ok(self.timeout)
146        if status == 0:
147            self.downloader.swid()
148        else:
149            self.logger.warning(f'Flash status 0x{status:02x}, '
150                                'skipping software identification')
151
152        self.downloader.enter_pre_operational()
153
154        if self.confirm_only:
155            self.downloader.zephyr_confirm_program()
156            self.downloader.disconnect()
157            return
158
159        if self.bin_file is None:
160            raise ValueError('Cannot download program; bin_file is missing')
161
162        self.downloader.stop_program()
163        self.downloader.clear_program()
164        self.downloader.wait_for_flash_status_ok(self.timeout)
165        self.downloader.download(self.bin_file)
166
167        status = self.downloader.wait_for_flash_status_ok(self.timeout)
168        if status != 0:
169            raise ValueError('Program download failed: '
170                             f'flash status 0x{status:02x}')
171
172        self.downloader.swid()
173        self.downloader.start_program()
174        self.downloader.wait_for_bootup(self.timeout)
175        self.downloader.swid()
176
177        if self.confirm:
178            self.downloader.enter_pre_operational()
179            self.downloader.zephyr_confirm_program()
180
181        self.downloader.disconnect()
182
183class CANopenProgramDownloader:
184    '''CANopen program downloader'''
185    def __init__(self, logger, node_id, can_context=DEFAULT_CAN_CONTEXT,
186                 program_number=DEFAULT_PROGRAM_NUMBER,
187                 sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT,
188                 download_buffer_size=DEFAULT_PROGRAM_DOWNLOAD_BUFFER_SIZE, block_transfer=False):
189        super().__init__()
190        self.logger = logger
191        self.node_id = node_id
192        self.can_context = can_context
193        self.program_number = program_number
194        self.network = canopen.Network()
195        self.node = self.network.add_node(self.node_id,
196                                          self.create_object_dictionary())
197        self.data_sdo = self.node.sdo[H1F50_PROGRAM_DATA][self.program_number]
198        self.ctrl_sdo = self.node.sdo[H1F51_PROGRAM_CTRL][self.program_number]
199        self.swid_sdo = self.node.sdo[H1F56_PROGRAM_SWID][self.program_number]
200        self.flash_sdo = self.node.sdo[H1F57_FLASH_STATUS][self.program_number]
201        self.download_buffer_size = download_buffer_size
202
203        self.node.sdo.MAX_RETRIES = sdo_retries
204        self.node.sdo.RESPONSE_TIMEOUT = sdo_timeout
205
206        self.block_transfer = block_transfer
207
208    def connect(self):
209        '''Connect to CAN network'''
210        try:
211            self.network.connect(context=self.can_context)
212        except Exception as err:
213            raise ValueError('Unable to connect to CAN network') from err
214
215    def disconnect(self):
216        '''Disconnect from CAN network'''
217        self.network.disconnect()
218
219    def enter_pre_operational(self):
220        '''Enter pre-operational NMT state'''
221        self.logger.info("Entering pre-operational mode")
222        try:
223            self.node.nmt.state = 'PRE-OPERATIONAL'
224        except Exception as err:
225            raise ValueError('Failed to enter pre-operational mode') from err
226
227    def _ctrl_program(self, cmd):
228        '''Write program control command to CANopen object dictionary (0x1f51)'''
229        try:
230            self.ctrl_sdo.raw = cmd
231        except Exception as err:
232            raise ValueError(f'Unable to write control command 0x{cmd:02x}') from err
233
234    def stop_program(self):
235        '''Write stop control command to CANopen object dictionary (0x1f51)'''
236        self.logger.info('Stopping program')
237        self._ctrl_program(PROGRAM_CTRL_STOP)
238
239    def start_program(self):
240        '''Write start control command to CANopen object dictionary (0x1f51)'''
241        self.logger.info('Starting program')
242        self._ctrl_program(PROGRAM_CTRL_START)
243
244    def clear_program(self):
245        '''Write clear control command to CANopen object dictionary (0x1f51)'''
246        self.logger.info('Clearing program')
247        self._ctrl_program(PROGRAM_CTRL_CLEAR)
248
249    def zephyr_confirm_program(self):
250        '''Write confirm control command to CANopen object dictionary (0x1f51)'''
251        self.logger.info('Confirming program')
252        self._ctrl_program(PROGRAM_CTRL_ZEPHYR_CONFIRM)
253
254    def swid(self):
255        '''Read software identification from CANopen object dictionary (0x1f56)'''
256        try:
257            swid = self.swid_sdo.raw
258        except Exception as err:
259            raise ValueError('Failed to read software identification') from err
260        self.logger.info(f'Program software identification: 0x{swid:08x}')
261        return swid
262
263    def flash_status(self):
264        '''Read flash status identification'''
265        try:
266            status = self.flash_sdo.raw
267        except Exception as err:
268            raise ValueError('Failed to read flash status identification') from err
269        return status
270
271    def download(self, bin_file):
272        '''Download program to CANopen object dictionary (0x1f50)'''
273        self.logger.info('Downloading program: %s', bin_file)
274        try:
275            size = os.path.getsize(bin_file)
276            infile = open(bin_file, 'rb')  # noqa: SIM115
277            outfile = self.data_sdo.open('wb', buffering=self.download_buffer_size,
278                                         size=size, block_transfer=self.block_transfer)
279
280            progress = tqdm(total=size, unit='B', unit_scale=True)
281            while True:
282                chunk = infile.read(self.download_buffer_size // 2)
283                if not chunk:
284                    break
285                outfile.write(chunk)
286                progress.update(len(chunk))
287        except Exception as err:
288            raise ValueError('Failed to download program') from err
289        finally:
290            progress.close()
291            infile.close()
292            outfile.close()
293
294    def wait_for_bootup(self, timeout=DEFAULT_TIMEOUT):
295        '''Wait for boot-up message reception'''
296        self.logger.info('Waiting for boot-up message...')
297        try:
298            self.node.nmt.wait_for_bootup(timeout=timeout)
299        except Exception as err:
300            raise ValueError('Timeout waiting for boot-up message') from err
301
302    def wait_for_flash_status_ok(self, timeout=DEFAULT_TIMEOUT):
303        '''Wait for flash status ok'''
304        self.logger.info('Waiting for flash status ok')
305        end_time = time.time() + timeout
306        while True:
307            now = time.time()
308            status = self.flash_status()
309            if status == 0:
310                break
311
312            if now > end_time:
313                return status
314
315        return status
316
317    @staticmethod
318    def create_object_dictionary():
319        '''Create a synthetic CANopen object dictionary for program download'''
320        objdict = canopen.objectdictionary.ObjectDictionary()
321
322        array = canopen.objectdictionary.Array('Program data', 0x1f50)
323        member = canopen.objectdictionary.Variable('', 0x1f50, subindex=1)
324        member.data_type = canopen.objectdictionary.DOMAIN
325        array.add_member(member)
326        objdict.add_object(array)
327
328        array = canopen.objectdictionary.Array('Program control', 0x1f51)
329        member = canopen.objectdictionary.Variable('', 0x1f51, subindex=1)
330        member.data_type = canopen.objectdictionary.UNSIGNED8
331        array.add_member(member)
332        objdict.add_object(array)
333
334        array = canopen.objectdictionary.Array('Program software ID', 0x1f56)
335        member = canopen.objectdictionary.Variable('', 0x1f56, subindex=1)
336        member.data_type = canopen.objectdictionary.UNSIGNED32
337        array.add_member(member)
338        objdict.add_object(array)
339
340        array = canopen.objectdictionary.Array('Flash error ID', 0x1f57)
341        member = canopen.objectdictionary.Variable('', 0x1f57, subindex=1)
342        member.data_type = canopen.objectdictionary.UNSIGNED32
343        array.add_member(member)
344        objdict.add_object(array)
345
346        return objdict
347