# Copyright (c) 2020 Vestas Wind Systems A/S
#
# SPDX-License-Identifier: Apache-2.0

'''Runner for performing program download over CANopen (DSP 302-3).'''

import argparse
import contextlib
import os
import time

from runners.core import RunnerCaps, ZephyrBinaryRunner

try:
    import canopen
    from progress.bar import Bar
    MISSING_REQUIREMENTS = False
except ImportError:
    MISSING_REQUIREMENTS = True

# Default Python-CAN context to use, see python-can documentation for details
DEFAULT_CAN_CONTEXT = 'default'

# Default program number
DEFAULT_PROGRAM_NUMBER = 1

# Program download buffer size in bytes
DEFAULT_PROGRAM_DOWNLOAD_BUFFER_SIZE = 1024

# Default timeouts and retries
DEFAULT_TIMEOUT = 10.0 # seconds
DEFAULT_SDO_TIMEOUT = 1 # seconds
DEFAULT_SDO_RETRIES = 1

# Object dictionary indexes
H1F50_PROGRAM_DATA = 0x1F50
H1F51_PROGRAM_CTRL = 0x1F51
H1F56_PROGRAM_SWID = 0x1F56
H1F57_FLASH_STATUS = 0x1F57

# Program control commands
PROGRAM_CTRL_STOP = 0x00
PROGRAM_CTRL_START = 0x01
PROGRAM_CTRL_RESET = 0x02
PROGRAM_CTRL_CLEAR = 0x03
PROGRAM_CTRL_ZEPHYR_CONFIRM = 0x80

# Error raised when no (or a zero) CANopen node ID was supplied
_MISSING_NODE_ID_MESSAGE = ('Please specify a CANopen node ID with the '
                            '-i/--dev-id or --node-id command-line switch.')

class ToggleAction(argparse.Action):
    '''Toggle argument parser

    Stores True in the destination unless the option string that triggered
    the action starts with ``--no-``, in which case False is stored.
    '''
    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, not option_string.startswith('--no-'))

class CANopenBinaryRunner(ZephyrBinaryRunner):
    '''Runner front-end for CANopen.'''
    # NOTE(review): confirm_only defaults to True here while the
    # --confirm-only command-line option defaults to False. do_create()
    # always passes the parsed value explicitly, so the default below only
    # matters for direct instantiation. Left unchanged to keep the interface.
    def __init__(self, cfg, dev_id, can_context=DEFAULT_CAN_CONTEXT,
                 program_number=DEFAULT_PROGRAM_NUMBER, confirm=True,
                 confirm_only=True, timeout=DEFAULT_TIMEOUT,
                 sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT,
                 download_buffer_size=DEFAULT_PROGRAM_DOWNLOAD_BUFFER_SIZE, block_transfer=False):
        '''Create the runner and its CANopenProgramDownloader.

        Raises RuntimeError if the optional Python dependencies imported at
        the top of this module could not be imported.
        '''
        if MISSING_REQUIREMENTS:
            raise RuntimeError('one or more Python dependencies were missing; '
                               "see the getting started guide for details on "
                               "how to fix")

        super().__init__(cfg)
        self.dev_id = dev_id # Only use for error checking in do_run()
        self.bin_file = cfg.bin_file
        self.confirm = confirm
        self.confirm_only = confirm_only
        self.timeout = timeout
        self.downloader = CANopenProgramDownloader(logger=self.logger,
                                                   node_id=dev_id,
                                                   can_context=can_context,
                                                   program_number=program_number,
                                                   sdo_retries=sdo_retries,
                                                   sdo_timeout=sdo_timeout,
                                                   download_buffer_size=download_buffer_size,
                                                   block_transfer=block_transfer,
                                                   )

    @classmethod
    def name(cls):
        '''Return the name of this runner.'''
        return 'canopen'

    @classmethod
    def capabilities(cls):
        '''Return the capabilities of this runner (only 'flash').'''
        return RunnerCaps(commands={'flash'}, dev_id=True, flash_addr=False)

    @classmethod
    def dev_id_help(cls) -> str:
        '''Return the help text for the device ID (node ID) option.'''
        return 'CANopen Node ID.'

    @classmethod
    def do_add_parser(cls, parser):
        '''Add the CANopen specific command-line options to parser.'''
        # Optional:
        parser.add_argument('--node-id', dest='dev_id',
                            help=cls.dev_id_help())

        parser.add_argument('--can-context', default=DEFAULT_CAN_CONTEXT,
                            help=f'Python-CAN context to use (default: {DEFAULT_CAN_CONTEXT})')
        parser.add_argument('--program-number', type=int, default=DEFAULT_PROGRAM_NUMBER,
                            help=f'program number (default: {DEFAULT_PROGRAM_NUMBER})')
        parser.add_argument('--confirm', '--no-confirm',
                            dest='confirm', nargs=0,
                            action=ToggleAction,
                            help='confirm after starting? (default: yes)')
        parser.add_argument('--confirm-only', default=False, action='store_true',
                            help='confirm only, no program download (default: no)')
        parser.add_argument('--timeout', type=float, default=DEFAULT_TIMEOUT,
                            help=f'Timeout in seconds (default: {DEFAULT_TIMEOUT})')
        parser.add_argument('--sdo-retries', type=int, default=DEFAULT_SDO_RETRIES,
                            help=f'CANopen SDO request retries (default: {DEFAULT_SDO_RETRIES})')
        parser.add_argument('--sdo-timeout', type=float, default=DEFAULT_SDO_TIMEOUT,
                            help=f'''CANopen SDO response timeout in seconds
                            (default: {DEFAULT_SDO_TIMEOUT})''')
        parser.add_argument('--download-buffer-size', type=int,
                            default=DEFAULT_PROGRAM_DOWNLOAD_BUFFER_SIZE,
                            help=f'''Program download buffer size in bytes
                            (default: {DEFAULT_PROGRAM_DOWNLOAD_BUFFER_SIZE})''')
        parser.add_argument('--block-transfer', default=False, action='store_true',
                            help='Use SDO block transfers (experimental, default: no)')

        parser.set_defaults(confirm=True)

    @classmethod
    def do_create(cls, cfg, args):
        '''Create a runner instance from the parsed command-line arguments.

        Raises RuntimeError if no node ID was given.
        '''
        # int(None) would raise an opaque TypeError before do_run() gets the
        # chance to report the missing node ID, so report it here instead.
        if not args.dev_id:
            raise RuntimeError(_MISSING_NODE_ID_MESSAGE)

        return CANopenBinaryRunner(cfg, int(args.dev_id),
                                   can_context=args.can_context,
                                   program_number=args.program_number,
                                   confirm=args.confirm,
                                   confirm_only=args.confirm_only,
                                   timeout=args.timeout,
                                   sdo_retries=args.sdo_retries,
                                   sdo_timeout=args.sdo_timeout,
                                   download_buffer_size=args.download_buffer_size,
                                   block_transfer=args.block_transfer)

    def do_run(self, command, **kwargs):
        '''Run command; only 'flash' is acted upon.

        Raises RuntimeError if the node ID is missing or zero.
        '''
        if not self.dev_id:
            raise RuntimeError(_MISSING_NODE_ID_MESSAGE)
        if command == 'flash':
            self.flash(**kwargs)

    def flash(self, **kwargs):
        '''Download program to flash over CANopen

        Connects to the CAN network, optionally downloads and starts the
        program, optionally confirms it, and always disconnects again, also
        when one of the steps raises.

        Raises ValueError if bin_file is missing or if the flash status is
        non-zero after the download; errors from the downloader propagate.
        '''
        self.ensure_output('bin')
        self.logger.info('Using Node ID %d, program number %d',
                         self.downloader.node_id,
                         self.downloader.program_number)

        self.downloader.connect()
        try:
            status = self.downloader.wait_for_flash_status_ok(self.timeout)
            if status == 0:
                self.downloader.swid()
            else:
                self.logger.warning(f'Flash status 0x{status:02x}, '
                                    'skipping software identification')

            self.downloader.enter_pre_operational()

            if self.confirm_only:
                self.downloader.zephyr_confirm_program()
                return

            if self.bin_file is None:
                raise ValueError('Cannot download program; bin_file is missing')

            self.downloader.stop_program()
            self.downloader.clear_program()
            self.downloader.wait_for_flash_status_ok(self.timeout)
            self.downloader.download(self.bin_file)

            status = self.downloader.wait_for_flash_status_ok(self.timeout)
            if status != 0:
                raise ValueError('Program download failed: '
                                 f'flash status 0x{status:02x}')

            self.downloader.swid()
            self.downloader.start_program()
            self.downloader.wait_for_bootup(self.timeout)
            self.downloader.swid()

            if self.confirm:
                self.downloader.enter_pre_operational()
                self.downloader.zephyr_confirm_program()
        finally:
            # Release the CAN network connection on every exit path.
            self.downloader.disconnect()

class CANopenProgramDownloader:
    '''CANopen program downloader'''
    def __init__(self, logger, node_id, can_context=DEFAULT_CAN_CONTEXT,
                 program_number=DEFAULT_PROGRAM_NUMBER,
                 sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT,
                 download_buffer_size=DEFAULT_PROGRAM_DOWNLOAD_BUFFER_SIZE, block_transfer=False):
        '''Set up the network, the remote node and its SDO accessors.

        The network is only created here; connect() must be called before
        any of the SDO based methods are used.
        '''
        super().__init__()
        self.logger = logger
        self.node_id = node_id
        self.can_context = can_context
        self.program_number = program_number
        self.network = canopen.Network()
        self.node = self.network.add_node(self.node_id,
                                          self.create_object_dictionary())
        # The program number is used as sub-index into each of the objects.
        self.data_sdo = self.node.sdo[H1F50_PROGRAM_DATA][self.program_number]
        self.ctrl_sdo = self.node.sdo[H1F51_PROGRAM_CTRL][self.program_number]
        self.swid_sdo = self.node.sdo[H1F56_PROGRAM_SWID][self.program_number]
        self.flash_sdo = self.node.sdo[H1F57_FLASH_STATUS][self.program_number]
        self.download_buffer_size = download_buffer_size

        self.node.sdo.MAX_RETRIES = sdo_retries
        self.node.sdo.RESPONSE_TIMEOUT = sdo_timeout

        self.block_transfer = block_transfer

    def connect(self):
        '''Connect to CAN network'''
        try:
            self.network.connect(context=self.can_context)
        except Exception as err:
            raise ValueError('Unable to connect to CAN network') from err

    def disconnect(self):
        '''Disconnect from CAN network'''
        self.network.disconnect()

    def enter_pre_operational(self):
        '''Enter pre-operational NMT state'''
        self.logger.info("Entering pre-operational mode")
        try:
            self.node.nmt.state = 'PRE-OPERATIONAL'
        except Exception as err:
            raise ValueError('Failed to enter pre-operational mode') from err

    def _ctrl_program(self, cmd):
        '''Write program control command to CANopen object dictionary (0x1f51)'''
        try:
            self.ctrl_sdo.raw = cmd
        except Exception as err:
            raise ValueError(f'Unable to write control command 0x{cmd:02x}') from err

    def stop_program(self):
        '''Write stop control command to CANopen object dictionary (0x1f51)'''
        self.logger.info('Stopping program')
        self._ctrl_program(PROGRAM_CTRL_STOP)

    def start_program(self):
        '''Write start control command to CANopen object dictionary (0x1f51)'''
        self.logger.info('Starting program')
        self._ctrl_program(PROGRAM_CTRL_START)

    def clear_program(self):
        '''Write clear control command to CANopen object dictionary (0x1f51)'''
        self.logger.info('Clearing program')
        self._ctrl_program(PROGRAM_CTRL_CLEAR)

    def zephyr_confirm_program(self):
        '''Write confirm control command to CANopen object dictionary (0x1f51)'''
        self.logger.info('Confirming program')
        self._ctrl_program(PROGRAM_CTRL_ZEPHYR_CONFIRM)

    def swid(self):
        '''Read software identification from CANopen object dictionary (0x1f56)

        The value is logged and returned. Raises ValueError if the read fails.
        '''
        try:
            swid = self.swid_sdo.raw
        except Exception as err:
            raise ValueError('Failed to read software identification') from err
        self.logger.info(f'Program software identification: 0x{swid:08x}')
        return swid

    def flash_status(self):
        '''Read flash status identification

        Returns the value read. Raises ValueError if the read fails.
        '''
        try:
            status = self.flash_sdo.raw
        except Exception as err:
            raise ValueError('Failed to read flash status identification') from err
        return status

    def download(self, bin_file):
        '''Download program to CANopen object dictionary (0x1f50)

        The file is written to the program data object in chunks of half the
        download buffer size while a progress bar is updated.

        Raises ValueError (chained to the underlying error) if any part of
        the download fails, including opening the file or the SDO stream and
        closing the SDO stream.
        '''
        self.logger.info('Downloading program: %s', bin_file)
        try:
            size = os.path.getsize(bin_file)
            # Resources are registered as they are acquired, so a failure
            # half-way only releases what actually exists.
            with contextlib.ExitStack() as stack:
                infile = stack.enter_context(open(bin_file, 'rb'))
                outfile = self.data_sdo.open('wb', buffering=self.download_buffer_size,
                                             size=size, block_transfer=self.block_transfer)
                stack.callback(outfile.close)

                progress = Bar('%(percent)d%%', max=size, suffix='%(index)d/%(max)dB')
                stack.callback(progress.finish)

                chunk_size = self.download_buffer_size // 2
                for chunk in iter(lambda: infile.read(chunk_size), b''):
                    outfile.write(chunk)
                    progress.next(n=len(chunk))
        except Exception as err:
            raise ValueError('Failed to download program') from err

    def wait_for_bootup(self, timeout=DEFAULT_TIMEOUT):
        '''Wait for boot-up message reception'''
        self.logger.info('Waiting for boot-up message...')
        try:
            self.node.nmt.wait_for_bootup(timeout=timeout)
        except Exception as err:
            raise ValueError('Timeout waiting for boot-up message') from err

    def wait_for_flash_status_ok(self, timeout=DEFAULT_TIMEOUT):
        '''Wait for flash status ok

        Polls the flash status until it reads 0 or until timeout seconds have
        passed. The status is always read at least once.

        Returns the last flash status read; a non-zero value means the
        timeout expired. Raises ValueError if reading the status fails.
        '''
        self.logger.info('Waiting for flash status ok')
        # Monotonic clock: the deadline must not move with wall-clock changes.
        end_time = time.monotonic() + timeout
        while True:
            # Sample the time before the read so a read is always attempted
            # for a time stamp that was still within the deadline.
            now = time.monotonic()
            status = self.flash_status()
            if status == 0 or now > end_time:
                return status

    @staticmethod
    def create_object_dictionary():
        '''Create a synthetic CANopen object dictionary for program download

        Each object is an array holding a single member at sub-index 1.
        '''
        datatypes = canopen.objectdictionary
        entries = (
            ('Program data', H1F50_PROGRAM_DATA, datatypes.DOMAIN),
            ('Program control', H1F51_PROGRAM_CTRL, datatypes.UNSIGNED8),
            ('Program software ID', H1F56_PROGRAM_SWID, datatypes.UNSIGNED32),
            ('Flash error ID', H1F57_FLASH_STATUS, datatypes.UNSIGNED32),
        )

        objdict = canopen.objectdictionary.ObjectDictionary()
        for name, index, data_type in entries:
            array = canopen.objectdictionary.Array(name, index)
            member = canopen.objectdictionary.Variable('', index, subindex=1)
            member.data_type = data_type
            array.add_member(member)
            objdict.add_object(array)

        return objdict