1#!/usr/bin/env python
2#
3# ESP32 x509 certificate bundle generation utility
4#
5# Converts PEM and DER certificates to a custom bundle format which stores just the
6# subject name and public key to reduce space
7#
8# The bundle will have the format: number of certificates; crt 1 subject name length; crt 1 public key length;
9# crt 1 subject name; crt 1 public key; crt 2...
10#
11# SPDX-FileCopyrightText: 2018-2022 Espressif Systems (Shanghai) CO LTD
12# SPDX-License-Identifier: Apache-2.0
13
14from __future__ import with_statement
15
16import argparse
17import csv
18import os
19import re
20import struct
21import sys
22from io import open
23
24try:
25    from cryptography import x509
26    from cryptography.hazmat.backends import default_backend
27    from cryptography.hazmat.primitives import serialization
28except ImportError:
29    print('The cryptography package is not installed.'
30          'Please refer to the Get Started section of the ESP-IDF Programming Guide for '
31          'setting up the required packages.')
32    raise
33
34ca_bundle_bin_file = 'x509_crt_bundle'
35
36quiet = False
37
38
39def status(msg):
40    """ Print status message to stderr """
41    if not quiet:
42        critical(msg)
43
44
45def critical(msg):
46    """ Print critical message to stderr """
47    sys.stderr.write('gen_crt_bundle.py: ')
48    sys.stderr.write(msg)
49    sys.stderr.write('\n')
50
51
52class CertificateBundle:
53    def __init__(self):
54        self.certificates = []
55        self.compressed_crts = []
56
57        if os.path.isfile(ca_bundle_bin_file):
58            os.remove(ca_bundle_bin_file)
59
60    def add_from_path(self, crts_path):
61
62        found = False
63        for file_path in os.listdir(crts_path):
64            found |= self.add_from_file(os.path.join(crts_path, file_path))
65
66        if found is False:
67            raise InputError('No valid x509 certificates found in %s' % crts_path)
68
69    def add_from_file(self, file_path):
70        try:
71            if file_path.endswith('.pem'):
72                status('Parsing certificates from %s' % file_path)
73                with open(file_path, 'r', encoding='utf-8') as f:
74                    crt_str = f.read()
75                    self.add_from_pem(crt_str)
76                    return True
77
78            elif file_path.endswith('.der'):
79                status('Parsing certificates from %s' % file_path)
80                with open(file_path, 'rb') as f:
81                    crt_str = f.read()
82                    self.add_from_der(crt_str)
83                    return True
84
85        except ValueError:
86            critical('Invalid certificate in %s' % file_path)
87            raise InputError('Invalid certificate')
88
89        return False
90
91    def add_from_pem(self, crt_str):
92        """ A single PEM file may have multiple certificates """
93
94        crt = ''
95        count = 0
96        start = False
97
98        for strg in crt_str.splitlines(True):
99            if strg == '-----BEGIN CERTIFICATE-----\n' and start is False:
100                crt = ''
101                start = True
102            elif strg == '-----END CERTIFICATE-----\n' and start is True:
103                crt += strg + '\n'
104                start = False
105                self.certificates.append(x509.load_pem_x509_certificate(crt.encode(), default_backend()))
106                count += 1
107            if start is True:
108                crt += strg
109
110        if count == 0:
111            raise InputError('No certificate found')
112
113        status('Successfully added %d certificates' % count)
114
115    def add_from_der(self, crt_str):
116        self.certificates.append(x509.load_der_x509_certificate(crt_str, default_backend()))
117        status('Successfully added 1 certificate')
118
119    def create_bundle(self):
120        # Sort certificates in order to do binary search when looking up certificates
121        self.certificates = sorted(self.certificates, key=lambda cert: cert.subject.public_bytes(default_backend()))
122
123        bundle = struct.pack('>H', len(self.certificates))
124
125        for crt in self.certificates:
126            """ Read the public key as DER format """
127            pub_key = crt.public_key()
128            pub_key_der = pub_key.public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo)
129
130            """ Read the subject name as DER format """
131            sub_name_der = crt.subject.public_bytes(default_backend())
132
133            name_len = len(sub_name_der)
134            key_len = len(pub_key_der)
135            len_data = struct.pack('>HH', name_len, key_len)
136
137            bundle += len_data
138            bundle += sub_name_der
139            bundle += pub_key_der
140
141        return bundle
142
143    def add_with_filter(self, crts_path, filter_path):
144
145        filter_set = set()
146        with open(filter_path, 'r', encoding='utf-8') as f:
147            csv_reader = csv.reader(f, delimiter=',')
148
149            # Skip header
150            next(csv_reader)
151            for row in csv_reader:
152                filter_set.add(row[1])
153
154        status('Parsing certificates from %s' % crts_path)
155        crt_str = []
156        with open(crts_path, 'r', encoding='utf-8') as f:
157            crt_str = f.read()
158
159            # Split all certs into a list of (name, certificate string) tuples
160            pem_crts = re.findall(r'(^.+?)\n(=+\n[\s\S]+?END CERTIFICATE-----\n)', crt_str, re.MULTILINE)
161
162            filtered_crts = ''
163            for name, crt in pem_crts:
164                if name in filter_set:
165                    filtered_crts += crt
166
167        self.add_from_pem(filtered_crts)
168
169
170class InputError(RuntimeError):
171    def __init__(self, e):
172        super(InputError, self).__init__(e)
173
174
175def main():
176    global quiet
177
178    parser = argparse.ArgumentParser(description='ESP-IDF x509 certificate bundle utility')
179
180    parser.add_argument('--quiet', '-q', help="Don't print non-critical status messages to stderr", action='store_true')
181    parser.add_argument('--input', '-i', nargs='+', required=True,
182                        help='Paths to the custom certificate folders or files to parse, parses all .pem or .der files')
183    parser.add_argument('--filter', '-f', help='Path to CSV-file where the second columns contains the name of the certificates \
184                        that should be included from cacrt_all.pem')
185
186    args = parser.parse_args()
187
188    quiet = args.quiet
189
190    bundle = CertificateBundle()
191
192    for path in args.input:
193        if os.path.isfile(path):
194            if os.path.basename(path) == 'cacrt_all.pem' and args.filter:
195                bundle.add_with_filter(path, args.filter)
196            else:
197                bundle.add_from_file(path)
198        elif os.path.isdir(path):
199            bundle.add_from_path(path)
200        else:
201            raise InputError('Invalid --input=%s, is neither file nor folder' % args.input)
202
203    status('Successfully added %d certificates in total' % len(bundle.certificates))
204
205    crt_bundle = bundle.create_bundle()
206
207    with open(ca_bundle_bin_file, 'wb') as f:
208        f.write(crt_bundle)
209
210
211if __name__ == '__main__':
212    try:
213        main()
214    except InputError as e:
215        print(e)
216        sys.exit(2)
217