1#!/usr/bin/env python 2# 3# ESP32 x509 certificate bundle generation utility 4# 5# Converts PEM and DER certificates to a custom bundle format which stores just the 6# subject name and public key to reduce space 7# 8# The bundle will have the format: number of certificates; crt 1 subject name length; crt 1 public key length; 9# crt 1 subject name; crt 1 public key; crt 2... 10# 11# Copyright 2018-2019 Espressif Systems (Shanghai) PTE LTD 12# 13# Licensed under the Apache License, Version 2.0 (the "License"); 14# you may not use this file except in compliance with the License. 15# You may obtain a copy of the License at 16# 17# http:#www.apache.org/licenses/LICENSE-2.0 18# 19# Unless required by applicable law or agreed to in writing, software 20# distributed under the License is distributed on an "AS IS" BASIS, 21# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 22# See the License for the specific language governing permissions and 23# limitations under the License. 24 25from __future__ import with_statement 26 27import argparse 28import csv 29import os 30import re 31import struct 32import sys 33from io import open 34 35try: 36 from cryptography import x509 37 from cryptography.hazmat.backends import default_backend 38 from cryptography.hazmat.primitives import serialization 39except ImportError: 40 print('The cryptography package is not installed.' 
41 'Please refer to the Get Started section of the ESP-IDF Programming Guide for ' 42 'setting up the required packages.') 43 raise 44 45ca_bundle_bin_file = 'x509_crt_bundle' 46 47quiet = False 48 49 50def status(msg): 51 """ Print status message to stderr """ 52 if not quiet: 53 critical(msg) 54 55 56def critical(msg): 57 """ Print critical message to stderr """ 58 sys.stderr.write('gen_crt_bundle.py: ') 59 sys.stderr.write(msg) 60 sys.stderr.write('\n') 61 62 63class CertificateBundle: 64 def __init__(self): 65 self.certificates = [] 66 self.compressed_crts = [] 67 68 if os.path.isfile(ca_bundle_bin_file): 69 os.remove(ca_bundle_bin_file) 70 71 def add_from_path(self, crts_path): 72 73 found = False 74 for file_path in os.listdir(crts_path): 75 found |= self.add_from_file(os.path.join(crts_path, file_path)) 76 77 if found is False: 78 raise InputError('No valid x509 certificates found in %s' % crts_path) 79 80 def add_from_file(self, file_path): 81 try: 82 if file_path.endswith('.pem'): 83 status('Parsing certificates from %s' % file_path) 84 with open(file_path, 'r', encoding='utf-8') as f: 85 crt_str = f.read() 86 self.add_from_pem(crt_str) 87 return True 88 89 elif file_path.endswith('.der'): 90 status('Parsing certificates from %s' % file_path) 91 with open(file_path, 'rb') as f: 92 crt_str = f.read() 93 self.add_from_der(crt_str) 94 return True 95 96 except ValueError: 97 critical('Invalid certificate in %s' % file_path) 98 raise InputError('Invalid certificate') 99 100 return False 101 102 def add_from_pem(self, crt_str): 103 """ A single PEM file may have multiple certificates """ 104 105 crt = '' 106 count = 0 107 start = False 108 109 for strg in crt_str.splitlines(True): 110 if strg == '-----BEGIN CERTIFICATE-----\n' and start is False: 111 crt = '' 112 start = True 113 elif strg == '-----END CERTIFICATE-----\n' and start is True: 114 crt += strg + '\n' 115 start = False 116 self.certificates.append(x509.load_pem_x509_certificate(crt.encode(), 
default_backend())) 117 count += 1 118 if start is True: 119 crt += strg 120 121 if(count == 0): 122 raise InputError('No certificate found') 123 124 status('Successfully added %d certificates' % count) 125 126 def add_from_der(self, crt_str): 127 self.certificates.append(x509.load_der_x509_certificate(crt_str, default_backend())) 128 status('Successfully added 1 certificate') 129 130 def create_bundle(self): 131 # Sort certificates in order to do binary search when looking up certificates 132 self.certificates = sorted(self.certificates, key=lambda cert: cert.subject.public_bytes(default_backend())) 133 134 bundle = struct.pack('>H', len(self.certificates)) 135 136 for crt in self.certificates: 137 """ Read the public key as DER format """ 138 pub_key = crt.public_key() 139 pub_key_der = pub_key.public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo) 140 141 """ Read the subject name as DER format """ 142 sub_name_der = crt.subject.public_bytes(default_backend()) 143 144 name_len = len(sub_name_der) 145 key_len = len(pub_key_der) 146 len_data = struct.pack('>HH', name_len, key_len) 147 148 bundle += len_data 149 bundle += sub_name_der 150 bundle += pub_key_der 151 152 return bundle 153 154 def add_with_filter(self, crts_path, filter_path): 155 156 filter_set = set() 157 with open(filter_path, 'r', encoding='utf-8') as f: 158 csv_reader = csv.reader(f, delimiter=',') 159 160 # Skip header 161 next(csv_reader) 162 for row in csv_reader: 163 filter_set.add(row[1]) 164 165 status('Parsing certificates from %s' % crts_path) 166 crt_str = [] 167 with open(crts_path, 'r', encoding='utf-8') as f: 168 crt_str = f.read() 169 170 # Split all certs into a list of (name, certificate string) tuples 171 pem_crts = re.findall(r'(^.+?)\n(=+\n[\s\S]+?END CERTIFICATE-----\n)', crt_str, re.MULTILINE) 172 173 filtered_crts = '' 174 for name, crt in pem_crts: 175 if name in filter_set: 176 filtered_crts += crt 177 178 self.add_from_pem(filtered_crts) 


class InputError(RuntimeError):
    """ Raised when the supplied certificate inputs are missing or invalid """

    def __init__(self, e):
        super(InputError, self).__init__(e)


def main():
    """ Parse the command line, collect the certificates and write the bundle file.

    Each --input path is handled according to what it is: a file named
    'cacrt_all.pem' is filtered through --filter when one is given, any other
    file is parsed directly, and a directory has all of its .pem/.der files
    parsed.
    Raises InputError if an input path is neither a file nor a directory
    (or if parsing any input fails).
    """
    global quiet

    parser = argparse.ArgumentParser(description='ESP-IDF x509 certificate bundle utility')

    parser.add_argument('--quiet', '-q', help="Don't print non-critical status messages to stderr", action='store_true')
    parser.add_argument('--input', '-i', nargs='+', required=True,
                        help='Paths to the custom certificate folders or files to parse, parses all .pem or .der files')
    parser.add_argument('--filter', '-f',
                        help='Path to CSV-file where the second columns contains the name of the certificates '
                             'that should be included from cacrt_all.pem')

    args = parser.parse_args()

    quiet = args.quiet

    bundle = CertificateBundle()

    for path in args.input:
        if os.path.isfile(path):
            if os.path.basename(path) == 'cacrt_all.pem' and args.filter:
                bundle.add_with_filter(path, args.filter)
            else:
                bundle.add_from_file(path)
        elif os.path.isdir(path):
            bundle.add_from_path(path)
        else:
            # Report the single offending path, not the whole --input list
            raise InputError('Invalid --input=%s, is neither file nor folder' % path)

    status('Successfully added %d certificates in total' % len(bundle.certificates))

    crt_bundle = bundle.create_bundle()

    with open(ca_bundle_bin_file, 'wb') as f:
        f.write(crt_bundle)


if __name__ == '__main__':
    try:
        main()
    except InputError as e:
        # Input problems are user errors: print the message and exit with
        # status 2 instead of showing a traceback.
        print(e)
        sys.exit(2)