1#!/usr/bin/env python
2#
3# esp-idf NVS partition generation tool. Tool helps in generating NVS-compatible
4# partition binary, with key-value pair entries provided via a CSV file.
5#
6# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
7#
8# Licensed under the Apache License, Version 2.0 (the "License");
9# you may not use this file except in compliance with the License.
10# You may obtain a copy of the License at
11#
12#     http://www.apache.org/licenses/LICENSE-2.0
13#
14# Unless required by applicable law or agreed to in writing, software
15# distributed under the License is distributed on an "AS IS" BASIS,
16# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17# See the License for the specific language governing permissions and
18# limitations under the License.
19#
20
21from __future__ import division, print_function
22
23import argparse
24import array
25import binascii
26import codecs
27import datetime
28import distutils.dir_util
29import os
30import random
31import struct
32import sys
33import zlib
34from builtins import bytes, int, range
35from io import open
36
37from future.moves.itertools import zip_longest
38
39try:
40    from cryptography.hazmat.backends import default_backend
41    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
42except ImportError:
43    print('The cryptography package is not installed.'
44          'Please refer to the Get Started section of the ESP-IDF Programming Guide for '
45          'setting up the required packages.')
46    raise
47
# Human-readable descriptions of the two NVS format versions: version 1 has
# multipage blob support disabled, version 2 has it enabled.
VERSION1_PRINT = 'V1 - Multipage Blob Support Disabled'
VERSION2_PRINT = 'V2 - Multipage Blob Support Enabled'
50
51
def reverse_hexbytes(addr_tmp):
    """ Reverse a hex string two characters (one byte) at a time.

    :param addr_tmp: Hex string to reverse; an odd trailing character is kept as its own group
    :return: String with the two-character groups in reverse order
    """
    hex_pairs = [addr_tmp[pos:pos + 2] for pos in range(0, len(addr_tmp), 2)]
    hex_pairs.reverse()
    return ''.join(hex_pairs)
60
61
62""" Class for standard NVS page structure """
63
64
class Page(object):
    """ In-memory image of a single NVS page.

    The page buffer is ``PAGE_PARAMS['max_size']`` bytes, initialised to 0xFF, and is laid out as
    a 32-byte header, a 32-byte entry-state bitmap and up to ``PAGE_PARAMS['max_entries']``
    entries of 32 bytes each.

    Every entry header written by this class uses the same 32-byte layout:
    byte 0 namespace index, byte 1 item type, byte 2 span, byte 3 chunk index,
    bytes 4-7 header CRC32, bytes 8-23 key, bytes 24-31 value or size/CRC of the data that follows.
    """
    PAGE_PARAMS = {
        'max_size': 4096,
        'max_old_blob_size': 1984,
        'max_new_blob_size': 4000,
        'max_entries': 126
    }

    # Item type codes
    U8   = 0x01
    I8   = 0x11
    U16  = 0x02
    I16  = 0x12
    U32  = 0x04
    I32  = 0x14
    U64  = 0x08
    I64  = 0x18
    SZ   = 0x21
    BLOB = 0x41
    BLOB_DATA = 0x42
    BLOB_IDX = 0x48

    # Few Page constants
    HEADER_SIZE = 32
    BITMAPARRAY_OFFSET = 32
    BITMAPARRAY_SIZE_IN_BYTES = 32
    FIRST_ENTRY_OFFSET = 64
    SINGLE_ENTRY_SIZE = 32
    CHUNK_ANY = 0xFF
    ACTIVE = 0xFFFFFFFE
    FULL = 0xFFFFFFFC
    VERSION1 = 0xFF
    VERSION2 = 0xFE

    # Longest encoded key that still leaves a NUL terminator inside the 16-byte key field
    MAX_KEY_LEN = 15

    # Primitive encoding name -> (item type code, struct format used to pack the value)
    _PRIMITIVE_FORMATS = {
        'u8': (U8, '<B'),
        'i8': (I8, '<b'),
        'u16': (U16, '<H'),
        'i16': (I16, '<h'),
        'u32': (U32, '<I'),
        'i32': (I32, '<i'),
        'u64': (U64, '<Q'),
        'i64': (I64, '<q'),
    }

    def __init__(self, page_num, version, is_rsrv_page=False):
        """ Create a blank page.

        :param page_num: Sequence number stored in the page header
        :param version: Page.VERSION1 or Page.VERSION2
        :param is_rsrv_page: When True the buffer is left entirely 0xFF (no header, empty bitmap)
        """
        self.entry_num = 0
        self.bitmap_array = array.array('B')
        self.version = version
        self.page_buf = bytearray(b'\xff') * Page.PAGE_PARAMS['max_size']
        if not is_rsrv_page:
            self.bitmap_array = self.create_bitmap_array()
            self.set_header(page_num, version)

    def set_header(self, page_num, version):
        """ Write the 32-byte page header (state, sequence number, version, CRC) into the buffer.

        :param page_num: Sequence number packed at offset 4
        :param version: Page.VERSION1 or Page.VERSION2; any other value leaves the version byte at 0xFF
        """
        page_header = bytearray(b'\xff') * Page.HEADER_SIZE
        # set page state to active
        struct.pack_into('<I', page_header, 0, Page.ACTIVE)
        # set page sequence number
        struct.pack_into('<I', page_header, 4, page_num)
        # set version
        if version in (Page.VERSION1, Page.VERSION2):
            page_header[8] = version
        # set header's CRC, computed over header bytes 4..27
        crc_data = bytes(page_header[4:28])
        crc = zlib.crc32(crc_data, 0xFFFFFFFF)
        struct.pack_into('<I', page_header, 28, crc & 0xFFFFFFFF)
        self.page_buf[0:len(page_header)] = page_header

    def create_bitmap_array(self):
        """ Return a fresh entry-state bitmap: 32 bytes (256 bits) with every bit set. """
        return array.array('B', [0xFF] * Page.BITMAPARRAY_SIZE_IN_BYTES)

    def write_bitmaparray(self):
        """ Mark entry ``self.entry_num`` as written and copy the bitmap into the page buffer.

        Each entry owns two bits of the bitmap; the lower of the two is cleared.
        """
        bitnum = self.entry_num * 2
        byte_idx = bitnum // 8  # Find byte index in the array
        bit_offset = bitnum & 7  # Find bit offset in given byte index
        mask = ~(1 << bit_offset)
        self.bitmap_array[byte_idx] &= mask
        start_idx = Page.BITMAPARRAY_OFFSET
        end_idx = Page.BITMAPARRAY_OFFSET + Page.BITMAPARRAY_SIZE_IN_BYTES
        self.page_buf[start_idx:end_idx] = self.bitmap_array

    def encrypt_entry(self, data_arr, tweak_arr, encr_key):
        """ Encrypt one entry with AES-XTS.

        :param data_arr: Plain text as a hex string
        :param tweak_arr: XTS tweak as a hex string
        :param encr_key: Raw key bytes handed to ``algorithms.AES``
        :return: Encrypted bytes
        """
        backend = default_backend()
        plain_text = codecs.decode(data_arr, 'hex')
        tweak = codecs.decode(tweak_arr, 'hex')

        cipher = Cipher(algorithms.AES(encr_key), modes.XTS(tweak), backend=backend)
        encryptor = cipher.encryptor()
        encrypted_data = encryptor.update(plain_text)

        return encrypted_data

    def encrypt_data(self, data_input, no_of_entries, nvs_obj):
        """ Encrypt ``no_of_entries`` consecutive 32-byte entries, starting at ``self.entry_num``.

        The tweak of each entry is its byte address inside the partition (page offset plus entry
        offset), written least significant byte first and zero padded to 16 bytes.

        :param data_input: Entry data; anything that is not a bytearray is copied into a
                           0xFF-filled buffer of at least 32 bytes first
        :param no_of_entries: Number of 32-byte entries to encrypt
        :param nvs_obj: Object providing ``encr_key`` and ``page_num``
        :return: bytearray holding the encrypted entries back to back
        """
        encr_data_to_write = bytearray()
        data_len_needed = 64  # in hex
        tweak_len_needed = 32  # in hex
        key_len_needed = 64
        init_tweak_val = '0'
        init_data_val = 'f'

        # A key of exactly `key_len_needed` items is used as is; anything else is hex-decoded
        if len(nvs_obj.encr_key) == key_len_needed:
            encr_key_input = nvs_obj.encr_key
        else:
            encr_key_input = codecs.decode(nvs_obj.encr_key, 'hex')

        rel_addr = nvs_obj.page_num * Page.PAGE_PARAMS['max_size'] + Page.FIRST_ENTRY_OFFSET

        if not isinstance(data_input, bytearray):
            byte_arr = bytearray(b'\xff') * 32
            byte_arr[0:len(data_input)] = data_input
            data_input = byte_arr

        data_input = binascii.hexlify(data_input)

        entry_no = self.entry_num
        start_idx = 0
        end_idx = start_idx + 64

        for _ in range(0, no_of_entries):
            # Set tweak value
            offset = entry_no * Page.SINGLE_ENTRY_SIZE
            addr = hex(rel_addr + offset)[2:]
            addr_len = len(addr)
            if addr_len > 2:
                if not addr_len % 2:
                    addr_tmp = addr
                else:
                    addr_tmp = init_tweak_val + addr
                tweak_tmp = reverse_hexbytes(addr_tmp)
                tweak_val = tweak_tmp + (init_tweak_val * (tweak_len_needed - (len(tweak_tmp))))
            else:
                tweak_val = addr + (init_tweak_val * (tweak_len_needed - len(addr)))

            # Encrypt data; a short final slice is padded with 'f' (0xFF bytes) to a full entry
            data_bytes = data_input[start_idx:end_idx]
            # NOTE(review): exact type test kept on purpose; `bytes` may be the `future`
            # backport under Python 2, where isinstance() would behave differently - confirm
            if type(data_bytes) == bytes:
                data_bytes = data_bytes.decode()

            data_val = data_bytes + (init_data_val * (data_len_needed - len(data_bytes)))
            encr_data_ret = self.encrypt_entry(data_val, tweak_val, encr_key_input)
            encr_data_to_write = encr_data_to_write + encr_data_ret
            # Update values for encrypting next set of data bytes
            start_idx = end_idx
            end_idx = start_idx + 64
            entry_no += 1

        return encr_data_to_write

    def write_entry_to_buf(self, data, entrycount, nvs_obj):
        """ Copy ``data`` into the page at the current entry and mark ``entrycount`` entries used.

        :param data: Bytes to store; encrypted first when ``nvs_obj.encrypt`` is set
        :param entrycount: Number of 32-byte entries the data occupies
        :param nvs_obj: Object providing ``encrypt`` (and the key material when encrypting)
        """
        if nvs_obj.encrypt:
            data = bytearray(self.encrypt_data(data, entrycount, nvs_obj))

        start_idx = Page.FIRST_ENTRY_OFFSET + (Page.SINGLE_ENTRY_SIZE * self.entry_num)
        end_idx = start_idx + len(data)
        self.page_buf[start_idx:end_idx] = data

        # Set bitmap array for entries in current page
        for _ in range(0, entrycount):
            self.write_bitmaparray()
            self.entry_num += 1

    def set_crc_header(self, entry_struct):
        """ Compute the entry header CRC32 and store it at offset 4.

        The CRC covers bytes 0-3 and 8-31 of the header, i.e. everything except the CRC field.

        :param entry_struct: 32-byte entry header (bytearray), updated in place
        :return: The same ``entry_struct``
        """
        crc_data = bytes(entry_struct[0:4] + entry_struct[8:32])
        crc = zlib.crc32(crc_data, 0xFFFFFFFF)
        struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF)
        return entry_struct

    def _set_key(self, entry_struct, key):
        """ Store ``key`` NUL-padded in the 16-byte key field (bytes 8-23) of ``entry_struct``.

        The length check and the slice use the *encoded* key, so a multi-byte key can neither
        grow the 32-byte header nor run into the data field.

        :raises InputError: If the encoded key is longer than ``MAX_KEY_LEN`` bytes
        """
        key_bytes = key.encode()
        if len(key_bytes) > Page.MAX_KEY_LEN:
            raise InputError(' Input File: Length of key `%s` should be <= %d bytes.'
                             % (key, Page.MAX_KEY_LEN))
        entry_struct[8:24] = b'\x00' * 16
        entry_struct[8:8 + len(key_bytes)] = key_bytes

    def write_varlen_binary_data(self, entry_struct, ns_index, key, data, data_size, total_entry_count, encoding, nvs_obj):
        """ Write a blob as one or more BLOB_DATA chunks followed by a BLOB_IDX entry.

        Each chunk takes whatever room is left on the current page; a new page is requested from
        ``nvs_obj`` whenever data remains or the page has no entry left for what follows.
        ``ns_index``, ``key``, ``total_entry_count`` and ``encoding`` are accepted but not used.

        :param entry_struct: Prepared 32-byte header (namespace and key already set)
        :param data: Blob payload
        :param data_size: Total payload size, stored in the BLOB_IDX entry
        :param nvs_obj: NVS object used to create pages and to write entries
        :return: The header of the BLOB_IDX entry that was written last
        """
        chunk_start = 0
        chunk_count = 0
        offset = 0
        remaining_size = data_size
        # Page currently written to; moves on to the page created by nvs_obj when this one fills up
        page = self

        while True:
            # Get the size available in current page (one entry is kept for the chunk header)
            tailroom = (Page.PAGE_PARAMS['max_entries'] - page.entry_num - 1) * Page.SINGLE_ENTRY_SIZE
            assert tailroom >= 0, 'Page overflow!!'

            # Store as much as fits onto the current page
            chunk_size = min(tailroom, remaining_size)
            remaining_size = remaining_size - chunk_size

            # Change type of data to BLOB_DATA
            entry_struct[1] = Page.BLOB_DATA

            # Calculate no. of entries data chunk will require
            datachunk_rounded_size = (chunk_size + 31) & ~31
            datachunk_entry_count = datachunk_rounded_size // 32

            # Set Span: data entries +1 for the entry header
            entry_struct[2] = datachunk_entry_count + 1

            # Update the chunkIndex
            entry_struct[3] = chunk_start + chunk_count

            # Set data chunk and its size
            data_chunk = data[offset:offset + chunk_size]
            struct.pack_into('<H', entry_struct, 24, chunk_size)

            if type(data) != bytes:
                data_chunk = bytes(data_chunk, encoding='utf8')

            # Compute CRC of data chunk
            crc = zlib.crc32(data_chunk, 0xFFFFFFFF)
            struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)

            # compute crc of entry header
            entry_struct = page.set_crc_header(entry_struct)

            # write entry header
            page.write_entry_to_buf(entry_struct, 1, nvs_obj)
            # write actual data
            page.write_entry_to_buf(data_chunk, datachunk_entry_count, nvs_obj)

            chunk_count = chunk_count + 1

            if remaining_size or (tailroom - chunk_size) < Page.SINGLE_ENTRY_SIZE:
                nvs_obj.create_new_page()
                page = nvs_obj.cur_page

            offset = offset + chunk_size

            # All chunks are stored, now store the index
            if not remaining_size:
                # Initialise data field to 0xff
                entry_struct[24:32] = bytearray(b'\xff') * 8

                # change type of data to BLOB_IDX
                entry_struct[1] = Page.BLOB_IDX

                # Set Span
                entry_struct[2] = 1

                # Update the chunkIndex
                entry_struct[3] = Page.CHUNK_ANY

                struct.pack_into('<I', entry_struct, 24, data_size)
                entry_struct[28] = chunk_count
                entry_struct[29] = chunk_start

                # compute crc of entry header
                entry_struct = page.set_crc_header(entry_struct)

                # write last entry
                page.write_entry_to_buf(entry_struct, 1, nvs_obj)
                break

        return entry_struct

    def write_single_page_entry(self, entry_struct, data, datalen, data_entry_count, nvs_obj):
        """ Write a variable length item (header entry plus data entries) onto this page.

        :param entry_struct: Prepared 32-byte header; size and CRCs are filled in here
        :param data: Payload; anything that is not ``bytes`` is UTF-8 encoded
        :param datalen: Size stored in the header
        :param data_entry_count: Number of 32-byte entries the payload occupies
        :param nvs_obj: NVS object passed through to ``write_entry_to_buf``
        """
        struct.pack_into('<H', entry_struct, 24, datalen)

        if type(data) != bytes:
            data = bytes(data, encoding='utf8')

        # compute CRC of data
        crc = zlib.crc32(data, 0xFFFFFFFF)
        struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)

        # compute crc of entry header
        entry_struct = self.set_crc_header(entry_struct)

        # write entry header
        self.write_entry_to_buf(entry_struct, 1, nvs_obj)
        # write actual data
        self.write_entry_to_buf(data, data_entry_count, nvs_obj)

    def write_varlen_data(self, key, data, encoding, ns_index, nvs_obj):
        """ Low-level function to write variable length data into page buffer.

        Data should be formatted according to encoding specified.

        :param key: Key of the item
        :param data: Payload; text is UTF-8 encoded before its size is measured
        :param encoding: 'string', 'binary', 'hex2bin' or 'base64'
        :param ns_index: Namespace index stored in the entry header
        :param nvs_obj: NVS object used for encryption settings and page creation
        :raises InputError: If the data is too large for the encoding/version, or the key is too long
        :raises PageFullError: If the item does not fit on this page and may not be split
        """
        # Encode up front so that size, span and entry count all refer to the bytes actually
        # stored; measuring the text would under-count any non-ASCII character.
        if type(data) != bytes:
            data = bytes(data, encoding='utf8')

        # Set size of data
        datalen = len(data)

        if datalen > Page.PAGE_PARAMS['max_old_blob_size']:
            if self.version == Page.VERSION1:
                raise InputError(' Input File: Size (%d) exceeds max allowed length `%s` bytes for key `%s`.'
                                 % (datalen, Page.PAGE_PARAMS['max_old_blob_size'], key))
            else:
                if encoding == 'string':
                    raise InputError(' Input File: Size (%d) exceeds max allowed length `%s` bytes for key `%s`.'
                                     % (datalen, Page.PAGE_PARAMS['max_old_blob_size'], key))

        # Calculate no. of entries data will require
        rounded_size = (datalen + 31) & ~31
        data_entry_count = rounded_size // 32
        total_entry_count = data_entry_count + 1  # +1 for the entry header

        is_binary = encoding in ['hex2bin', 'binary', 'base64']
        # Only version 2 blobs may be split over several pages
        is_multipage_blob = self.version == Page.VERSION2 and is_binary

        # Check if page is already full and new page is needed to be created right away
        if self.entry_num >= Page.PAGE_PARAMS['max_entries']:
            raise PageFullError()
        elif (self.entry_num + total_entry_count) >= Page.PAGE_PARAMS['max_entries']:
            if not is_multipage_blob:
                raise PageFullError()

        # Entry header
        entry_struct = bytearray(b'\xff') * 32
        # Set Namespace Index
        entry_struct[0] = ns_index
        # Set Span
        if self.version == Page.VERSION2:
            if encoding == 'string':
                entry_struct[2] = data_entry_count + 1
            # Set Chunk Index
            entry_struct[3] = Page.CHUNK_ANY
        else:
            entry_struct[2] = data_entry_count + 1

        # set key
        self._set_key(entry_struct, key)

        # set Type
        if encoding == 'string':
            entry_struct[1] = Page.SZ
        elif is_binary:
            entry_struct[1] = Page.BLOB

        if is_multipage_blob:
            entry_struct = self.write_varlen_binary_data(entry_struct, ns_index, key, data,
                                                         datalen, total_entry_count, encoding, nvs_obj)
        else:
            self.write_single_page_entry(entry_struct, data, datalen, data_entry_count, nvs_obj)

    def write_primitive_data(self, key, data, encoding, ns_index, nvs_obj):
        """ Low-level function to write data of primitive type into page buffer.

        :param key: Key of the item
        :param data: Integer value to store
        :param encoding: 'u8', 'i8', 'u16', 'i16', 'u32', 'i32', 'u64' or 'i64'; for any other
                         value the type and value fields are left at 0xFF
        :param ns_index: Namespace index stored in the entry header
        :param nvs_obj: NVS object passed through to ``write_entry_to_buf``
        :raises PageFullError: If the page has no free entry left
        :raises InputError: If the encoded key is longer than ``MAX_KEY_LEN`` bytes
        """
        # Check if entry exceeds max number of entries allowed per page
        if self.entry_num >= Page.PAGE_PARAMS['max_entries']:
            raise PageFullError()

        entry_struct = bytearray(b'\xff') * 32
        entry_struct[0] = ns_index  # namespace index
        entry_struct[2] = 0x01  # Span
        entry_struct[3] = Page.CHUNK_ANY

        # write key
        self._set_key(entry_struct, key)

        # write type and value
        primitive = Page._PRIMITIVE_FORMATS.get(encoding)
        if primitive is not None:
            item_type, value_format = primitive
            entry_struct[1] = item_type
            struct.pack_into(value_format, entry_struct, 24, data)

        # Compute CRC
        entry_struct = self.set_crc_header(entry_struct)

        # write to file
        self.write_entry_to_buf(entry_struct, 1, nvs_obj)

    def get_data(self):
        """ Get page buffer data of a given page """
        return self.page_buf
471
472
473"""
474NVS class encapsulates all NVS specific operations to create a binary with given key-value pairs.
475Binary can later be flashed onto device via a flashing utility.
476"""
477
478
class NVS(object):
    """ Builds an NVS partition image page by page.

    Entries are written to the current page; a new page is started whenever the current one
    reports PageFullError. On a clean ``__exit__`` the remaining size is filled with empty pages,
    one reserved page is appended and the whole image is written to ``fout``.
    """

    def __init__(self, fout, input_size, version, encrypt=False, key_input=None):
        """
        :param fout: File/stream object the binary is written to on exit
        :param input_size: Size in bytes available for pages (a multiple of the page size)
        :param version: Page.VERSION1 or Page.VERSION2
        :param encrypt: Whether entries are to be encrypted
        :param key_input: Encryption key; only stored when ``encrypt`` is set
        """
        self.size = input_size
        self.encrypt = encrypt
        self.encr_key = None
        self.namespace_idx = 0
        self.page_num = -1
        self.pages = []
        self.version = version
        self.fout = fout
        if self.encrypt:
            self.encr_key = key_input
        self.cur_page = self.create_new_page(version)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """ Finalise the image and write it to ``fout``; nothing is written after an exception. """
        if exc_type is None and exc_value is None:
            # Create pages for remaining available size
            while True:
                try:
                    self.create_new_page()
                except InsufficientSizeError:
                    self.size = None
                    # Creating the last reserved page
                    self.create_new_page(is_rsrv_page=True)
                    break
            result = self.get_binary_data()
            self.fout.write(result)

    def create_new_page(self, version=None, is_rsrv_page=False):
        """ Mark the current page FULL, append a new page and make it the current one.

        :param version: Ignored; the version given to the constructor is always used
        :param is_rsrv_page: When True the new page is blank and does not consume ``self.size``
        :return: The new page
        :raises InsufficientSizeError: If less than one page of size is left
        """
        # Set previous page state to FULL before creating new page
        if self.pages:
            curr_page_state = struct.unpack('<I', self.cur_page.page_buf[0:4])[0]
            if curr_page_state == Page.ACTIVE:
                page_state_full_seq = Page.FULL
                struct.pack_into('<I', self.cur_page.page_buf, 0, page_state_full_seq)
        # Set version for NVS binary generated
        version = self.version
        # Update available size as each page is created. Comparing against a whole page (instead
        # of == 0) also stops sizes that are not a page multiple, which would otherwise skip past
        # zero and never terminate. `self.size` is None once __exit__ adds the reserved page.
        if self.size is not None and self.size < Page.PAGE_PARAMS['max_size']:
            raise InsufficientSizeError('Error: Size parameter is less than the size of data in csv.Please increase size.')
        if not is_rsrv_page:
            self.size = self.size - Page.PAGE_PARAMS['max_size']
        self.page_num += 1
        # Set version for each page and page header
        new_page = Page(self.page_num, version, is_rsrv_page)
        self.pages.append(new_page)
        self.cur_page = new_page
        return new_page

    def write_namespace(self, key):
        """
        Write namespace entry and subsequently increase namespace count so that all upcoming entries
        will be mapped to a new namespace.

        :param key: Name of the namespace
        """
        self.namespace_idx += 1
        try:
            self.cur_page.write_primitive_data(key, self.namespace_idx, 'u8', 0, self)
        except PageFullError:
            new_page = self.create_new_page()
            new_page.write_primitive_data(key, self.namespace_idx, 'u8', 0, self)

    def write_entry(self, key, value, encoding):
        """
        Write key-value pair. Function accepts value in the form of ascii character and converts
        it into appropriate format before calling Page class's functions to write entry into NVS format.
        Function handles PageFullError and creates a new page and re-invokes the function on a new page.
        We don't have to guard re-invocation with try-except since no entry can span multiple pages.

        :param key: Key of the entry
        :param value: Value as text (or bytes for binary data)
        :param encoding: Encoding name, matched case-insensitively
        :raises InputError: On odd-length hex2bin data or an unsupported encoding
        """
        # Normalise first so the conversions below also apply to e.g. 'STRING' or 'Hex2Bin'
        encoding = encoding.lower()

        if encoding == 'hex2bin':
            value = value.strip()
            if len(value) % 2 != 0:
                raise InputError('%s: Invalid data length. Should be multiple of 2.' % key)
            value = binascii.a2b_hex(value)

        if encoding == 'base64':
            value = binascii.a2b_base64(value)

        if encoding == 'string':
            if type(value) == bytes:
                value = value.decode()
            # strings are stored NUL-terminated
            value += '\0'

        varlen_encodings = ['string', 'binary', 'hex2bin', 'base64']
        primitive_encodings = ['u8', 'i8', 'u16', 'i16', 'u32', 'i32', 'u64', 'i64']

        if encoding in varlen_encodings:
            try:
                self.cur_page.write_varlen_data(key, value, encoding, self.namespace_idx, self)
            except PageFullError:
                new_page = self.create_new_page()
                new_page.write_varlen_data(key, value, encoding, self.namespace_idx, self)
        elif encoding in primitive_encodings:
            try:
                self.cur_page.write_primitive_data(key, int(value), encoding, self.namespace_idx, self)
            except PageFullError:
                new_page = self.create_new_page()
                new_page.write_primitive_data(key, int(value), encoding, self.namespace_idx, self)
        else:
            raise InputError('%s: Unsupported encoding' % encoding)

    def get_binary_data(self):
        """ Return accumulated data of all pages """
        data = bytearray()
        for page in self.pages:
            data += page.get_data()
        return data
589
590
class PageFullError(RuntimeError):
    """
    Raised when the current page has too few free entries
    left to hold the item being written
    """
    def __init__(self):
        RuntimeError.__init__(self)
598
599
class InputError(RuntimeError):
    """
    Raised for invalid input; prints an 'Error:' banner when constructed
    """
    def __init__(self, e):
        print('\nError:')
        RuntimeError.__init__(self, e)
607
608
class InsufficientSizeError(RuntimeError):
    """
    Raised when the given NVS partition size is too small
    to accommodate the data of the given csv file
    """
    def __init__(self, e):
        RuntimeError.__init__(self, e)
616
617
def nvs_open(result_obj, input_size, version=None, is_encrypt=False, key=None):
    """ Wrapper that creates an NVS class object, which is later used to set key-value pairs

    :param result_obj: File/Stream object the resulting binary is dumped to; use e.g. a BytesIO object to keep it in memory
    :param input_size: Size of Partition
    :param version: NVS format version handed to the NVS object
    :param is_encrypt: Whether the NVS object is to encrypt its entries
    :param key: Encryption key handed to the NVS object
    :return: NVS class instance
    """
    nvs_instance = NVS(result_obj, input_size, version, encrypt=is_encrypt, key_input=key)
    return nvs_instance
626
627
def write_entry(nvs_instance, key, datatype, encoding, value):
    """ Wrapper that stores one key-value pair (or a namespace) in NVS format

    :param nvs_instance: Instance of an NVS class returned by nvs_open()
    :param key: Key of the data
    :param datatype: Data type. Valid values are "file", "data" and "namespace"
    :param encoding: Data encoding. Valid values are "u8", "i8", "u16", "i16", "u32", "i32", "u64", "i64", "string", "binary", "hex2bin" and "base64"
    :param value: Data value in ascii encoded string format for "data" datatype and filepath for "file" datatype
    :return: None
    """
    if datatype == 'file':
        # A relative path is resolved against the current working directory
        if os.path.isabs(value):
            source_path = value
        else:
            source_path = os.path.join(os.getcwd(), value)

        with open(source_path, 'rb') as source_file:
            value = source_file.read()

    if datatype == 'namespace':
        nvs_instance.write_namespace(key)
        return

    nvs_instance.write_entry(key, value, encoding)
652
653
def nvs_close(nvs_instance):
    """ Wrapper that finishes writing to NVS by invoking the instance's ``__exit__`` without an exception

    :param nvs_instance: Instance of NVS class returned by nvs_open()
    :return: None
    """
    no_exception = (None, None, None)
    nvs_instance.__exit__(*no_exception)
661
662
def check_size(size):
    '''
    Checks for input partition size

    Exits with an error message (non-zero status) when the size cannot be parsed, is not a
    multiple of 4096 or is smaller than 0x3000 bytes.

    :param size: Input partition size as a string; the base is taken from its prefix (e.g. `0x`)
    :return: Size left for data pages, i.e. the input size minus the one reserved page
    '''
    try:
        # Set size
        input_size = int(size, 0)
    except (TypeError, ValueError) as e:
        # Report a malformed size as a failure; exiting with status 0 would signal success
        sys.exit('Error: invalid partition size `%s`: %s' % (size, e))

    if input_size % 4096 != 0:
        sys.exit('Size of partition must be multiple of 4096')

    # Update size as a page needs to be reserved of size 4KB
    input_size = input_size - Page.PAGE_PARAMS['max_size']

    if input_size < (2 * Page.PAGE_PARAMS['max_size']):
        sys.exit('Minimum NVS partition size needed is 0x3000 bytes.')
    return input_size
683
684
def set_target_filepath(outdir, filepath):
    '''
    Set target file path: <outdir>/<filepath>

    A missing extension is replaced by `.bin`; any other extension terminates the program.
    The output directory and the sub directory of the file are created when missing.

    :param outdir: Target output dir to store files
    :param filepath: Path of target file
    :return: Tuple (outdir, filepath); for an absolute filepath outdir is returned as ''
    '''
    bin_ext = '.bin'
    # Expand if tilde(~) provided in path
    outdir = os.path.expanduser(outdir)

    if filepath:
        key_file_name, ext = os.path.splitext(filepath)
        if not ext:
            filepath = key_file_name + bin_ext
        elif ext != bin_ext:
            # Compare for equality: a substring test would also let e.g. `.binx` through
            sys.exit('Error: `%s`. Only `%s` extension allowed.' % (filepath, bin_ext))

    # Create dir if does not exist
    if outdir and not os.path.isdir(outdir):
        os.makedirs(outdir)

    filedir, filename = os.path.split(filepath)
    filedir = os.path.join(outdir, filedir, '')
    if filedir and not os.path.isdir(filedir):
        os.makedirs(filedir)

    if os.path.isabs(filepath):
        if not outdir == os.getcwd():
            print('\nWarning: `%s` \n\t==> absolute path given so outdir is ignored for this file.' % filepath)
        # Set to empty as outdir is ignored here
        outdir = ''

    # Set full path - outdir + filename
    filepath = os.path.join(outdir, '') + filepath

    return outdir, filepath
721
722
def encrypt(args):
    '''
    Generate encrypted NVS Partition

    Validates the size and the key related arguments, reads the key from `args.inputkey` when
    given and hands over to generate() with encryption enabled.

    :param args: Command line arguments given (size, keygen, inputkey, keyfile are read here)
    '''
    key = None
    bin_ext = '.bin'
    key_len = 64

    check_size(args.size)
    if (args.keygen is False) and (not args.inputkey):
        sys.exit('Error. --keygen or --inputkey argument needed.')
    elif args.keygen and args.inputkey:
        sys.exit('Error. --keygen and --inputkey both are not allowed.')
    elif not args.keygen and args.keyfile:
        print('\nWarning:','--inputkey argument is given. --keyfile argument will be ignored...')

    if args.inputkey:
        # Check if key file has .bin extension (exact match; a substring test accepts `.binx`)
        _, ext = os.path.splitext(args.inputkey)
        if ext != bin_ext:
            sys.exit('Error: `%s`. Only `%s` extension allowed.' % (args.inputkey, bin_ext))
        with open(args.inputkey, 'rb') as key_f:
            key = key_f.read(key_len)
        # A truncated key file cannot hold a usable key; report it here instead of failing
        # later, deep inside the encryption of the first entry.
        if len(key) != key_len:
            sys.exit('Error: `%s`. Key file must contain at least %d bytes.' % (args.inputkey, key_len))

    # Generate encrypted NVS Partition
    generate(args, is_encr_enabled=True, encr_key=key)
750
751
def decrypt_data(data_input, decr_key, page_num, entry_no, entry_size):
    '''
    Decrypt NVS data entry

    :param data_input: Encrypted bytes of one entry
    :param decr_key: Key handed to `algorithms.AES`
    :param page_num: Number of the page the entry belongs to
    :param entry_no: Index of the entry used to derive the tweak
    :param entry_size: Size of one entry in bytes
    :return: Decrypted bytes
    '''
    page_bytes = 4096
    entries_start = 64
    pad_digit = '0'
    tweak_hex_digits = 32  # in hex

    cipher_hex = binascii.hexlify(data_input)

    # Tweak: byte address of the entry, least significant byte first, zero padded
    entry_addr = (page_num * page_bytes + entries_start) + (entry_no * entry_size)
    addr_hex = hex(entry_addr)[2:]
    if len(addr_hex) > 2:
        if len(addr_hex) % 2:
            # make the number of hex digits even before swapping bytes
            addr_hex = pad_digit + addr_hex
        tweak_hex = reverse_hexbytes(addr_hex)
    else:
        tweak_hex = addr_hex
    tweak_hex = tweak_hex + (pad_digit * (tweak_hex_digits - len(tweak_hex)))

    if type(cipher_hex) == bytes:
        cipher_hex = cipher_hex.decode()

    # Decrypt 32 bytes of data using AES-XTS decryption
    cipher_text = codecs.decode(cipher_hex, 'hex')
    tweak = codecs.decode(tweak_hex, 'hex')
    xts_cipher = Cipher(algorithms.AES(decr_key), modes.XTS(tweak), backend=default_backend())
    return xts_cipher.decryptor().update(cipher_text)
791
792
def decrypt(args):
    '''
    Decrypt encrypted NVS Partition

    Reads `args.input` in 32-byte entries, decrypts every entry that is neither one of the first
    two of a page (page header and bitmap) nor completely 0xFF, and writes the result to
    `args.output` (resolved against `args.outdir`).

    :param args: Command line arguments given (input, key, output, outdir are read here)
    '''
    bin_ext = '.bin'
    nvs_read_bytes = 32
    entries_per_page = 128  # 4096-byte page / 32-byte entries
    decrypted_entry_no = 0
    file_entry_no = 0
    page_num = 0
    empty_data_entry = bytearray(b'\xff') * nvs_read_bytes

    # Check if all files have .bin extension (exact match; a substring test accepts `.binx`)
    for filepath in (args.input, args.key, args.output):
        _, ext = os.path.splitext(filepath)
        if ext != bin_ext:
            sys.exit('Error: `%s`. Only `%s` extension allowed.' % (filepath, bin_ext))
    with open(args.key, 'rb') as decr_key_file:
        decr_key = decr_key_file.read(64)

    args.outdir, args.output = set_target_filepath(args.outdir, args.output)

    # Entries are emitted in the order they are read, so the output is a plain concatenation
    output_buf = bytearray()

    with open(args.input, 'rb') as input_file, open(args.output, 'wb') as output_file:
        while True:
            if file_entry_no == entries_per_page:
                # next page: entry counters start over
                decrypted_entry_no = 0
                file_entry_no = 0
                page_num += 1
            data_entry = input_file.read(nvs_read_bytes)
            if not data_entry:
                break
            if data_entry != empty_data_entry and file_entry_no not in [0, 1]:
                # NOTE(review): the tweak index counts decrypted entries only, so it presumably
                # relies on written entries being contiguous from the start of a page - confirm
                data_entry = decrypt_data(data_entry, decr_key, page_num, decrypted_entry_no, nvs_read_bytes)
                decrypted_entry_no += 1
            output_buf += data_entry
            file_entry_no += 1
        output_file.write(output_buf)

    print('\nCreated NVS decrypted binary: ===>', args.output)
841
842
def generate_key(args):
    '''
    Generate encryption keys

    A random 64-byte key is stored at the start of a 4096-byte buffer that is
    otherwise filled with 0xFF, immediately followed by the little-endian
    CRC32 (seed 0xFFFFFFFF) of the key bytes. The buffer is written to a file
    inside the `keys` sub-directory of `args.outdir`.

    :param args: Command line arguments given
                 (reads `outdir` and `keyfile`; `keyfile` is set to a
                 timestamped default name when it is empty)
    :return: The generated key as a string of 128 hexadecimal digits
    '''
    page_max_size = 4096
    keys_dir = 'keys'
    bin_ext = '.bin'
    hex_digits = '0123456789abcdef'
    key_hex_len = 128

    if not args.keyfile:
        timestamp = datetime.datetime.now().strftime('%m-%d_%H-%M')
        args.keyfile = 'keys-' + timestamp + bin_ext

    keys_outdir = os.path.join(args.outdir, keys_dir, '')
    # Create keys/ dir in <outdir> if does not exist
    if not os.path.isdir(keys_outdir):
        distutils.dir_util.mkpath(keys_outdir)
    keys_outdir, output_keyfile = set_target_filepath(keys_outdir, args.keyfile)

    # Key material has to come from the operating system's CSPRNG: the default
    # `random` generator is deterministic and not suitable for encryption keys.
    secure_rng = random.SystemRandom()
    key = ''.join(secure_rng.choice(hex_digits) for _ in range(key_hex_len))
    encr_key_bytes = codecs.decode(key, 'hex')
    key_len = len(encr_key_bytes)

    keys_buf = bytearray(b'\xff') * page_max_size
    keys_buf[0:key_len] = encr_key_bytes
    crc_data = bytes(keys_buf[0:key_len])
    crc = zlib.crc32(crc_data, 0xFFFFFFFF)
    # The checksum sits directly after the key bytes
    struct.pack_into('<I', keys_buf, key_len, crc & 0xFFFFFFFF)

    with open(output_keyfile, 'wb') as output_keys_file:
        output_keys_file.write(keys_buf)

    print('\nCreated encryption keys: ===> ', output_keyfile)

    return key
880
881
def generate(args, is_encr_enabled=False, encr_key=None):
    '''
    Generate NVS Partition

    The first non-comment line of the CSV input is taken as the header row.
    Every following line is turned into an entry via write_entry() until an
    empty line or the end of the file is reached. When an entry raises
    InputError the output binary is removed and the process exits with
    status -2.

    :param args: Command line arguments given
                 (reads `input`, `output`, `size`, `version`, `outdir`;
                 `version`, `outdir` and `output` are rewritten in place)
    :param is_encr_enabled: Encryption enabled/disabled
    :param encr_key: Key to encrypt NVS partition; when encryption is enabled
                     and no key is given, one is created with generate_key()
    '''
    bin_ext = '.bin'

    def read_csv_line(csv_file):
        # Next line without surrounding whitespace, encoded to UTF-8 when the
        # file object does not hand out native `str`
        # (presumably the Python 2 case, where io.open() returns unicode).
        text = csv_file.readline().strip()
        if not isinstance(text, str):
            text = text.encode('utf-8')
        return text

    input_size = check_size(args.size)
    if args.version == 1:
        args.version = Page.VERSION1
    elif args.version == 2:
        args.version = Page.VERSION2

    # Check if output file has .bin extension
    ext = os.path.splitext(args.output)[1]
    # Exact comparison: a substring test would also accept e.g. `.binary`
    if ext != bin_ext:
        sys.exit('Error: `%s`. Only `.bin` extension allowed.' % args.output)
    args.outdir, args.output = set_target_filepath(args.outdir, args.output)

    if is_encr_enabled and not encr_key:
        encr_key = generate_key(args)

    # The files are opened exactly once, by the `with` statement, so that
    # every handle is closed again when the block is left.
    with open(args.input, 'rt', encoding='utf8') as input_file,\
            open(args.output, 'wb') as output_file,\
            nvs_open(output_file, input_size, args.version, is_encrypt=is_encr_enabled, key=encr_key) as nvs_obj:

        if nvs_obj.version == Page.VERSION1:
            version_set = VERSION1_PRINT
        else:
            version_set = VERSION2_PRINT

        print('\nCreating NVS binary with version:', version_set)

        # Comments ahead of the header row are skipped
        line = read_csv_line(input_file)
        while line.startswith('#'):
            line = read_csv_line(input_file)

        header = line.split(',')

        while True:
            value = read_csv_line(input_file).split(',')
            # An empty line, or the end of the file, ends the input
            if value == ['']:
                break

            # Columns missing from a row are mapped to None
            data = dict(zip_longest(header, value))

            try:
                # Check key length
                if len(data['key']) > 15:
                    raise InputError('Length of key `{}` should be <= 15 characters.'.format(data['key']))
                write_entry(nvs_obj, data['key'], data['type'], data['encoding'], data['value'])
            except InputError as e:
                print(e)
                if os.path.basename(args.output):
                    print('\nWarning: NVS binary not created...')
                    os.remove(args.output)
                sys.exit(-2)

    print('\nCreated NVS binary: ===>', args.output)
959
960
def main():
    '''
    Command line entry point

    Builds the argument parser with the `generate`, `generate-key`, `encrypt`
    and `decrypt` commands, parses the command line and calls the handler
    registered for the selected command with the parsed arguments.
    Exits with a usage error when no command is given.
    '''
    parser = argparse.ArgumentParser(description='\nESP NVS partition generation utility', formatter_class=argparse.RawTextHelpFormatter)
    subparser = parser.add_subparsers(title='Commands',
                                      dest='command',
                                      help='\nRun nvs_partition_gen.py {command} -h for additional help\n\n')

    parser_gen = subparser.add_parser('generate',
                                      help='Generate NVS partition',
                                      formatter_class=argparse.RawTextHelpFormatter)
    parser_gen.set_defaults(func=generate)
    parser_gen.add_argument('input',
                            default=None,
                            help='Path to CSV file to parse')
    parser_gen.add_argument('output',
                            default=None,
                            help='Path to output NVS binary file')
    parser_gen.add_argument('size',
                            default=None,
                            help='Size of NVS partition in bytes\
                            \n(must be multiple of 4096)')
    parser_gen.add_argument('--version',
                            choices=[1,2],
                            default=2,
                            type=int,
                            help='''Set multipage blob version.\
                            \nVersion 1 - Multipage blob support disabled.\
                            \nVersion 2 - Multipage blob support enabled.\
                            \nDefault: Version 2''')
    parser_gen.add_argument('--outdir',
                            default=os.getcwd(),
                            help='Output directory to store files created\
                            \n(Default: current directory)')
    parser_gen_key = subparser.add_parser('generate-key',
                                          help='Generate keys for encryption',
                                          formatter_class=argparse.RawTextHelpFormatter)
    parser_gen_key.set_defaults(func=generate_key)
    parser_gen_key.add_argument('--keyfile',
                                default=None,
                                help='Path to output encryption keys file')
    parser_gen_key.add_argument('--outdir',
                                default=os.getcwd(),
                                help='Output directory to store files created.\
                                \n(Default: current directory)')
    parser_encr = subparser.add_parser('encrypt',
                                       help='Generate NVS encrypted partition',
                                       formatter_class=argparse.RawTextHelpFormatter)
    parser_encr.set_defaults(func=encrypt)
    parser_encr.add_argument('input',
                             default=None,
                             help='Path to CSV file to parse')
    parser_encr.add_argument('output',
                             default=None,
                             help='Path to output NVS binary file')
    parser_encr.add_argument('size',
                             default=None,
                             help='Size of NVS partition in bytes\
                             \n(must be multiple of 4096)')
    parser_encr.add_argument('--version',
                             choices=[1,2],
                             default=2,
                             type=int,
                             help='''Set multipage blob version.\
                             \nVersion 1 - Multipage blob support disabled.\
                             \nVersion 2 - Multipage blob support enabled.\
                             \nDefault: Version 2''')
    parser_encr.add_argument('--keygen',
                             action='store_true',
                             default=False,
                             help='Generates key for encrypting NVS partition')
    parser_encr.add_argument('--keyfile',
                             default=None,
                             help='Path to output encryption keys file')
    parser_encr.add_argument('--inputkey',
                             default=None,
                             help='File having key for encrypting NVS partition')
    parser_encr.add_argument('--outdir',
                             default=os.getcwd(),
                             help='Output directory to store files created.\
                             \n(Default: current directory)')
    parser_decr = subparser.add_parser('decrypt',
                                       help='Decrypt NVS encrypted partition',
                                       formatter_class=argparse.RawTextHelpFormatter)
    parser_decr.set_defaults(func=decrypt)
    parser_decr.add_argument('input',
                             default=None,
                             help='Path to encrypted NVS partition file to parse')
    parser_decr.add_argument('key',
                             default=None,
                             help='Path to file having keys for decryption')
    parser_decr.add_argument('output',
                             default=None,
                             help='Path to output decrypted binary file')
    parser_decr.add_argument('--outdir',
                             default=os.getcwd(),
                             help='Output directory to store files created.\
                             \n(Default: current directory)')
    args = parser.parse_args()

    # Sub-commands are optional on Python 3, so `func` is missing when the
    # tool is started without one; report that as a usage error instead of
    # failing with an AttributeError.
    if not hasattr(args, 'func'):
        parser.error('a command is required, run with -h to list the available commands')

    args.func(args)
1060
1061
# Run the command line interface only when this file is executed as a script
if __name__ == '__main__':
    main()
1064