#!/usr/bin/python3

"""
Copyright (c) 2020 Cypress Semiconductor Corporation

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import os
import platform
import sys
import argparse
import subprocess
import json
from time import sleep

from cysecuretools import CySecureTools
from cysecuretools.execute.provisioning_lib.cyprov_pem import PemKey
from OpenSSL import crypto, SSL
from cysecuretools.execute.programmer.programmer import ProgrammingTool
from cysecuretools.execute.programmer.base import AP
from cysecuretools.core.target_director import Target
from cysecuretools.execute.programmer.pyocd_wrapper import ResetType

# Examples
# minimal parameters for default values:
#   ./reprov_helper.py
#
# non-interactive:
#   ./reprov_helper.py -d cy8ckit-064s0s2-4343w \
#       -p policy/policy_multi_CM0_CM4_tfm_dev_certs.json \
#       -[existing-keys|new-keys] -s <serial number> -y

# NOTE: the helper functions below use the module-level name `cytools`,
# which is only bound once main() has created the CySecureTools object.


def myargs(argv):
    """Parse the command-line arguments of the reprovisioning helper.

    Args:
        argv: List of argument strings, without the program name.

    Returns:
        The argparse namespace with the attributes ``policy_file``,
        ``device``, ``serial_number``, ``new_keys``, ``existing_keys``
        and ``force_reprov``.
    """
    # add_help=False so that -h/--help can be declared with a custom text.
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument('-h', '--help',
                        dest='show_help',
                        action='help',
                        help='Print this help message and exit')

    parser.add_argument('-p', '--policy',
                        dest='policy_file',
                        action='store',
                        type=str,
                        help="Device policy file",
                        required=False)

    parser.add_argument('-d', '--device',
                        dest='device',
                        action='store',
                        type=str,
                        help="Device manufacturing part number",
                        required=False)

    parser.add_argument('-s', '--serial',
                        dest='serial_number',
                        action='store',
                        type=str,
                        help="Device unique serial number",
                        required=False)

    parser.add_argument('-new-keys',
                        dest='new_keys',
                        action='store_true',
                        help="Create a new set keys if defined",
                        required=False)

    parser.add_argument('-existing-keys',
                        dest='existing_keys',
                        action='store_true',
                        help="Force to use existing set keys if defined",
                        required=False)

    parser.add_argument('-y',
                        dest='force_reprov',
                        action='store_true',
                        help="Force reprovisioning",
                        required=False)

    return parser.parse_args(argv)


def create_app_keys(overwrite=None):
    """Create the signing keys through the global ``cytools`` object.

    Args:
        overwrite: Passed unchanged to ``cytools.create_keys``.
    """
    cytools.create_keys(overwrite)


def read_device_pub_key():
    """Read the device public key and store it as JWK and PEM files.

    The key is written to ``keys/device_pub_key.json`` and then converted
    to ``keys/device_pub_key.pem``.

    Returns:
        0 on success, 1 if the key could not be read or stored.
    """
    print('Reading public key from device')
    # NOTE(review): key id 1 is presumably the device key slot - confirm
    # against the cysecuretools documentation.
    key = cytools.read_public_key(1, "jwk")
    print("key: {}".format(key))
    if not key:
        print('Error: Cannot read device public key.')
        return 1

    pub_key_json = 'keys/device_pub_key.json'
    with open(pub_key_json, 'w') as json_file:
        json.dump(key, json_file)

    # Change from JWK to PEM
    pub_key_pem = 'keys/device_pub_key.pem'
    if not (os.path.exists(pub_key_json)
            and os.stat(pub_key_json).st_size > 0):
        print('Failed to read device public key')
        return 1

    PemKey(pub_key_json).save(pub_key_pem, private_key=False)
    print('Device public key has been read successfully.')
    return 0


def _read_text_file(path):
    """Return the whole content of the text file *path*."""
    with open(path, 'r') as text_file:
        return text_file.read()


def generate_device_cert(dev_serial_num,
                         dev_pub_key_path="keys/device_pub_key.pem",
                         ca_priv_key_path="certificates/rootCA.key",
                         ca_cert_path="certificates/rootCA.pem"):
    """Create a device certificate signed with the root CA key.

    The certificate copies the CA subject, replaces the CN with
    ``<TARGET NAME>-<serial>``, is valid for ten years starting now and is
    written to ``certificates/device_cert.pem`` and
    ``certificates/device_cert.der``.

    Args:
        dev_serial_num: Device serial number; must be convertible to int
            because it is also used as the certificate serial number.
        dev_pub_key_path: PEM file with the device public key.
        ca_priv_key_path: PEM file with the CA private key.
        ca_cert_path: PEM file with the CA certificate.

    Returns:
        0 on success, 1 if the serial number is not an integer.
    """
    # Validate first so that nothing is loaded or written for a bad serial.
    try:
        cert_serial = int(dev_serial_num)
    except (TypeError, ValueError):
        print('Error: device serial number not number')
        return 1

    # Device public key, as previously read from the device.
    dev_pub_key = crypto.load_publickey(crypto.FILETYPE_PEM,
                                        _read_text_file(dev_pub_key_path))
    ca_privatekey = crypto.load_privatekey(crypto.FILETYPE_PEM,
                                           _read_text_file(ca_priv_key_path))
    ca_cert = crypto.load_certificate(crypto.FILETYPE_PEM,
                                      _read_text_file(ca_cert_path))

    # create cert signed by ca_cert
    cert = crypto.X509()
    cert.set_subject(ca_cert.get_subject())
    cert.get_subject().CN = \
        cytools.target_name.upper() + '-' + str(dev_serial_num)
    cert.set_serial_number(cert_serial)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(10*365*24*60*60)  # ten years, in seconds
    cert.set_issuer(ca_cert.get_subject())
    cert.set_pubkey(dev_pub_key)
    cert.sign(ca_privatekey, 'sha256')

    with open("certificates/device_cert.pem", "wb") as pem_file:
        pem_file.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
    with open("certificates/device_cert.der", "wb") as der_file:
        der_file.write(crypto.dump_certificate(crypto.FILETYPE_ASN1, cert))
    print('Device certificate generated successfully.')
    return 0


def create_provisioning_packet():
    """Create the provisioning packet through the global ``cytools``."""
    cytools.create_provisioning_packet()


def re_provision_device(device, policy):
    """Re-provision the device through the global ``cytools`` object.

    Args:
        device: Unused; the target was fixed when ``cytools`` was created.
        policy: Unused; the policy was fixed when ``cytools`` was created.
    """
    cytools.re_provision_device()


def erase_flash(addr, size):
    """Erase a flash region and reset the target.

    Connects the programming tool through the CM0 access port, erases
    *size* bytes starting at *addr*, performs a hardware reset, waits
    three seconds and disconnects. The tool is disconnected even if the
    erase or the reset raises.

    Args:
        addr: Start address of the region to erase.
        size: Size of the region to erase.

    Returns:
        0 once the sequence has completed.
    """
    tool = ProgrammingTool.create(cytools.PROGRAMMING_TOOL)
    tool.connect(target_name=cytools.target_name, ap='cm0')
    try:
        tool.set_ap(AP.CM0)
        print('Erasing address {}, size {} ...'.format(hex(addr), hex(size)))
        tool.erase(addr, size)
        print('Erasing complete\n')
        tool.reset(ResetType.HW)
        sleep(3)
    finally:
        tool.disconnect()
    return 0


def exec_shell_command(cmd):
    """Run a command, echoing and collecting its combined output.

    stderr is merged into stdout. Each output line is printed as soon as
    it is read.

    Args:
        cmd: Non-empty list with the program and its arguments.

    Returns:
        A tuple ``(return code, output)`` where output is the complete
        decoded text produced by the command.

    Raises:
        ValueError: If *cmd* is not a non-empty list.
    """
    if not isinstance(cmd, list) or not cmd:
        raise ValueError("Command must be in a non-empty list")

    output = []
    print("Executing command: {}".format(' '.join(cmd)))
    # The context manager closes the pipe even if reading fails.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT) as process:
        for raw_line in iter(process.stdout.readline, b''):
            # Tool output is not guaranteed to be valid UTF-8.
            line = raw_line.decode('utf-8', errors='replace')
            output.append(line)
            print(line, end='')
        ret = process.wait()
    print("Command completed (ret={})".format(ret))
    return ret, ''.join(output)


def main(argv):
    """Run the interactive reprovisioning flow.

    Collects policy, device, serial number and key options (asking the
    user for everything not given on the command line), then erases the
    start of the SPE image, reads the device public key, generates the
    device certificate, creates the provisioning packet and re-provisions
    the device.

    Args:
        argv: Command-line arguments, without the program name.

    The function always terminates the process: exit code 0 on success,
    1 when the user aborts or a step fails.
    """
    options = myargs(argv)
    print("options: {}\r\n".format(options))

    if not options.policy_file:
        options.policy_file = os.path.join(
            'policy', 'policy_multi_CM0_CM4_tfm_dev_certs.json')
        answer = input(
            'Policy file name was not provided, use default {}? (Y/n): '
            .format(options.policy_file))
        if answer in ('N', 'n'):
            print("Please specify policy as a parameter: `-p <policy>`")
            sys.exit(1)
    if not os.path.isfile(options.policy_file):
        print("Policy file {} doesn't exist.".format(options.policy_file))
        sys.exit(1)

    if not options.device:
        options.device = "cy8ckit-064s0s2-4343w"
        answer = input("\r\nDevice is not provided, use default {}? (Y/n): "
                       .format(options.device))
        if answer in ('N', 'n'):
            print("Please specify device as a parameter: `-d <device>'")
            sys.exit(1)

    # Create cysecuretools object
    global cytools
    cytools = CySecureTools(options.device, options.policy_file)

    if options.serial_number:
        dev_serial_num = options.serial_number
    else:
        dev_serial_num = input(
            "\r\nSelect unique device serial number for {} (digits only):\n"
            .format(cytools.target_name.upper()))
    # Check both the prompted and the -s value here, before the flash is
    # erased: the serial is later converted with int() for the certificate.
    if not dev_serial_num.isdecimal():
        print('Error: device serial number not number')
        sys.exit(1)

    # signing keys
    if options.new_keys:
        create_signing_keys = True
    elif options.existing_keys:
        create_signing_keys = False
    else:
        answer = input('\r\nDo you want to create a new set of keys (y/N): ')
        create_signing_keys = answer in ('Y', 'y')
        if create_signing_keys:
            print('Will create new keys.')
        else:
            # TBD: check if the keys exist (open json, read keys)
            print('Will use existing keys.')

    print("\r\n")
    print("##################################################################")
    print("Current configuration:")
    print("Policy file:             {}".format(options.policy_file))
    print("Device:                  {}".format(options.device))
    print("Serial Number:           {}".format(dev_serial_num))
    print("Create new signing keys: {}".format(create_signing_keys))
    print("##################################################################")
    print("\r\n")
    print("!!! Make sure the board is in the DAPLink mode (Mode LED blinks at 2Hz) !!!")
    print("\r\n")

    if not options.force_reprov:
        answer = input('Reprovision the device. Are you sure? (y/N): ')
        # Compare by value; identity (`is`) on strings is not reliable.
        if answer not in ('y', 'Y'):
            print('Reprovision skipped.')
            sys.exit(1)

    if create_signing_keys:
        print('Creating new signing keys.')
        create_app_keys(overwrite=True)

    # invalidate SPE image in Flash so it won't run.
    if erase_flash(0x10000000, 0x1000) != 0:
        sys.exit(1)

    if read_device_pub_key() != 0:
        sys.exit(1)

    if generate_device_cert(dev_serial_num) != 0:
        sys.exit(1)

    create_provisioning_packet()

    re_provision_device(options.device, options.policy_file)

    sys.exit(0)


if __name__ == "__main__":
    main(sys.argv[1:])