1# Copyright (c) 2020 Vestas Wind Systems A/S 2# 3# SPDX-License-Identifier: Apache-2.0 4 5'''Runner for performing program download over CANopen (DSP 302-3).''' 6 7import argparse 8import os 9import time 10 11from runners.core import ZephyrBinaryRunner, RunnerCaps 12 13try: 14 import canopen 15 from progress.bar import Bar 16 MISSING_REQUIREMENTS = False 17except ImportError: 18 MISSING_REQUIREMENTS = True 19 20# Default Python-CAN context to use, see python-can documentation for details 21DEFAULT_CAN_CONTEXT = 'default' 22 23# Default program number 24DEFAULT_PROGRAM_NUMBER = 1 25 26# Default timeouts and retries 27DEFAULT_TIMEOUT = 10.0 # seconds 28DEFAULT_SDO_TIMEOUT = 0.3 # seconds 29DEFAULT_SDO_RETRIES = 1 30 31# Object dictionary indexes 32H1F50_PROGRAM_DATA = 0x1F50 33H1F51_PROGRAM_CTRL = 0x1F51 34H1F56_PROGRAM_SWID = 0x1F56 35H1F57_FLASH_STATUS = 0x1F57 36 37# Program control commands 38PROGRAM_CTRL_STOP = 0x00 39PROGRAM_CTRL_START = 0x01 40PROGRAM_CTRL_RESET = 0x02 41PROGRAM_CTRL_CLEAR = 0x03 42PROGRAM_CTRL_ZEPHYR_CONFIRM = 0x80 43 44class ToggleAction(argparse.Action): 45 '''Toggle argument parser''' 46 def __call__(self, parser, namespace, values, option_string=None): 47 setattr(namespace, self.dest, not option_string.startswith('--no-')) 48 49class CANopenBinaryRunner(ZephyrBinaryRunner): 50 '''Runner front-end for CANopen.''' 51 def __init__(self, cfg, node_id, can_context=DEFAULT_CAN_CONTEXT, 52 program_number=DEFAULT_PROGRAM_NUMBER, confirm=True, 53 confirm_only=True, timeout=DEFAULT_TIMEOUT, 54 sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT): 55 if MISSING_REQUIREMENTS: 56 raise RuntimeError('one or more Python dependencies were missing; ' 57 "see the getting started guide for details on " 58 "how to fix") 59 60 super().__init__(cfg) 61 self.bin_file = cfg.bin_file 62 self.confirm = confirm 63 self.confirm_only = confirm_only 64 self.timeout = timeout 65 self.downloader = CANopenProgramDownloader(logger=self.logger, 66 node_id=node_id, 67 can_context=can_context, 68 program_number=program_number, 69 sdo_retries=sdo_retries, 70 sdo_timeout=sdo_timeout) 71 72 @classmethod 73 def name(cls): 74 return 'canopen' 75 76 @classmethod 77 def capabilities(cls): 78 return RunnerCaps(commands={'flash'}, flash_addr=False) 79 80 @classmethod 81 def do_add_parser(cls, parser): 82 # Required: 83 parser.add_argument('--node-id', required=True, help='Node ID') 84 85 # Optional: 86 parser.add_argument('--can-context', default=DEFAULT_CAN_CONTEXT, 87 help=f'Python-CAN context to use (default: {DEFAULT_CAN_CONTEXT})') 88 parser.add_argument('--program-number', type=int, default=DEFAULT_PROGRAM_NUMBER, 89 help=f'program number (default: {DEFAULT_PROGRAM_NUMBER})') 90 parser.add_argument('--confirm', '--no-confirm', 91 dest='confirm', nargs=0, 92 action=ToggleAction, 93 help='confirm after starting? (default: yes)') 94 parser.add_argument('--confirm-only', default=False, action='store_true', 95 help='confirm only, no program download (default: no)') 96 parser.add_argument('--timeout', type=float, default=DEFAULT_TIMEOUT, 97 help=f'Timeout in seconds (default: {DEFAULT_TIMEOUT})') 98 parser.add_argument('--sdo-retries', type=int, default=DEFAULT_SDO_RETRIES, 99 help=f'CANopen SDO request retries (default: {DEFAULT_SDO_RETRIES})') 100 parser.add_argument('--sdo-timeout', type=float, default=DEFAULT_SDO_TIMEOUT, 101 help=f'''CANopen SDO response timeout in seconds 102 (default: {DEFAULT_SDO_TIMEOUT})''') 103 104 parser.set_defaults(confirm=True) 105 106 @classmethod 107 def do_create(cls, cfg, args): 108 return CANopenBinaryRunner(cfg, int(args.node_id), 109 can_context=args.can_context, 110 program_number=args.program_number, 111 confirm=args.confirm, 112 confirm_only=args.confirm_only, 113 timeout=args.timeout, 114 sdo_retries=args.sdo_retries, 115 sdo_timeout=args.sdo_timeout) 116 117 def do_run(self, command, **kwargs): 118 if command == 'flash': 119 self.flash(**kwargs) 120 121 def flash(self, **kwargs): 122 '''Download program to flash over CANopen''' 123 self.ensure_output('bin') 124 self.logger.info('Using Node ID %d, program number %d', 125 self.downloader.node_id, 126 self.downloader.program_number) 127 128 self.downloader.connect() 129 status = self.downloader.wait_for_flash_status_ok(self.timeout) 130 if status == 0: 131 self.downloader.swid() 132 else: 133 self.logger.warning('Flash status 0x{:02x}, ' 134 'skipping software identification'.format(status)) 135 136 self.downloader.enter_pre_operational() 137 138 if self.confirm_only: 139 self.downloader.zephyr_confirm_program() 140 self.downloader.disconnect() 141 return 142 143 if self.bin_file is None: 144 raise ValueError('Cannot download program; bin_file is missing') 145 146 self.downloader.stop_program() 147 self.downloader.clear_program() 148 self.downloader.wait_for_flash_status_ok(self.timeout) 149 self.downloader.download(self.bin_file) 150 151 status = self.downloader.wait_for_flash_status_ok(self.timeout) 152 if status != 0: 153 raise ValueError('Program download failed: ' 154 'flash status 0x{:02x}'.format(status)) 155 156 self.downloader.swid() 157 self.downloader.start_program() 158 self.downloader.wait_for_bootup(self.timeout) 159 self.downloader.swid() 160 161 if self.confirm: 162 self.downloader.enter_pre_operational() 163 self.downloader.zephyr_confirm_program() 164 165 self.downloader.disconnect() 166 167class CANopenProgramDownloader(object): 168 '''CANopen program downloader''' 169 def __init__(self, logger, node_id, can_context=DEFAULT_CAN_CONTEXT, 170 program_number=DEFAULT_PROGRAM_NUMBER, 171 sdo_retries=DEFAULT_SDO_RETRIES, sdo_timeout=DEFAULT_SDO_TIMEOUT): 172 super(CANopenProgramDownloader, self).__init__() 173 self.logger = logger 174 self.node_id = node_id 175 self.can_context = can_context 176 self.program_number = program_number 177 self.network = canopen.Network() 178 self.node = self.network.add_node(self.node_id, 179 self.create_object_dictionary()) 180 self.data_sdo = self.node.sdo[H1F50_PROGRAM_DATA][self.program_number] 181 self.ctrl_sdo = self.node.sdo[H1F51_PROGRAM_CTRL][self.program_number] 182 self.swid_sdo = self.node.sdo[H1F56_PROGRAM_SWID][self.program_number] 183 self.flash_sdo = self.node.sdo[H1F57_FLASH_STATUS][self.program_number] 184 185 self.node.sdo.MAX_RETRIES = sdo_retries 186 self.node.sdo.RESPONSE_TIMEOUT = sdo_timeout 187 188 def connect(self): 189 '''Connect to CAN network''' 190 try: 191 self.network.connect(context=self.can_context) 192 except: 193 raise ValueError('Unable to connect to CAN network') 194 195 def disconnect(self): 196 '''Disconnect from CAN network''' 197 self.network.disconnect() 198 199 def enter_pre_operational(self): 200 '''Enter pre-operational NMT state''' 201 self.logger.info("Entering pre-operational mode") 202 try: 203 self.node.nmt.state = 'PRE-OPERATIONAL' 204 except: 205 raise ValueError('Failed to enter pre-operational mode') 206 207 def _ctrl_program(self, cmd): 208 '''Write program control command to CANopen object dictionary (0x1f51)''' 209 try: 210 self.ctrl_sdo.raw = cmd 211 except: 212 raise ValueError('Unable to write control command 0x{:02x}'.format(cmd)) 213 214 def stop_program(self): 215 '''Write stop control command to CANopen object dictionary (0x1f51)''' 216 self.logger.info('Stopping program') 217 self._ctrl_program(PROGRAM_CTRL_STOP) 218 219 def start_program(self): 220 '''Write start control command to CANopen object dictionary (0x1f51)''' 221 self.logger.info('Starting program') 222 self._ctrl_program(PROGRAM_CTRL_START) 223 224 def clear_program(self): 225 '''Write clear control command to CANopen object dictionary (0x1f51)''' 226 self.logger.info('Clearing program') 227 self._ctrl_program(PROGRAM_CTRL_CLEAR) 228 229 def zephyr_confirm_program(self): 230 '''Write confirm control command to CANopen object dictionary (0x1f51)''' 231 self.logger.info('Confirming program') 232 self._ctrl_program(PROGRAM_CTRL_ZEPHYR_CONFIRM) 233 234 def swid(self): 235 '''Read software identification from CANopen object dictionary (0x1f56)''' 236 try: 237 swid = self.swid_sdo.raw 238 except: 239 raise ValueError('Failed to read software identification') 240 self.logger.info('Program software identification: 0x{:08x}'.format(swid)) 241 return swid 242 243 def flash_status(self): 244 '''Read flash status identification''' 245 try: 246 status = self.flash_sdo.raw 247 except: 248 raise ValueError('Failed to read flash status identification') 249 return status 250 251 def download(self, bin_file): 252 '''Download program to CANopen object dictionary (0x1f50)''' 253 self.logger.info('Downloading program: %s', bin_file) 254 try: 255 size = os.path.getsize(bin_file) 256 infile = open(bin_file, 'rb') 257 outfile = self.data_sdo.open('wb', size=size) 258 259 progress = Bar('%(percent)d%%', max=size, suffix='%(index)d/%(max)dB') 260 while True: 261 chunk = infile.read(1024) 262 if not chunk: 263 break 264 outfile.write(chunk) 265 progress.next(n=len(chunk)) 266 except: 267 raise ValueError('Failed to download program') 268 finally: 269 progress.finish() 270 infile.close() 271 outfile.close() 272 273 def wait_for_bootup(self, timeout=DEFAULT_TIMEOUT): 274 '''Wait for boot-up message reception''' 275 self.logger.info('Waiting for boot-up message...') 276 try: 277 self.node.nmt.wait_for_bootup(timeout=timeout) 278 except: 279 raise ValueError('Timeout waiting for boot-up message') 280 281 def wait_for_flash_status_ok(self, timeout=DEFAULT_TIMEOUT): 282 '''Wait for flash status ok''' 283 self.logger.info('Waiting for flash status ok') 284 end_time = time.time() + timeout 285 while True: 286 now = time.time() 287 status = self.flash_status() 288 if status == 0: 289 break 290 291 if now > end_time: 292 return status 293 294 return status 295 296 @staticmethod 297 def create_object_dictionary(): 298 '''Create a synthetic CANopen object dictionary for program download''' 299 objdict = canopen.objectdictionary.ObjectDictionary() 300 301 array = canopen.objectdictionary.Array('Program data', 0x1f50) 302 member = canopen.objectdictionary.Variable('', 0x1f50, subindex=1) 303 member.data_type = canopen.objectdictionary.DOMAIN 304 array.add_member(member) 305 objdict.add_object(array) 306 307 array = canopen.objectdictionary.Array('Program control', 0x1f51) 308 member = canopen.objectdictionary.Variable('', 0x1f51, subindex=1) 309 member.data_type = canopen.objectdictionary.UNSIGNED8 310 array.add_member(member) 311 objdict.add_object(array) 312 313 array = canopen.objectdictionary.Array('Program sofware ID', 0x1f56) 314 member = canopen.objectdictionary.Variable('', 0x1f56, subindex=1) 315 member.data_type = canopen.objectdictionary.UNSIGNED32 316 array.add_member(member) 317 objdict.add_object(array) 318 319 array = canopen.objectdictionary.Array('Flash error ID', 0x1f57) 320 member = canopen.objectdictionary.Variable('', 0x1f57, subindex=1) 321 member.data_type = canopen.objectdictionary.UNSIGNED32 322 array.add_member(member) 323 objdict.add_object(array) 324 325 return objdict 326