#!/usr/bin/env python
#
# esp-idf NVS partition generation tool. Tool helps in generating NVS-compatible
# partition binary, with key-value pair entries provided via a CSV file.
#
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import division, print_function

import argparse
import array
import binascii
import codecs
import datetime
import os
import random
import struct
import sys
import zlib
from builtins import bytes, int, range
from io import open

try:
    from future.moves.itertools import zip_longest
except ImportError:
    # Python 3 ships zip_longest in the standard library; `future` is optional there.
    from itertools import zip_longest

try:
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
except ImportError:
    # Only the encrypt/decrypt code paths need `cryptography`. The failure is
    # deferred to _require_cryptography() so that generating plain partitions
    # and keys keeps working without the package.
    default_backend = Cipher = algorithms = modes = None

VERSION1_PRINT = 'V1 - Multipage Blob Support Disabled'
VERSION2_PRINT = 'V2 - Multipage Blob Support Enabled'

_CRYPTOGRAPHY_MISSING_MSG = ('The cryptography package is not installed. '
                             'Please refer to the Get Started section of the ESP-IDF Programming Guide for '
                             'setting up the required packages.')


def _require_cryptography():
    """Raise ImportError with setup guidance if `cryptography` could not be imported."""
    if Cipher is None:
        raise ImportError(_CRYPTOGRAPHY_MISSING_MSG)


def _make_dirs(path):
    """Create directory *path* (and missing parents); do nothing if it is empty or exists."""
    if path and not os.path.isdir(path):
        os.makedirs(path)


def reverse_hexbytes(addr_tmp):
    """Return the hex string *addr_tmp* with its byte order reversed.

    The input is split into two-character groups (one byte each) which are
    then joined in reverse order, e.g. ``'1040'`` becomes ``'4010'``.

    :param addr_tmp: String of hex digits
    :return: String with the two-character groups in reverse order
    """
    hex_pairs = [addr_tmp[i:i + 2] for i in range(0, len(addr_tmp), 2)]
    return ''.join(reversed(hex_pairs))


def _xts_tweak_hex(address):
    """Return the AES-XTS tweak for *address* as 32 hex characters.

    The tweak is the address stored little-endian in the first bytes and
    zero-padded to 16 bytes.

    :param address: Byte offset of the entry from the start of the partition
    :return: 32-character hex string
    """
    return reverse_hexbytes('%016x' % address) + '0' * 16


class Page(object):
    """Class for standard NVS page structure.

    A page is a 4096-byte buffer made of a 32-byte header, a 32-byte entry
    state bitmap and 32-byte entries starting at offset 64.
    """

    PAGE_PARAMS = {
        'max_size': 4096,
        'max_old_blob_size': 1984,
        'max_new_blob_size': 4000,
        'max_entries': 126
    }

    # Item type codes
    U8 = 0x01
    I8 = 0x11
    U16 = 0x02
    I16 = 0x12
    U32 = 0x04
    I32 = 0x14
    U64 = 0x08
    I64 = 0x18
    SZ = 0x21
    BLOB = 0x41
    BLOB_DATA = 0x42
    BLOB_IDX = 0x48

    # Few Page constants
    HEADER_SIZE = 32
    BITMAPARRAY_OFFSET = 32
    BITMAPARRAY_SIZE_IN_BYTES = 32
    FIRST_ENTRY_OFFSET = 64
    SINGLE_ENTRY_SIZE = 32
    CHUNK_ANY = 0xFF
    ACTIVE = 0xFFFFFFFE
    FULL = 0xFFFFFFFC
    VERSION1 = 0xFF
    VERSION2 = 0xFE

    # Primitive encoding name -> (item type code, struct format of the value)
    _PRIMITIVE_FORMATS = {
        'u8': (U8, '<B'),
        'i8': (I8, '<b'),
        'u16': (U16, '<H'),
        'i16': (I16, '<h'),
        'u32': (U32, '<I'),
        'i32': (I32, '<i'),
        'u64': (U64, '<Q'),
        'i64': (I64, '<q'),
    }

    def __init__(self, page_num, version, is_rsrv_page=False):
        """Create a page filled with 0xff.

        :param page_num: Sequence number stored in the page header
        :param version: Page.VERSION1 or Page.VERSION2
        :param is_rsrv_page: When True the page is left entirely 0xff
                             (no header, no bitmap)
        """
        self.entry_num = 0
        self.bitmap_array = array.array('B')
        self.version = version
        self.page_buf = bytearray(b'\xff') * Page.PAGE_PARAMS['max_size']
        if not is_rsrv_page:
            self.bitmap_array = self.create_bitmap_array()
            self.set_header(page_num, version)

    def set_header(self, page_num, version):
        """Write the page header (state, sequence number, version, CRC) into the buffer.

        :param page_num: Sequence number of the page
        :param version: Page.VERSION1 or Page.VERSION2; any other value leaves
                        the version byte at 0xff
        """
        page_header = bytearray(b'\xff') * Page.HEADER_SIZE
        # set page state to active
        struct.pack_into('<I', page_header, 0, Page.ACTIVE)
        # set page sequence number
        struct.pack_into('<I', page_header, 4, page_num)
        # set version
        if version in (Page.VERSION1, Page.VERSION2):
            page_header[8] = version
        # set header's CRC, computed over header bytes 4..27
        crc = zlib.crc32(bytes(page_header[4:28]), 0xFFFFFFFF)
        struct.pack_into('<I', page_header, 28, crc & 0xFFFFFFFF)
        self.page_buf[0:len(page_header)] = page_header

    def create_bitmap_array(self):
        """Return a fresh 32-byte (256-bit) bitmap with every bit set."""
        return array.array('B', (0xFF,) * Page.BITMAPARRAY_SIZE_IN_BYTES)

    def write_bitmaparray(self):
        """Clear the first of the two bitmap bits of entry `self.entry_num` and store the bitmap."""
        bitnum = self.entry_num * 2  # two bitmap bits per entry
        byte_idx = bitnum // 8  # Find byte index in the array
        bit_offset = bitnum & 7  # Find bit offset in given byte index
        self.bitmap_array[byte_idx] &= ~(1 << bit_offset)
        start_idx = Page.BITMAPARRAY_OFFSET
        end_idx = Page.BITMAPARRAY_OFFSET + Page.BITMAPARRAY_SIZE_IN_BYTES
        self.page_buf[start_idx:end_idx] = self.bitmap_array

    def encrypt_entry(self, data_arr, tweak_arr, encr_key):
        """Encrypt one entry using AES-XTS.

        :param data_arr: Entry data as a hex string
        :param tweak_arr: XTS tweak as a hex string
        :param encr_key: Raw key bytes
        :return: Encrypted bytes
        :raises ImportError: if the `cryptography` package is not installed
        """
        _require_cryptography()
        plain_text = codecs.decode(data_arr, 'hex')
        tweak = codecs.decode(tweak_arr, 'hex')

        cipher = Cipher(algorithms.AES(encr_key), modes.XTS(tweak), backend=default_backend())
        encryptor = cipher.encryptor()
        return encryptor.update(plain_text)

    def encrypt_data(self, data_input, no_of_entries, nvs_obj):
        """Encrypt `no_of_entries` consecutive 32-byte entries starting at the current entry.

        :param data_input: Entry bytes; anything that is not a bytearray and is
                           shorter than 32 bytes is padded with 0xff to 32 bytes
        :param no_of_entries: Number of 32-byte entries to produce
        :param nvs_obj: NVS instance supplying `encr_key` and `page_num`
        :return: bytearray with the encrypted entries
        """
        hex_chars_per_entry = 2 * Page.SINGLE_ENTRY_SIZE
        key_len_needed = 64

        # A key of exactly 64 items is used as raw bytes, anything else is
        # treated as a hex string (this is what generate_key() returns).
        if len(nvs_obj.encr_key) == key_len_needed:
            encr_key_input = nvs_obj.encr_key
        else:
            encr_key_input = codecs.decode(nvs_obj.encr_key, 'hex')

        rel_addr = nvs_obj.page_num * Page.PAGE_PARAMS['max_size'] + Page.FIRST_ENTRY_OFFSET

        if not isinstance(data_input, bytearray):
            byte_arr = bytearray(b'\xff') * 32
            byte_arr[0:len(data_input)] = data_input
            data_input = byte_arr

        data_input = binascii.hexlify(data_input)

        encr_data_to_write = bytearray()
        for idx in range(no_of_entries):
            # The tweak is derived from the entry's offset inside the partition
            entry_no = self.entry_num + idx
            tweak_val = _xts_tweak_hex(rel_addr + entry_no * Page.SINGLE_ENTRY_SIZE)

            start_idx = idx * hex_chars_per_entry
            data_bytes = data_input[start_idx:start_idx + hex_chars_per_entry]
            # NOTE(review): exact type comparison kept from the original; isinstance()
            # may behave differently with the Python 2 `future` bytes backport.
            if type(data_bytes) == bytes:
                data_bytes = data_bytes.decode()

            # Pad a short (or missing) last entry with 0xff
            data_val = data_bytes + 'f' * (hex_chars_per_entry - len(data_bytes))
            encr_data_to_write += self.encrypt_entry(data_val, tweak_val, encr_key_input)

        return encr_data_to_write

    def write_entry_to_buf(self, data, entrycount, nvs_obj):
        """Copy `data` into the page at the current entry and mark `entrycount` entries as used.

        :param data: Bytes to store (encrypted first when `nvs_obj.encrypt` is set)
        :param entrycount: Number of 32-byte entries the data occupies
        :param nvs_obj: NVS instance the page belongs to
        """
        if nvs_obj.encrypt:
            data = bytearray(self.encrypt_data(data, entrycount, nvs_obj))

        start_idx = Page.FIRST_ENTRY_OFFSET + (Page.SINGLE_ENTRY_SIZE * self.entry_num)
        self.page_buf[start_idx:start_idx + len(data)] = data

        # Set bitmap array for entries in current page
        for _ in range(entrycount):
            self.write_bitmaparray()
            self.entry_num += 1

    def set_crc_header(self, entry_struct):
        """Compute the entry CRC and store it at bytes 4..7 of `entry_struct`.

        The CRC covers bytes 0..3 and 8..31, i.e. everything but the CRC field.

        :param entry_struct: 32-byte bytearray, modified in place
        :return: The same `entry_struct`
        """
        crc_data = bytes(entry_struct[0:4] + entry_struct[8:32])
        crc = zlib.crc32(crc_data, 0xFFFFFFFF)
        struct.pack_into('<I', entry_struct, 4, crc & 0xFFFFFFFF)
        return entry_struct

    def write_varlen_binary_data(self, entry_struct, ns_index, key, data, data_size, total_entry_count, encoding, nvs_obj):
        """Write a blob as BLOB_DATA chunks followed by one BLOB_IDX entry.

        Data that does not fit into the current page is continued on new pages
        requested from `nvs_obj`.

        :param entry_struct: 32-byte entry header with namespace and key already set
        :param ns_index: Unused here
        :param key: Unused here
        :param data: Blob contents
        :param data_size: Total size of the blob in bytes
        :param total_entry_count: Unused here
        :param encoding: Unused here
        :param nvs_obj: NVS instance used to create new pages
        :return: The entry header as last written (the BLOB_IDX entry)
        """
        chunk_start = 0
        chunk_count = 0
        offset = 0
        remaining_size = data_size
        page = self  # page currently being written; changes when a new page is created

        while True:
            # Get the size available in current page
            tailroom = (Page.PAGE_PARAMS['max_entries'] - page.entry_num - 1) * Page.SINGLE_ENTRY_SIZE
            assert tailroom >= 0, 'Page overflow!!'

            # Store as much as fits onto the current page
            chunk_size = min(tailroom, remaining_size)
            remaining_size -= chunk_size

            # Change type of data to BLOB_DATA
            entry_struct[1] = Page.BLOB_DATA

            # Calculate no. of entries data chunk will require
            datachunk_rounded_size = (chunk_size + 31) & ~31
            datachunk_entry_count = datachunk_rounded_size // 32

            # Set Span: +1 for the entry header
            entry_struct[2] = datachunk_entry_count + 1

            # Update the chunkIndex
            entry_struct[3] = chunk_start + chunk_count

            # Set data chunk and its size
            data_chunk = data[offset:offset + chunk_size]
            struct.pack_into('<H', entry_struct, 24, chunk_size)

            if type(data) != bytes:
                data_chunk = bytes(data_chunk, encoding='utf8')

            # Compute CRC of data chunk
            crc = zlib.crc32(data_chunk, 0xFFFFFFFF)
            struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)

            # compute crc of entry header
            entry_struct = page.set_crc_header(entry_struct)

            # write entry header
            page.write_entry_to_buf(entry_struct, 1, nvs_obj)
            # write actual data
            page.write_entry_to_buf(data_chunk, datachunk_entry_count, nvs_obj)

            chunk_count += 1

            # Move on when data is left or no room remains for the index entry
            if remaining_size or (tailroom - chunk_size) < Page.SINGLE_ENTRY_SIZE:
                nvs_obj.create_new_page()
                page = nvs_obj.cur_page

            offset += chunk_size

            # All chunks are stored, now store the index
            if not remaining_size:
                # Initialise data field to 0xff
                entry_struct[24:32] = bytearray(b'\xff') * 8

                # change type of data to BLOB_IDX
                entry_struct[1] = Page.BLOB_IDX

                # Set Span
                entry_struct[2] = 1

                # Update the chunkIndex
                entry_struct[3] = Page.CHUNK_ANY

                struct.pack_into('<I', entry_struct, 24, data_size)
                entry_struct[28] = chunk_count
                entry_struct[29] = chunk_start

                # compute crc of entry header
                entry_struct = page.set_crc_header(entry_struct)

                # write last entry
                page.write_entry_to_buf(entry_struct, 1, nvs_obj)
                break

        return entry_struct

    def write_single_page_entry(self, entry_struct, data, datalen, data_entry_count, nvs_obj):
        """Write an entry header followed by its data onto this page.

        :param entry_struct: 32-byte entry header with namespace, type, span and key set
        :param data: Value to store; non-bytes values are encoded as UTF-8
        :param datalen: Length stored in the header's size field
        :param data_entry_count: Number of 32-byte entries the data occupies
        :param nvs_obj: NVS instance the page belongs to
        """
        struct.pack_into('<H', entry_struct, 24, datalen)

        if type(data) != bytes:
            data = bytes(data, encoding='utf8')

        # compute CRC of data
        crc = zlib.crc32(data, 0xFFFFFFFF)
        struct.pack_into('<I', entry_struct, 28, crc & 0xFFFFFFFF)

        # compute crc of entry header
        entry_struct = self.set_crc_header(entry_struct)

        # write entry header
        self.write_entry_to_buf(entry_struct, 1, nvs_obj)
        # write actual data
        self.write_entry_to_buf(data, data_entry_count, nvs_obj)

    def write_varlen_data(self, key, data, encoding, ns_index, nvs_obj):
        """Low-level function to write variable length data into page buffer.

        Data should be formatted according to encoding specified.

        :param key: Key of the entry
        :param data: Value to store
        :param encoding: 'string', 'binary', 'hex2bin' or 'base64'
        :param ns_index: Namespace index of the entry
        :param nvs_obj: NVS instance the page belongs to
        :raises InputError: if the data is too large for the page version/encoding
        :raises PageFullError: if the entry does not fit into this page
        """
        binary_encodings = ['hex2bin', 'binary', 'base64']
        is_multipage_blob = self.version == Page.VERSION2 and encoding in binary_encodings

        # Set size of data
        datalen = len(data)

        if datalen > Page.PAGE_PARAMS['max_old_blob_size']:
            if self.version == Page.VERSION1 or encoding == 'string':
                raise InputError(' Input File: Size (%d) exceeds max allowed length `%s` bytes for key `%s`.'
                                 % (datalen, Page.PAGE_PARAMS['max_old_blob_size'], key))

        # Calculate no. of entries data will require
        rounded_size = (datalen + 31) & ~31
        data_entry_count = rounded_size // 32
        total_entry_count = data_entry_count + 1  # +1 for the entry header

        # Check if page is already full and new page is needed to be created right away
        if self.entry_num >= Page.PAGE_PARAMS['max_entries']:
            raise PageFullError()
        elif (self.entry_num + total_entry_count) >= Page.PAGE_PARAMS['max_entries']:
            # Only version 2 blobs may continue on the next page
            if not is_multipage_blob:
                raise PageFullError()

        # Entry header
        entry_struct = bytearray(b'\xff') * 32
        # Set Namespace Index
        entry_struct[0] = ns_index
        # Set Span
        if self.version == Page.VERSION2:
            if encoding == 'string':
                entry_struct[2] = data_entry_count + 1
            # Set Chunk Index
            entry_struct[3] = Page.CHUNK_ANY
        else:
            entry_struct[2] = data_entry_count + 1

        # set key
        entry_struct[8:24] = b'\x00' * 16
        entry_struct[8:8 + len(key)] = key.encode()

        # set Type
        if encoding == 'string':
            entry_struct[1] = Page.SZ
        elif encoding in binary_encodings:
            entry_struct[1] = Page.BLOB

        if is_multipage_blob:
            self.write_varlen_binary_data(entry_struct, ns_index, key, data,
                                          datalen, total_entry_count, encoding, nvs_obj)
        else:
            self.write_single_page_entry(entry_struct, data, datalen, data_entry_count, nvs_obj)

    def write_primitive_data(self, key, data, encoding, ns_index, nvs_obj):
        """Low-level function to write data of primitive type into page buffer.

        :param key: Key of the entry
        :param data: Integer value to store
        :param encoding: One of 'u8', 'i8', 'u16', 'i16', 'u32', 'i32', 'u64', 'i64'
        :param ns_index: Namespace index of the entry
        :param nvs_obj: NVS instance the page belongs to
        :raises PageFullError: if the page has no free entry left
        """
        # Check if entry exceeds max number of entries allowed per page
        if self.entry_num >= Page.PAGE_PARAMS['max_entries']:
            raise PageFullError()

        entry_struct = bytearray(b'\xff') * 32
        entry_struct[0] = ns_index  # namespace index
        entry_struct[2] = 0x01  # Span
        entry_struct[3] = Page.CHUNK_ANY

        # write key
        entry_struct[8:24] = b'\x00' * 16
        entry_struct[8:8 + len(key)] = key.encode()

        # An unknown encoding leaves type and value bytes at 0xff
        type_and_format = Page._PRIMITIVE_FORMATS.get(encoding)
        if type_and_format is not None:
            type_code, value_format = type_and_format
            entry_struct[1] = type_code
            struct.pack_into(value_format, entry_struct, 24, data)

        # Compute CRC
        entry_struct = self.set_crc_header(entry_struct)

        # write to file
        self.write_entry_to_buf(entry_struct, 1, nvs_obj)

    def get_data(self):
        """Get page buffer data of a given page."""
        return self.page_buf


class NVS(object):
    """NVS class encapsulates all NVS specific operations to create a binary
    with given key-value pairs.

    Binary can later be flashed onto device via a flashing utility.
    """

    def __init__(self, fout, input_size, version, encrypt=False, key_input=None):
        """Create the NVS image builder and its first page.

        :param fout: File/stream object the binary is written to on exit
        :param input_size: Number of bytes available for regular pages
        :param version: Page.VERSION1 or Page.VERSION2
        :param encrypt: Whether entries are encrypted
        :param key_input: Encryption key; only stored when `encrypt` is set
        """
        self.size = input_size
        self.encrypt = encrypt
        self.encr_key = None
        self.namespace_idx = 0
        self.page_num = -1
        self.pages = []
        self.version = version
        self.fout = fout
        if self.encrypt:
            self.encr_key = key_input
        self.cur_page = self.create_new_page(version)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Fill the remaining size with empty pages and write the binary.

        Nothing is written when the block is left because of an exception.
        """
        if exc_type is None and exc_value is None:
            # Create pages for remaining available size
            while True:
                try:
                    self.create_new_page()
                except InsufficientSizeError:
                    self.size = None
                    # Creating the last reserved page
                    self.create_new_page(is_rsrv_page=True)
                    break
            result = self.get_binary_data()
            self.fout.write(result)

    def create_new_page(self, version=None, is_rsrv_page=False):
        """Mark the current page FULL and append a new page which becomes current.

        :param version: Ignored; pages always use the version given to the NVS object
        :param is_rsrv_page: When True an all-0xff page is appended and the
                             available size is not reduced
        :return: The new Page
        :raises InsufficientSizeError: if no size is left for another page
        """
        # Set previous page state to FULL before creating new page
        if self.pages:
            curr_page_state = struct.unpack('<I', self.cur_page.page_buf[0:4])[0]
            if curr_page_state == Page.ACTIVE:
                struct.pack_into('<I', self.cur_page.page_buf, 0, Page.FULL)
        # Set version for NVS binary generated
        version = self.version
        # Update available size as each page is created
        if self.size == 0:
            raise InsufficientSizeError('Error: Size parameter is less than the size of data in csv.Please increase size.')
        if not is_rsrv_page:
            self.size = self.size - Page.PAGE_PARAMS['max_size']
        self.page_num += 1
        # Set version for each page and page header
        new_page = Page(self.page_num, version, is_rsrv_page)
        self.pages.append(new_page)
        self.cur_page = new_page
        return new_page

    def write_namespace(self, key):
        """Write namespace entry and subsequently increase namespace count so
        that all upcoming entries will be mapped to a new namespace.

        :param key: Name of the namespace
        """
        self.namespace_idx += 1
        try:
            self.cur_page.write_primitive_data(key, self.namespace_idx, 'u8', 0, self)
        except PageFullError:
            new_page = self.create_new_page()
            new_page.write_primitive_data(key, self.namespace_idx, 'u8', 0, self)

    def write_entry(self, key, value, encoding):
        """Write key-value pair.

        Function accepts value in the form of ascii character and converts it
        into appropriate format before calling Page class's functions to write
        entry into NVS format. Function handles PageFullError and creates a new
        page and re-invokes the function on a new page.

        :param key: Key of the entry
        :param value: Value as read from the input
        :param encoding: Encoding name, matched case-insensitively
        :raises InputError: on odd-length hex data or an unsupported encoding
        """
        # Normalise first so the conversions below also apply to e.g. 'STRING'
        encoding = encoding.lower()
        varlen_encodings = ['string', 'binary', 'hex2bin', 'base64']
        primitive_encodings = ['u8', 'i8', 'u16', 'i16', 'u32', 'i32', 'u64', 'i64']

        if encoding == 'hex2bin':
            value = value.strip()
            if len(value) % 2 != 0:
                raise InputError('%s: Invalid data length. Should be multiple of 2.' % key)
            value = binascii.a2b_hex(value)

        if encoding == 'base64':
            value = binascii.a2b_base64(value)

        if encoding == 'string':
            if type(value) == bytes:
                value = value.decode()
            value += '\0'

        if encoding in varlen_encodings:
            try:
                self.cur_page.write_varlen_data(key, value, encoding, self.namespace_idx, self)
            except PageFullError:
                new_page = self.create_new_page()
                new_page.write_varlen_data(key, value, encoding, self.namespace_idx, self)
        elif encoding in primitive_encodings:
            try:
                self.cur_page.write_primitive_data(key, int(value), encoding, self.namespace_idx, self)
            except PageFullError:
                new_page = self.create_new_page()
                new_page.write_primitive_data(key, int(value), encoding, self.namespace_idx, self)
        else:
            raise InputError('%s: Unsupported encoding' % encoding)

    def get_binary_data(self):
        """Return accumulated data of all pages."""
        data = bytearray()
        for page in self.pages:
            data += page.get_data()
        return data


class PageFullError(RuntimeError):
    """
    Represents error when current page doesn't have sufficient entries left
    to accommodate current request
    """


class InputError(RuntimeError):
    """
    Represents error on the input
    """
    def __init__(self, e):
        print('\nError:')
        super(InputError, self).__init__(e)


class InsufficientSizeError(RuntimeError):
    """
    Represents error when NVS Partition size given is insufficient
    to accommodate the data in the given csv file
    """


def nvs_open(result_obj, input_size, version=None, is_encrypt=False, key=None):
    """ Wrapper to create an NVS class object. This object can later be used to set key-value pairs

    :param result_obj: File/Stream object to dump resultant binary. If data is to be dumped into memory, one way is to use BytesIO object
    :param input_size: Size of Partition
    :param version: Page.VERSION1 or Page.VERSION2
    :param is_encrypt: Whether entries are encrypted
    :param key: Encryption key, used when `is_encrypt` is set
    :return: NVS class instance
    """
    return NVS(result_obj, input_size, version, encrypt=is_encrypt, key_input=key)


def write_entry(nvs_instance, key, datatype, encoding, value):
    """ Wrapper to set key-value pair in NVS format

    :param nvs_instance: Instance of an NVS class returned by nvs_open()
    :param key: Key of the data
    :param datatype: Data type. Valid values are "file", "data" and "namespace"
    :param encoding: Data encoding. Valid values are "u8", "i8", "u16", "i16", "u32", "i32", "u64", "i64", "string", "binary", "hex2bin" and "base64"
    :param value: Data value in ascii encoded string format for "data" datatype and filepath for "file" datatype
    :return: None
    """
    if datatype == 'file':
        abs_file_path = value
        if not os.path.isabs(value):
            # Relative paths are resolved against the current working directory
            abs_file_path = os.path.join(os.getcwd(), value)

        with open(abs_file_path, 'rb') as f:
            value = f.read()

    if datatype == 'namespace':
        nvs_instance.write_namespace(key)
    else:
        nvs_instance.write_entry(key, value, encoding)


def nvs_close(nvs_instance):
    """ Wrapper to finish writing to NVS and write data to file/stream object provided to nvs_open method

    :param nvs_instance: Instance of NVS class returned by nvs_open()
    :return: None
    """
    nvs_instance.__exit__(None, None, None)


def check_size(size):
    '''
    Checks for input partition size

    :param size: Input partition size as a string (any base accepted by int(size, 0))
    :return: Size left for regular pages, i.e. the input size minus one reserved page
    '''
    try:
        input_size = int(size, 0)
    except (TypeError, ValueError) as e:
        print(e)
        # An unparsable size is an error, so do not report success to the caller
        sys.exit(1)

    if input_size % 4096 != 0:
        sys.exit('Size of partition must be multiple of 4096')

    # Update size as a page needs to be reserved of size 4KB
    input_size = input_size - Page.PAGE_PARAMS['max_size']

    if input_size < (2 * Page.PAGE_PARAMS['max_size']):
        sys.exit('Minimum NVS partition size needed is 0x3000 bytes.')
    return input_size


def set_target_filepath(outdir, filepath):
    '''
    Set target file path: <outdir>/<filepath>

    Missing directories are created. An absolute `filepath` is used as is and
    `outdir` is returned as an empty string in that case.

    :param outdir: Target output dir to store files
    :param filepath: Path of target file
    :return: Tuple (outdir, filepath)
    '''
    bin_ext = '.bin'
    # Expand if tilde(~) provided in path
    outdir = os.path.expanduser(outdir)

    if filepath:
        key_file_name, ext = os.path.splitext(filepath)
        if not ext:
            filepath = key_file_name + bin_ext
        elif bin_ext not in ext:
            sys.exit('Error: `%s`. Only `%s` extension allowed.' % (filepath, bin_ext))

    # Create dir if does not exist
    _make_dirs(outdir)

    filedir, filename = os.path.split(filepath)
    filedir = os.path.join(outdir, filedir, '')
    _make_dirs(filedir)

    if os.path.isabs(filepath):
        if not outdir == os.getcwd():
            print('\nWarning: `%s` \n\t==> absolute path given so outdir is ignored for this file.' % filepath)
        # Set to empty as outdir is ignored here
        outdir = ''

    # Set full path - outdir + filename
    filepath = os.path.join(outdir, '') + filepath

    return outdir, filepath


def encrypt(args):
    '''
    Generate encrypted NVS Partition

    :param args: Command line arguments given
    '''
    _require_cryptography()
    key = None
    bin_ext = '.bin'

    check_size(args.size)
    if (args.keygen is False) and (not args.inputkey):
        sys.exit('Error. --keygen or --inputkey argument needed.')
    elif args.keygen and args.inputkey:
        sys.exit('Error. --keygen and --inputkey both are not allowed.')
    elif not args.keygen and args.keyfile:
        print('\nWarning:', '--inputkey argument is given. --keyfile argument will be ignored...')

    if args.inputkey:
        # Check if key file has .bin extension
        _, ext = os.path.splitext(args.inputkey)
        if bin_ext not in ext:
            sys.exit('Error: `%s`. Only `%s` extension allowed.' % (args.inputkey, bin_ext))
        with open(args.inputkey, 'rb') as key_f:
            key = key_f.read(64)

    # Generate encrypted NVS Partition
    generate(args, is_encr_enabled=True, encr_key=key)


def decrypt_data(data_input, decr_key, page_num, entry_no, entry_size):
    '''
    Decrypt NVS data entry

    :param data_input: Encrypted entry bytes
    :param decr_key: Raw key bytes
    :param page_num: Number of the page the entry belongs to
    :param entry_no: Index of the entry counted from the first entry of the page
    :param entry_size: Size of one entry in bytes
    :return: Decrypted bytes
    :raises ImportError: if the `cryptography` package is not installed
    '''
    _require_cryptography()
    page_max_size = 4096
    first_entry_offset = 64

    # The tweak is derived from the entry's offset inside the partition
    rel_addr = page_num * page_max_size + first_entry_offset
    tweak = codecs.decode(_xts_tweak_hex(rel_addr + entry_no * entry_size), 'hex')

    # Decrypt data using AES-XTS decryption
    cipher = Cipher(algorithms.AES(decr_key), modes.XTS(tweak), backend=default_backend())
    decryptor = cipher.decryptor()
    return decryptor.update(data_input)


def decrypt(args):
    '''
    Decrypt encrypted NVS Partition

    :param args: Command line arguments given
    '''
    _require_cryptography()
    bin_ext = '.bin'
    nvs_read_bytes = 32
    entries_per_page = 128  # 4096-byte page read in 32-byte steps
    empty_data_entry = bytearray(b'\xff') * nvs_read_bytes

    # Check if input, key and output files have .bin extension
    for filepath in (args.input, args.key, args.output):
        _, ext = os.path.splitext(filepath)
        if bin_ext not in ext:
            sys.exit('Error: `%s`. Only `%s` extension allowed.' % (filepath, bin_ext))
    with open(args.key, 'rb') as decr_key_file:
        decr_key = decr_key_file.read(64)

    args.outdir, args.output = set_target_filepath(args.outdir, args.output)

    output_buf = bytearray()
    page_num = 0
    file_entry_no = 0
    decrypted_entry_no = 0

    with open(args.input, 'rb') as input_file, open(args.output, 'wb') as output_file:
        while True:
            if file_entry_no == entries_per_page:
                decrypted_entry_no = 0
                file_entry_no = 0
                page_num += 1
            data_entry = input_file.read(nvs_read_bytes)
            if not data_entry:
                break
            # The first two 32-byte blocks of a page (header and bitmap) and
            # unused (all 0xff) entries are copied as they are
            if data_entry != empty_data_entry and file_entry_no not in (0, 1):
                data_entry = decrypt_data(data_entry, decr_key, page_num, decrypted_entry_no, nvs_read_bytes)
                decrypted_entry_no += 1
            output_buf += data_entry
            file_entry_no += 1
        output_file.write(output_buf)

    print('\nCreated NVS decrypted binary: ===>', args.output)


def generate_key(args):
    '''
    Generate encryption keys

    The key file is one 4096-byte 0xff-filled page holding the 64 key bytes
    followed by their CRC32.

    :param args: Command line arguments given
    :return: The key as a string of 128 hex digits
    '''
    page_max_size = 4096
    keys_dir = 'keys'
    bin_ext = '.bin'

    if not args.keyfile:
        timestamp = datetime.datetime.now().strftime('%m-%d_%H-%M')
        args.keyfile = 'keys-' + timestamp + bin_ext

    keys_outdir = os.path.join(args.outdir, keys_dir, '')
    # Create keys/ dir in <outdir> if does not exist
    _make_dirs(keys_outdir)
    keys_outdir, output_keyfile = set_target_filepath(keys_outdir, args.keyfile)

    # Key material must come from the OS entropy source, not the default PRNG
    secure_random = random.SystemRandom()
    key = ''.join(secure_random.choice('0123456789abcdef') for _ in range(128))
    encr_key_bytes = codecs.decode(key, 'hex')
    key_len = len(encr_key_bytes)

    keys_buf = bytearray(b'\xff') * page_max_size
    keys_buf[0:key_len] = encr_key_bytes
    crc = zlib.crc32(bytes(keys_buf[0:key_len]), 0xFFFFFFFF)
    struct.pack_into('<I', keys_buf, key_len, crc & 0xFFFFFFFF)

    with open(output_keyfile, 'wb') as output_keys_file:
        output_keys_file.write(keys_buf)

    print('\nCreated encryption keys: ===> ', output_keyfile)

    return key


def generate(args, is_encr_enabled=False, encr_key=None):
    '''
    Generate NVS Partition

    :param args: Command line arguments given
    :param is_encr_enabled: Encryption enabled/disabled
    :param encr_key: Key to encrypt NVS partition
    '''
    bin_ext = '.bin'

    input_size = check_size(args.size)
    if args.version == 1:
        args.version = Page.VERSION1
    elif args.version == 2:
        args.version = Page.VERSION2

    # Check if output file has .bin extension
    _, ext = os.path.splitext(args.output)
    if bin_ext not in ext:
        sys.exit('Error: `%s`. Only `.bin` extension allowed.' % args.output)
    args.outdir, args.output = set_target_filepath(args.outdir, args.output)

    if is_encr_enabled and not encr_key:
        encr_key = generate_key(args)

    try:
        with open(args.input, 'rt', encoding='utf8') as input_file, \
                open(args.output, 'wb') as output_file, \
                nvs_open(output_file, input_size, args.version, is_encrypt=is_encr_enabled, key=encr_key) as nvs_obj:

            if nvs_obj.version == Page.VERSION1:
                version_set = VERSION1_PRINT
            else:
                version_set = VERSION2_PRINT

            print('\nCreating NVS binary with version:', version_set)

            line = input_file.readline().strip()

            # Comments are skipped
            while line.startswith('#'):
                line = input_file.readline().strip()
            if not isinstance(line, str):
                line = line.encode('utf-8')

            header = line.split(',')

            while True:
                line = input_file.readline().strip()
                if not isinstance(line, str):
                    line = line.encode('utf-8')

                value = line.split(',')
                # An empty line (or end of file) ends the input
                if len(value) == 1 and '' in value:
                    break

                data = dict(zip_longest(header, value))

                # Check key length
                if len(data['key']) > 15:
                    raise InputError('Length of key `{}` should be <= 15 characters.'.format(data['key']))
                write_entry(nvs_obj, data['key'], data['type'], data['encoding'], data['value'])
    except InputError as e:
        # Handled after the `with` block so the output file is already closed
        # (and nothing was written to it) when it gets removed.
        print(e)
        _, filename = os.path.split(args.output)
        if filename:
            print('\nWarning: NVS binary not created...')
            os.remove(args.output)
        sys.exit(-2)

    print('\nCreated NVS binary: ===>', args.output)


def main():
    """Parse the command line and run the selected command."""
    size_help = ('Size of NVS partition in bytes'
                 '\n(must be multiple of 4096)')
    version_help = ('Set multipage blob version.'
                    '\nVersion 1 - Multipage blob support disabled.'
                    '\nVersion 2 - Multipage blob support enabled.'
                    '\nDefault: Version 2')
    outdir_help = ('Output directory to store files created.'
                   '\n(Default: current directory)')

    parser = argparse.ArgumentParser(description='\nESP NVS partition generation utility', formatter_class=argparse.RawTextHelpFormatter)
    subparser = parser.add_subparsers(title='Commands',
                                      dest='command',
                                      help='\nRun nvs_partition_gen.py {command} -h for additional help\n\n')

    parser_gen = subparser.add_parser('generate',
                                      help='Generate NVS partition',
                                      formatter_class=argparse.RawTextHelpFormatter)
    parser_gen.set_defaults(func=generate)
    parser_gen.add_argument('input',
                            default=None,
                            help='Path to CSV file to parse')
    parser_gen.add_argument('output',
                            default=None,
                            help='Path to output NVS binary file')
    parser_gen.add_argument('size',
                            default=None,
                            help=size_help)
    parser_gen.add_argument('--version',
                            choices=[1, 2],
                            default=2,
                            type=int,
                            help=version_help)
    parser_gen.add_argument('--outdir',
                            default=os.getcwd(),
                            help='Output directory to store files created'
                                 '\n(Default: current directory)')

    parser_gen_key = subparser.add_parser('generate-key',
                                          help='Generate keys for encryption',
                                          formatter_class=argparse.RawTextHelpFormatter)
    parser_gen_key.set_defaults(func=generate_key)
    parser_gen_key.add_argument('--keyfile',
                                default=None,
                                help='Path to output encryption keys file')
    parser_gen_key.add_argument('--outdir',
                                default=os.getcwd(),
                                help=outdir_help)

    parser_encr = subparser.add_parser('encrypt',
                                       help='Generate NVS encrypted partition',
                                       formatter_class=argparse.RawTextHelpFormatter)
    parser_encr.set_defaults(func=encrypt)
    parser_encr.add_argument('input',
                             default=None,
                             help='Path to CSV file to parse')
    parser_encr.add_argument('output',
                             default=None,
                             help='Path to output NVS binary file')
    parser_encr.add_argument('size',
                             default=None,
                             help=size_help)
    parser_encr.add_argument('--version',
                             choices=[1, 2],
                             default=2,
                             type=int,
                             help=version_help)
    parser_encr.add_argument('--keygen',
                             action='store_true',
                             default=False,
                             help='Generates key for encrypting NVS partition')
    parser_encr.add_argument('--keyfile',
                             default=None,
                             help='Path to output encryption keys file')
    parser_encr.add_argument('--inputkey',
                             default=None,
                             help='File having key for encrypting NVS partition')
    parser_encr.add_argument('--outdir',
                             default=os.getcwd(),
                             help=outdir_help)

    parser_decr = subparser.add_parser('decrypt',
                                       help='Decrypt NVS encrypted partition',
                                       formatter_class=argparse.RawTextHelpFormatter)
    parser_decr.set_defaults(func=decrypt)
    parser_decr.add_argument('input',
                             default=None,
                             help='Path to encrypted NVS partition file to parse')
    parser_decr.add_argument('key',
                             default=None,
                             help='Path to file having keys for decryption')
    parser_decr.add_argument('output',
                             default=None,
                             help='Path to output decrypted binary file')
    parser_decr.add_argument('--outdir',
                             default=os.getcwd(),
                             help=outdir_help)
    args = parser.parse_args()

    # On Python 3 sub-commands are optional, so `func` is missing when none is given
    if not hasattr(args, 'func'):
        parser.print_help()
        sys.exit(1)

    args.func(args)


if __name__ == '__main__':
    main()