# Copyright (c) 2020 Vestas Wind Systems A/S
#
# SPDX-License-Identifier: Apache-2.0

'''Runner for performing program download over CANopen (DSP 302-3).'''

import argparse
import os
import time
from contextlib import closing

from runners.core import ZephyrBinaryRunner, RunnerCaps

try:
    import canopen
    from progress.bar import Bar
    MISSING_REQUIREMENTS = False
except ImportError:
    MISSING_REQUIREMENTS = True

# Default Python-CAN context to use, see python-can documentation for details
DEFAULT_CAN_CONTEXT = 'default'

# Default program number
DEFAULT_PROGRAM_NUMBER = 1

# Program download buffer size in bytes
PROGRAM_DOWNLOAD_BUFFER_SIZE = 1024

# Program download chunk size in bytes
PROGRAM_DOWNLOAD_CHUNK_SIZE = PROGRAM_DOWNLOAD_BUFFER_SIZE // 2

# Default timeouts and retries
DEFAULT_TIMEOUT = 10.0 # seconds
DEFAULT_SDO_TIMEOUT = 1 # seconds
DEFAULT_SDO_RETRIES = 1

# Object dictionary indexes
H1F50_PROGRAM_DATA = 0x1F50
H1F51_PROGRAM_CTRL = 0x1F51
H1F56_PROGRAM_SWID = 0x1F56
H1F57_FLASH_STATUS = 0x1F57

# Program control commands
PROGRAM_CTRL_STOP = 0x00
PROGRAM_CTRL_START = 0x01
PROGRAM_CTRL_RESET = 0x02
PROGRAM_CTRL_CLEAR = 0x03
PROGRAM_CTRL_ZEPHYR_CONFIRM = 0x80

# Shared by do_create() and do_run() so both report a missing node ID the
# same way.
_MISSING_NODE_ID_MSG = ('Please specify a CANopen node ID with the '
                        '-i/--dev-id or --node-id command-line switch.')


class ToggleAction(argparse.Action):
    '''Argparse action for paired ``--foo`` / ``--no-foo`` switches.

    Stores ``False`` in the destination when the option was spelled with a
    ``--no-`` prefix and ``True`` otherwise.
    '''

    def __call__(self, parser, namespace, values, option_string=None):
        setattr(namespace, self.dest, not option_string.startswith('--no-'))


class CANopenBinaryRunner(ZephyrBinaryRunner):
    '''Runner front-end for CANopen.'''

    def __init__(self, cfg, dev_id, can_context=DEFAULT_CAN_CONTEXT,
                 program_number=DEFAULT_PROGRAM_NUMBER, confirm=True,
                 confirm_only=True, timeout=DEFAULT_TIMEOUT,
                 sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT,
                 block_transfer=False):
        '''Create the runner and its program downloader.

        :param cfg: runner configuration; ``cfg.bin_file`` is the image to
            download
        :param dev_id: CANopen node ID of the target
        :param can_context: Python-CAN context name used when connecting
        :param program_number: sub-index used for the program objects
        :param confirm: confirm the program after it has been started
        :param confirm_only: only confirm the program, do not download
        :param timeout: timeout in seconds for flash status / boot-up waits
        :param sdo_retries: SDO request retries
        :param sdo_timeout: SDO response timeout in seconds
        :param block_transfer: use SDO block transfers for the download
        :raises RuntimeError: if the optional Python dependencies could not
            be imported
        '''
        if MISSING_REQUIREMENTS:
            raise RuntimeError('one or more Python dependencies were missing; '
                               "see the getting started guide for details on "
                               "how to fix")

        super().__init__(cfg)
        self.dev_id = dev_id # Only use for error checking in do_run()
        self.bin_file = cfg.bin_file
        self.confirm = confirm
        self.confirm_only = confirm_only
        self.timeout = timeout
        self.downloader = CANopenProgramDownloader(logger=self.logger,
                                                   node_id=dev_id,
                                                   can_context=can_context,
                                                   program_number=program_number,
                                                   sdo_retries=sdo_retries,
                                                   sdo_timeout=sdo_timeout,
                                                   block_transfer=block_transfer)

    @classmethod
    def name(cls):
        '''Return the runner name, ``canopen``.'''
        return 'canopen'

    @classmethod
    def capabilities(cls):
        '''Return the runner capabilities: ``flash`` only, with a device ID.'''
        return RunnerCaps(commands={'flash'}, dev_id=True, flash_addr=False)

    @classmethod
    def dev_id_help(cls) -> str:
        '''Return the help text describing the device ID argument.'''
        return 'CANopen Node ID.'

    @classmethod
    def do_add_parser(cls, parser):
        '''Register the CANopen specific command-line options on *parser*.'''
        # Optional:
        parser.add_argument('--node-id', dest='dev_id',
                            help=cls.dev_id_help())

        parser.add_argument('--can-context', default=DEFAULT_CAN_CONTEXT,
                            help=f'Python-CAN context to use (default: {DEFAULT_CAN_CONTEXT})')
        parser.add_argument('--program-number', type=int, default=DEFAULT_PROGRAM_NUMBER,
                            help=f'program number (default: {DEFAULT_PROGRAM_NUMBER})')
        parser.add_argument('--confirm', '--no-confirm',
                            dest='confirm', nargs=0,
                            action=ToggleAction,
                            help='confirm after starting? (default: yes)')
        parser.add_argument('--confirm-only', default=False, action='store_true',
                            help='confirm only, no program download (default: no)')
        parser.add_argument('--timeout', type=float, default=DEFAULT_TIMEOUT,
                            help=f'Timeout in seconds (default: {DEFAULT_TIMEOUT})')
        parser.add_argument('--sdo-retries', type=int, default=DEFAULT_SDO_RETRIES,
                            help=f'CANopen SDO request retries (default: {DEFAULT_SDO_RETRIES})')
        parser.add_argument('--sdo-timeout', type=float, default=DEFAULT_SDO_TIMEOUT,
                            help=f'''CANopen SDO response timeout in seconds
                            (default: {DEFAULT_SDO_TIMEOUT})''')
        parser.add_argument('--block-transfer', default=False, action='store_true',
                            help='Use SDO block transfers (experimental, default: no)')

        parser.set_defaults(confirm=True)

    @classmethod
    def do_create(cls, cfg, args):
        '''Build a runner instance from parsed command-line arguments.

        :raises RuntimeError: if no node ID was given on the command line
        '''
        # int(None) would raise a bare TypeError before do_run() gets the
        # chance to print the helpful message, so check up front.
        if args.dev_id is None:
            raise RuntimeError(_MISSING_NODE_ID_MSG)

        return CANopenBinaryRunner(cfg, int(args.dev_id),
                                   can_context=args.can_context,
                                   program_number=args.program_number,
                                   confirm=args.confirm,
                                   confirm_only=args.confirm_only,
                                   timeout=args.timeout,
                                   sdo_retries=args.sdo_retries,
                                   sdo_timeout=args.sdo_timeout,
                                   block_transfer=args.block_transfer)

    def do_run(self, command, **kwargs):
        '''Execute *command*; only ``flash`` is acted upon.

        :raises RuntimeError: if the node ID is missing (or zero)
        '''
        if not self.dev_id:
            raise RuntimeError(_MISSING_NODE_ID_MSG)
        if command == 'flash':
            self.flash(**kwargs)

    def flash(self, **kwargs):
        '''Download program to flash over CANopen

        With ``confirm_only`` set, the running program is confirmed and
        nothing is downloaded. Otherwise the current program is stopped and
        cleared, the binary is downloaded and started, and (if ``confirm`` is
        set) the new program is confirmed once it has booted.

        :raises ValueError: if the binary is missing, the flash status is
            non-zero after the download, or any CANopen step fails
        '''
        self.ensure_output('bin')
        self.logger.info('Using Node ID %d, program number %d',
                         self.downloader.node_id,
                         self.downloader.program_number)

        self.downloader.connect()
        # From here on the network is connected; always release it again,
        # also when one of the steps below raises.
        try:
            status = self.downloader.wait_for_flash_status_ok(self.timeout)
            if status == 0:
                self.downloader.swid()
            else:
                self.logger.warning('Flash status 0x{:02x}, '
                                    'skipping software identification'.format(status))

            self.downloader.enter_pre_operational()

            if self.confirm_only:
                self.downloader.zephyr_confirm_program()
                return

            if self.bin_file is None:
                raise ValueError('Cannot download program; bin_file is missing')

            self.downloader.stop_program()
            self.downloader.clear_program()
            self.downloader.wait_for_flash_status_ok(self.timeout)
            self.downloader.download(self.bin_file)

            status = self.downloader.wait_for_flash_status_ok(self.timeout)
            if status != 0:
                raise ValueError('Program download failed: '
                                 'flash status 0x{:02x}'.format(status))

            self.downloader.swid()
            self.downloader.start_program()
            self.downloader.wait_for_bootup(self.timeout)
            self.downloader.swid()

            if self.confirm:
                self.downloader.enter_pre_operational()
                self.downloader.zephyr_confirm_program()
        finally:
            self.downloader.disconnect()


class CANopenProgramDownloader:
    '''CANopen program downloader'''

    def __init__(self, logger, node_id, can_context=DEFAULT_CAN_CONTEXT,
                 program_number=DEFAULT_PROGRAM_NUMBER,
                 sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT,
                 block_transfer=False):
        '''Set up the network, the remote node and its SDO accessors.

        No bus traffic is started here; call :meth:`connect` first.

        :param logger: logger used for progress messages
        :param node_id: CANopen node ID of the target
        :param can_context: Python-CAN context name used by :meth:`connect`
        :param program_number: sub-index used for all program objects
        :param sdo_retries: SDO request retries
        :param sdo_timeout: SDO response timeout in seconds
        :param block_transfer: use SDO block transfers in :meth:`download`
        '''
        super().__init__()
        self.logger = logger
        self.node_id = node_id
        self.can_context = can_context
        self.program_number = program_number
        self.network = canopen.Network()
        self.node = self.network.add_node(self.node_id,
                                          self.create_object_dictionary())
        self.data_sdo = self.node.sdo[H1F50_PROGRAM_DATA][self.program_number]
        self.ctrl_sdo = self.node.sdo[H1F51_PROGRAM_CTRL][self.program_number]
        self.swid_sdo = self.node.sdo[H1F56_PROGRAM_SWID][self.program_number]
        self.flash_sdo = self.node.sdo[H1F57_FLASH_STATUS][self.program_number]

        self.node.sdo.MAX_RETRIES = sdo_retries
        self.node.sdo.RESPONSE_TIMEOUT = sdo_timeout

        self.block_transfer = block_transfer

    def connect(self):
        '''Connect to CAN network

        :raises ValueError: if the connection cannot be established
        '''
        try:
            self.network.connect(context=self.can_context)
        except Exception as err:
            raise ValueError('Unable to connect to CAN network') from err

    def disconnect(self):
        '''Disconnect from CAN network'''
        self.network.disconnect()

    def enter_pre_operational(self):
        '''Enter pre-operational NMT state

        :raises ValueError: if the NMT state change fails
        '''
        self.logger.info("Entering pre-operational mode")
        try:
            self.node.nmt.state = 'PRE-OPERATIONAL'
        except Exception as err:
            raise ValueError('Failed to enter pre-operational mode') from err

    def _ctrl_program(self, cmd):
        '''Write program control command to CANopen object dictionary (0x1f51)

        :raises ValueError: if the SDO write fails
        '''
        try:
            self.ctrl_sdo.raw = cmd
        except Exception as err:
            raise ValueError('Unable to write control command 0x{:02x}'.format(cmd)) from err

    def stop_program(self):
        '''Write stop control command to CANopen object dictionary (0x1f51)'''
        self.logger.info('Stopping program')
        self._ctrl_program(PROGRAM_CTRL_STOP)

    def start_program(self):
        '''Write start control command to CANopen object dictionary (0x1f51)'''
        self.logger.info('Starting program')
        self._ctrl_program(PROGRAM_CTRL_START)

    def clear_program(self):
        '''Write clear control command to CANopen object dictionary (0x1f51)'''
        self.logger.info('Clearing program')
        self._ctrl_program(PROGRAM_CTRL_CLEAR)

    def zephyr_confirm_program(self):
        '''Write confirm control command to CANopen object dictionary (0x1f51)'''
        self.logger.info('Confirming program')
        self._ctrl_program(PROGRAM_CTRL_ZEPHYR_CONFIRM)

    def swid(self):
        '''Read software identification from CANopen object dictionary (0x1f56)

        :returns: the software identification value, which is also logged
        :raises ValueError: if the SDO read fails
        '''
        try:
            swid = self.swid_sdo.raw
        except Exception as err:
            raise ValueError('Failed to read software identification') from err
        self.logger.info('Program software identification: 0x{:08x}'.format(swid))
        return swid

    def flash_status(self):
        '''Read flash status identification

        :returns: the raw flash status value (0x1f57)
        :raises ValueError: if the SDO read fails
        '''
        try:
            status = self.flash_sdo.raw
        except Exception as err:
            raise ValueError('Failed to read flash status identification') from err
        return status

    def download(self, bin_file):
        '''Download program to CANopen object dictionary (0x1f50)

        The file is streamed in PROGRAM_DOWNLOAD_CHUNK_SIZE pieces while a
        progress bar is updated.

        :param bin_file: path of the binary image to download
        :raises ValueError: if the file cannot be read or the transfer fails
        '''
        self.logger.info('Downloading program: %s', bin_file)
        try:
            size = os.path.getsize(bin_file)
            with open(bin_file, 'rb') as infile:
                sdo_stream = self.data_sdo.open('wb', buffering=PROGRAM_DOWNLOAD_BUFFER_SIZE,
                                                size=size, block_transfer=self.block_transfer)
                # closing() guarantees close() is called on the SDO stream
                # without relying on it being a context manager itself.
                with closing(sdo_stream) as outfile:
                    progress = Bar('%(percent)d%%', max=size, suffix='%(index)d/%(max)dB')
                    try:
                        while True:
                            chunk = infile.read(PROGRAM_DOWNLOAD_CHUNK_SIZE)
                            if not chunk:
                                break
                            outfile.write(chunk)
                            progress.next(n=len(chunk))
                    finally:
                        progress.finish()
        except Exception as err:
            raise ValueError('Failed to download program') from err

    def wait_for_bootup(self, timeout=DEFAULT_TIMEOUT):
        '''Wait for boot-up message reception

        :param timeout: maximum time to wait in seconds
        :raises ValueError: if waiting for the boot-up message fails
        '''
        self.logger.info('Waiting for boot-up message...')
        try:
            self.node.nmt.wait_for_bootup(timeout=timeout)
        except Exception as err:
            raise ValueError('Timeout waiting for boot-up message') from err

    def wait_for_flash_status_ok(self, timeout=DEFAULT_TIMEOUT):
        '''Wait for flash status ok

        Polls the flash status until it reads 0 or *timeout* seconds have
        passed. The status is always read at least once.

        :param timeout: maximum time to keep polling in seconds
        :returns: the last flash status read; 0 means ok
        :raises ValueError: if reading the flash status fails
        '''
        self.logger.info('Waiting for flash status ok')
        # A monotonic clock keeps the deadline immune to wall-clock changes.
        deadline = time.monotonic() + timeout
        while True:
            # Sample the clock before the read so a slow read that started
            # in time is still honoured.
            now = time.monotonic()
            status = self.flash_status()
            if status == 0 or now > deadline:
                return status

    @staticmethod
    def create_object_dictionary():
        '''Create a synthetic CANopen object dictionary for program download

        Each of the four program objects is added as an array with a single
        template member at sub-index 1.

        :returns: the populated object dictionary
        '''
        objdict = canopen.objectdictionary.ObjectDictionary()

        entries = (
            ('Program data', H1F50_PROGRAM_DATA,
             canopen.objectdictionary.DOMAIN),
            ('Program control', H1F51_PROGRAM_CTRL,
             canopen.objectdictionary.UNSIGNED8),
            ('Program software ID', H1F56_PROGRAM_SWID,
             canopen.objectdictionary.UNSIGNED32),
            ('Flash error ID', H1F57_FLASH_STATUS,
             canopen.objectdictionary.UNSIGNED32),
        )

        for name, index, data_type in entries:
            array = canopen.objectdictionary.Array(name, index)
            member = canopen.objectdictionary.Variable('', index, subindex=1)
            member.data_type = data_type
            array.add_member(member)
            objdict.add_object(array)

        return objdict