1#!/usr/bin/env python
2#
3# ESP32 x509 certificate bundle generation utility
4#
5# Converts PEM and DER certificates to a custom bundle format which stores just the
6# subject name and public key to reduce space
7#
8# The bundle will have the format: number of certificates; crt 1 subject name length; crt 1 public key length;
9# crt 1 subject name; crt 1 public key; crt 2...
10#
11# Copyright 2018-2019 Espressif Systems (Shanghai) PTE LTD
12#
13# Licensed under the Apache License, Version 2.0 (the "License");
14# you may not use this file except in compliance with the License.
15# You may obtain a copy of the License at
16#
17#     http:#www.apache.org/licenses/LICENSE-2.0
18#
19# Unless required by applicable law or agreed to in writing, software
20# distributed under the License is distributed on an "AS IS" BASIS,
21# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
22# See the License for the specific language governing permissions and
23# limitations under the License.
24
25from __future__ import with_statement
26
27import argparse
28import csv
29import os
30import re
31import struct
32import sys
33from io import open
34
35try:
36    from cryptography import x509
37    from cryptography.hazmat.backends import default_backend
38    from cryptography.hazmat.primitives import serialization
39except ImportError:
40    print('The cryptography package is not installed.'
41          'Please refer to the Get Started section of the ESP-IDF Programming Guide for '
42          'setting up the required packages.')
43    raise
44
45ca_bundle_bin_file = 'x509_crt_bundle'
46
47quiet = False
48
49
50def status(msg):
51    """ Print status message to stderr """
52    if not quiet:
53        critical(msg)
54
55
56def critical(msg):
57    """ Print critical message to stderr """
58    sys.stderr.write('gen_crt_bundle.py: ')
59    sys.stderr.write(msg)
60    sys.stderr.write('\n')
61
62
63class CertificateBundle:
64    def __init__(self):
65        self.certificates = []
66        self.compressed_crts = []
67
68        if os.path.isfile(ca_bundle_bin_file):
69            os.remove(ca_bundle_bin_file)
70
71    def add_from_path(self, crts_path):
72
73        found = False
74        for file_path in os.listdir(crts_path):
75            found |= self.add_from_file(os.path.join(crts_path, file_path))
76
77        if found is False:
78            raise InputError('No valid x509 certificates found in %s' % crts_path)
79
80    def add_from_file(self, file_path):
81        try:
82            if file_path.endswith('.pem'):
83                status('Parsing certificates from %s' % file_path)
84                with open(file_path, 'r', encoding='utf-8') as f:
85                    crt_str = f.read()
86                    self.add_from_pem(crt_str)
87                    return True
88
89            elif file_path.endswith('.der'):
90                status('Parsing certificates from %s' % file_path)
91                with open(file_path, 'rb') as f:
92                    crt_str = f.read()
93                    self.add_from_der(crt_str)
94                    return True
95
96        except ValueError:
97            critical('Invalid certificate in %s' % file_path)
98            raise InputError('Invalid certificate')
99
100        return False
101
102    def add_from_pem(self, crt_str):
103        """ A single PEM file may have multiple certificates """
104
105        crt = ''
106        count = 0
107        start = False
108
109        for strg in crt_str.splitlines(True):
110            if strg == '-----BEGIN CERTIFICATE-----\n' and start is False:
111                crt = ''
112                start = True
113            elif strg == '-----END CERTIFICATE-----\n' and start is True:
114                crt += strg + '\n'
115                start = False
116                self.certificates.append(x509.load_pem_x509_certificate(crt.encode(), default_backend()))
117                count += 1
118            if start is True:
119                crt += strg
120
121        if(count == 0):
122            raise InputError('No certificate found')
123
124        status('Successfully added %d certificates' % count)
125
126    def add_from_der(self, crt_str):
127        self.certificates.append(x509.load_der_x509_certificate(crt_str, default_backend()))
128        status('Successfully added 1 certificate')
129
130    def create_bundle(self):
131        # Sort certificates in order to do binary search when looking up certificates
132        self.certificates = sorted(self.certificates, key=lambda cert: cert.subject.public_bytes(default_backend()))
133
134        bundle = struct.pack('>H', len(self.certificates))
135
136        for crt in self.certificates:
137            """ Read the public key as DER format """
138            pub_key = crt.public_key()
139            pub_key_der = pub_key.public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo)
140
141            """ Read the subject name as DER format """
142            sub_name_der = crt.subject.public_bytes(default_backend())
143
144            name_len = len(sub_name_der)
145            key_len = len(pub_key_der)
146            len_data = struct.pack('>HH', name_len, key_len)
147
148            bundle += len_data
149            bundle += sub_name_der
150            bundle += pub_key_der
151
152        return bundle
153
154    def add_with_filter(self, crts_path, filter_path):
155
156        filter_set = set()
157        with open(filter_path, 'r', encoding='utf-8') as f:
158            csv_reader = csv.reader(f, delimiter=',')
159
160            # Skip header
161            next(csv_reader)
162            for row in csv_reader:
163                filter_set.add(row[1])
164
165        status('Parsing certificates from %s' % crts_path)
166        crt_str = []
167        with open(crts_path, 'r', encoding='utf-8') as f:
168            crt_str = f.read()
169
170            # Split all certs into a list of (name, certificate string) tuples
171            pem_crts = re.findall(r'(^.+?)\n(=+\n[\s\S]+?END CERTIFICATE-----\n)', crt_str, re.MULTILINE)
172
173            filtered_crts = ''
174            for name, crt in pem_crts:
175                if name in filter_set:
176                    filtered_crts += crt
177
178        self.add_from_pem(filtered_crts)
179
180
181class InputError(RuntimeError):
182    def __init__(self, e):
183        super(InputError, self).__init__(e)
184
185
186def main():
187    global quiet
188
189    parser = argparse.ArgumentParser(description='ESP-IDF x509 certificate bundle utility')
190
191    parser.add_argument('--quiet', '-q', help="Don't print non-critical status messages to stderr", action='store_true')
192    parser.add_argument('--input', '-i', nargs='+', required=True,
193                        help='Paths to the custom certificate folders or files to parse, parses all .pem or .der files')
194    parser.add_argument('--filter', '-f', help='Path to CSV-file where the second columns contains the name of the certificates \
195                        that should be included from cacrt_all.pem')
196
197    args = parser.parse_args()
198
199    quiet = args.quiet
200
201    bundle = CertificateBundle()
202
203    for path in args.input:
204        if os.path.isfile(path):
205            if os.path.basename(path) == 'cacrt_all.pem' and args.filter:
206                bundle.add_with_filter(path, args.filter)
207            else:
208                bundle.add_from_file(path)
209        elif os.path.isdir(path):
210            bundle.add_from_path(path)
211        else:
212            raise InputError('Invalid --input=%s, is neither file nor folder' % args.input)
213
214    status('Successfully added %d certificates in total' % len(bundle.certificates))
215
216    crt_bundle = bundle.create_bundle()
217
218    with open(ca_bundle_bin_file, 'wb') as f:
219        f.write(crt_bundle)
220
221
222if __name__ == '__main__':
223    try:
224        main()
225    except InputError as e:
226        print(e)
227        sys.exit(2)
228