1#!/usr/bin/env python 2# 3# ESP32 x509 certificate bundle generation utility 4# 5# Converts PEM and DER certificates to a custom bundle format which stores just the 6# subject name and public key to reduce space 7# 8# The bundle will have the format: number of certificates; crt 1 subject name length; crt 1 public key length; 9# crt 1 subject name; crt 1 public key; crt 2... 10# 11# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD 12# SPDX-License-Identifier: Apache-2.0 13 14from __future__ import with_statement 15 16import argparse 17import csv 18import os 19import re 20import struct 21import sys 22from io import open 23 24try: 25 from cryptography import x509 26 from cryptography.hazmat.backends import default_backend 27 from cryptography.hazmat.primitives import serialization 28except ImportError: 29 print('The cryptography package is not installed.' 30 'Please refer to the Get Started section of the ESP-IDF Programming Guide for ' 31 'setting up the required packages.') 32 raise 33 34ca_bundle_bin_file = 'x509_crt_bundle' 35 36quiet = False 37 38 39def status(msg): 40 """ Print status message to stderr """ 41 if not quiet: 42 critical(msg) 43 44 45def critical(msg): 46 """ Print critical message to stderr """ 47 sys.stderr.write('gen_crt_bundle.py: ') 48 sys.stderr.write(msg) 49 sys.stderr.write('\n') 50 51 52class CertificateBundle: 53 def __init__(self): 54 self.certificates = [] 55 self.compressed_crts = [] 56 57 if os.path.isfile(ca_bundle_bin_file): 58 os.remove(ca_bundle_bin_file) 59 60 def add_from_path(self, crts_path): 61 62 found = False 63 for file_path in os.listdir(crts_path): 64 found |= self.add_from_file(os.path.join(crts_path, file_path)) 65 66 if found is False: 67 raise InputError('No valid x509 certificates found in %s' % crts_path) 68 69 def add_from_file(self, file_path): 70 try: 71 if file_path.endswith('.pem'): 72 status('Parsing certificates from %s' % file_path) 73 with open(file_path, 'r', encoding='utf-8') as f: 74 crt_str = f.read() 75 self.add_from_pem(crt_str) 76 return True 77 78 elif file_path.endswith('.der'): 79 status('Parsing certificates from %s' % file_path) 80 with open(file_path, 'rb') as f: 81 crt_str = f.read() 82 self.add_from_der(crt_str) 83 return True 84 85 except ValueError: 86 critical('Invalid certificate in %s' % file_path) 87 raise InputError('Invalid certificate') 88 89 return False 90 91 def add_from_pem(self, crt_str): 92 """ A single PEM file may have multiple certificates """ 93 94 crt = '' 95 count = 0 96 start = False 97 98 for strg in crt_str.splitlines(True): 99 if strg == '-----BEGIN CERTIFICATE-----\n' and start is False: 100 crt = '' 101 start = True 102 elif strg == '-----END CERTIFICATE-----\n' and start is True: 103 crt += strg + '\n' 104 start = False 105 self.certificates.append(x509.load_pem_x509_certificate(crt.encode(), default_backend())) 106 count += 1 107 if start is True: 108 crt += strg 109 110 if count == 0: 111 raise InputError('No certificate found') 112 113 status('Successfully added %d certificates' % count) 114 115 def add_from_der(self, crt_str): 116 self.certificates.append(x509.load_der_x509_certificate(crt_str, default_backend())) 117 status('Successfully added 1 certificate') 118 119 def create_bundle(self): 120 # Sort certificates in order to do binary search when looking up certificates 121 self.certificates = sorted(self.certificates, key=lambda cert: cert.subject.public_bytes(default_backend())) 122 123 bundle = struct.pack('>H', len(self.certificates)) 124 125 for crt in self.certificates: 126 """ Read the public key as DER format """ 127 pub_key = crt.public_key() 128 pub_key_der = pub_key.public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo) 129 130 """ Read the subject name as DER format """ 131 sub_name_der = crt.subject.public_bytes(default_backend()) 132 133 name_len = len(sub_name_der) 134 key_len = len(pub_key_der) 135 len_data = struct.pack('>HH', name_len, key_len) 136 137 bundle += len_data 138 bundle += sub_name_der 139 bundle += pub_key_der 140 141 return bundle 142 143 def add_with_filter(self, crts_path, filter_path): 144 145 filter_set = set() 146 with open(filter_path, 'r', encoding='utf-8') as f: 147 csv_reader = csv.reader(f, delimiter=',') 148 149 # Skip header 150 next(csv_reader) 151 for row in csv_reader: 152 filter_set.add(row[1]) 153 154 status('Parsing certificates from %s' % crts_path) 155 crt_str = [] 156 with open(crts_path, 'r', encoding='utf-8') as f: 157 crt_str = f.read() 158 159 # Split all certs into a list of (name, certificate string) tuples 160 pem_crts = re.findall(r'(^.+?)\n(=+\n[\s\S]+?END CERTIFICATE-----\n)', crt_str, re.MULTILINE) 161 162 filtered_crts = '' 163 for name, crt in pem_crts: 164 if name in filter_set: 165 filtered_crts += crt 166 167 self.add_from_pem(filtered_crts) 168 169 170class InputError(RuntimeError): 171 def __init__(self, e): 172 super(InputError, self).__init__(e) 173 174 175def main(): 176 global quiet 177 178 parser = argparse.ArgumentParser(description='ESP-IDF x509 certificate bundle utility') 179 180 parser.add_argument('--quiet', '-q', help="Don't print non-critical status messages to stderr", action='store_true') 181 parser.add_argument('--input', '-i', nargs='+', required=True, 182 help='Paths to the custom certificate folders or files to parse, parses all .pem or .der files') 183 parser.add_argument('--filter', '-f', help='Path to CSV-file where the second columns contains the name of the certificates \ 184 that should be included from cacrt_all.pem') 185 186 args = parser.parse_args() 187 188 quiet = args.quiet 189 190 bundle = CertificateBundle() 191 192 for path in args.input: 193 if os.path.isfile(path): 194 if os.path.basename(path) == 'cacrt_all.pem' and args.filter: 195 bundle.add_with_filter(path, args.filter) 196 else: 197 bundle.add_from_file(path) 198 elif os.path.isdir(path): 199 bundle.add_from_path(path) 200 else: 201 raise InputError('Invalid --input=%s, is neither file nor folder' % args.input) 202 203 status('Successfully added %d certificates in total' % len(bundle.certificates)) 204 205 crt_bundle = bundle.create_bundle() 206 207 with open(ca_bundle_bin_file, 'wb') as f: 208 f.write(crt_bundle) 209 210 211if __name__ == '__main__': 212 try: 213 main() 214 except InputError as e: 215 print(e) 216 sys.exit(2) 217