1#!/usr/bin/env python 2# 3# otatool is used to perform ota-level operations - flashing ota partition 4# erasing ota partition and switching ota partition 5# 6# SPDX-FileCopyrightText: 2018-2021 Espressif Systems (Shanghai) CO LTD 7# SPDX-License-Identifier: Apache-2.0 8from __future__ import division, print_function 9 10import argparse 11import binascii 12import collections 13import os 14import struct 15import sys 16import tempfile 17 18try: 19 from parttool import PARTITION_TABLE_OFFSET, PartitionName, PartitionType, ParttoolTarget 20except ImportError: 21 COMPONENTS_PATH = os.path.expandvars(os.path.join('$IDF_PATH', 'components')) 22 PARTTOOL_DIR = os.path.join(COMPONENTS_PATH, 'partition_table') 23 sys.path.append(PARTTOOL_DIR) 24 from parttool import PARTITION_TABLE_OFFSET, PartitionName, PartitionType, ParttoolTarget 25 26__version__ = '2.0' 27 28SPI_FLASH_SEC_SIZE = 0x2000 29 30quiet = False 31 32 33def status(msg): 34 if not quiet: 35 print(msg) 36 37 38class OtatoolTarget(): 39 40 OTADATA_PARTITION = PartitionType('data', 'ota') 41 42 def __init__(self, port=None, baud=None, partition_table_offset=PARTITION_TABLE_OFFSET, partition_table_file=None, 43 spi_flash_sec_size=SPI_FLASH_SEC_SIZE, esptool_args=[], esptool_write_args=[], 44 esptool_read_args=[], esptool_erase_args=[]): 45 self.target = ParttoolTarget(port, baud, partition_table_offset, partition_table_file, esptool_args, 46 esptool_write_args, esptool_read_args, esptool_erase_args) 47 self.spi_flash_sec_size = spi_flash_sec_size 48 49 temp_file = tempfile.NamedTemporaryFile(delete=False) 50 temp_file.close() 51 try: 52 self.target.read_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name) 53 with open(temp_file.name, 'rb') as f: 54 self.otadata = f.read() 55 finally: 56 os.unlink(temp_file.name) 57 58 def _check_otadata_partition(self): 59 if not self.otadata: 60 raise Exception('No otadata partition found') 61 62 def erase_otadata(self): 63 self._check_otadata_partition() 64 self.target.erase_partition(OtatoolTarget.OTADATA_PARTITION) 65 66 def _get_otadata_info(self): 67 info = [] 68 69 otadata_info = collections.namedtuple('otadata_info', 'seq crc') 70 71 for i in range(2): 72 start = i * (self.spi_flash_sec_size >> 1) 73 74 seq = bytearray(self.otadata[start:start + 4]) 75 crc = bytearray(self.otadata[start + 28:start + 32]) 76 77 seq = struct.unpack('I', seq) 78 crc = struct.unpack('I', crc) 79 info.append(otadata_info(seq[0], crc[0])) 80 81 return info 82 83 def _get_partition_id_from_ota_id(self, ota_id): 84 if isinstance(ota_id, int): 85 return PartitionType('app', 'ota_' + str(ota_id)) 86 else: 87 return PartitionName(ota_id) 88 89 def switch_ota_partition(self, ota_id): 90 self._check_otadata_partition() 91 92 import gen_esp32part as gen 93 94 def is_otadata_info_valid(status): 95 seq = status.seq % (1 << 32) 96 crc = binascii.crc32(struct.pack('I', seq), 0xFFFFFFFF) % (1 << 32) 97 return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc 98 99 partition_table = self.target.partition_table 100 101 ota_partitions = list() 102 103 for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA): 104 ota_partition = filter(lambda p: p.subtype == (gen.MIN_PARTITION_SUBTYPE_APP_OTA + i), partition_table) 105 106 try: 107 ota_partitions.append(list(ota_partition)[0]) 108 except IndexError: 109 break 110 111 ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype) 112 113 if not ota_partitions: 114 raise Exception('No ota app partitions found') 115 116 # Look for the app partition to switch to 117 ota_partition_next = None 118 119 try: 120 if isinstance(ota_id, int): 121 ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA == ota_id, ota_partitions) 122 else: 123 ota_partition_next = filter(lambda p: p.name == ota_id, ota_partitions) 124 125 ota_partition_next = list(ota_partition_next)[0] 126 except IndexError: 127 raise Exception('Partition to switch to not found') 128 129 otadata_info = self._get_otadata_info() 130 131 # Find the copy to base the computation for ota sequence number on 132 otadata_compute_base = -1 133 134 # Both are valid, take the max as computation base 135 if is_otadata_info_valid(otadata_info[0]) and is_otadata_info_valid(otadata_info[1]): 136 if otadata_info[0].seq >= otadata_info[1].seq: 137 otadata_compute_base = 0 138 else: 139 otadata_compute_base = 1 140 # Only one copy is valid, use that 141 elif is_otadata_info_valid(otadata_info[0]): 142 otadata_compute_base = 0 143 elif is_otadata_info_valid(otadata_info[1]): 144 otadata_compute_base = 1 145 # Both are invalid (could be initial state - all 0xFF's) 146 else: 147 pass 148 149 ota_seq_next = 0 150 ota_partitions_num = len(ota_partitions) 151 152 target_seq = (ota_partition_next.subtype & 0x0F) + 1 153 154 # Find the next ota sequence number 155 if otadata_compute_base == 0 or otadata_compute_base == 1: 156 base_seq = otadata_info[otadata_compute_base].seq % (1 << 32) 157 158 i = 0 159 while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num: 160 i += 1 161 162 ota_seq_next = target_seq % ota_partitions_num + i * ota_partitions_num 163 else: 164 ota_seq_next = target_seq 165 166 # Create binary data from computed values 167 ota_seq_next = struct.pack('I', ota_seq_next) 168 ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32) 169 ota_seq_crc_next = struct.pack('I', ota_seq_crc_next) 170 171 temp_file = tempfile.NamedTemporaryFile(delete=False) 172 temp_file.close() 173 174 try: 175 with open(temp_file.name, 'wb') as otadata_next_file: 176 start = (1 if otadata_compute_base == 0 else 0) * (self.spi_flash_sec_size >> 1) 177 178 otadata_next_file.write(self.otadata) 179 180 otadata_next_file.seek(start) 181 otadata_next_file.write(ota_seq_next) 182 183 otadata_next_file.seek(start + 28) 184 otadata_next_file.write(ota_seq_crc_next) 185 186 otadata_next_file.flush() 187 188 self.target.write_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name) 189 finally: 190 os.unlink(temp_file.name) 191 192 def read_ota_partition(self, ota_id, output): 193 self.target.read_partition(self._get_partition_id_from_ota_id(ota_id), output) 194 195 def write_ota_partition(self, ota_id, input): 196 self.target.write_partition(self._get_partition_id_from_ota_id(ota_id), input) 197 198 def erase_ota_partition(self, ota_id): 199 self.target.erase_partition(self._get_partition_id_from_ota_id(ota_id)) 200 201 202def _read_otadata(target): 203 target._check_otadata_partition() 204 205 otadata_info = target._get_otadata_info() 206 207 print(' {:8s} \t {:8s} | \t {:8s} \t {:8s}'.format('OTA_SEQ', 'CRC', 'OTA_SEQ', 'CRC')) 208 print('Firmware: 0x{:08x} \t0x{:08x} | \t0x{:08x} \t 0x{:08x}'.format(otadata_info[0].seq, otadata_info[0].crc, 209 otadata_info[1].seq, otadata_info[1].crc)) 210 211 212def _erase_otadata(target): 213 target.erase_otadata() 214 status('Erased ota_data partition contents') 215 216 217def _switch_ota_partition(target, ota_id): 218 target.switch_ota_partition(ota_id) 219 220 221def _read_ota_partition(target, ota_id, output): 222 target.read_ota_partition(ota_id, output) 223 status('Read ota partition contents to file {}'.format(output)) 224 225 226def _write_ota_partition(target, ota_id, input): 227 target.write_ota_partition(ota_id, input) 228 status('Written contents of file {} to ota partition'.format(input)) 229 230 231def _erase_ota_partition(target, ota_id): 232 target.erase_ota_partition(ota_id) 233 status('Erased contents of ota partition') 234 235 236def main(): 237 global quiet 238 239 parser = argparse.ArgumentParser('ESP-IDF OTA Partitions Tool') 240 241 parser.add_argument('--quiet', '-q', help='suppress stderr messages', action='store_true') 242 parser.add_argument('--esptool-args', help='additional main arguments for esptool', nargs='+') 243 parser.add_argument('--esptool-write-args', help='additional subcommand arguments for esptool write_flash', nargs='+') 244 parser.add_argument('--esptool-read-args', help='additional subcommand arguments for esptool read_flash', nargs='+') 245 parser.add_argument('--esptool-erase-args', help='additional subcommand arguments for esptool erase_region', nargs='+') 246 247 # There are two possible sources for the partition table: a device attached to the host 248 # or a partition table CSV/binary file. These sources are mutually exclusive. 249 parser.add_argument('--port', '-p', help='port where the device to read the partition table from is attached') 250 251 parser.add_argument('--baud', '-b', help='baudrate to use', type=int) 252 253 parser.add_argument('--partition-table-offset', '-o', help='offset to read the partition table from', type=str) 254 255 parser.add_argument('--partition-table-file', '-f', help='file (CSV/binary) to read the partition table from; \ 256 overrides device attached to specified port as the partition table source when defined') 257 258 subparsers = parser.add_subparsers(dest='operation', help='run otatool -h for additional help') 259 260 spi_flash_sec_size = argparse.ArgumentParser(add_help=False) 261 spi_flash_sec_size.add_argument('--spi-flash-sec-size', help='value of SPI_FLASH_SEC_SIZE macro', type=str) 262 263 # Specify the supported operations 264 subparsers.add_parser('read_otadata', help='read otadata partition', parents=[spi_flash_sec_size]) 265 subparsers.add_parser('erase_otadata', help='erase otadata partition') 266 267 slot_or_name_parser = argparse.ArgumentParser(add_help=False) 268 slot_or_name_parser_args = slot_or_name_parser.add_mutually_exclusive_group() 269 slot_or_name_parser_args.add_argument('--slot', help='slot number of the ota partition', type=int) 270 slot_or_name_parser_args.add_argument('--name', help='name of the ota partition') 271 272 subparsers.add_parser('switch_ota_partition', help='switch otadata partition', parents=[slot_or_name_parser, spi_flash_sec_size]) 273 274 read_ota_partition_subparser = subparsers.add_parser('read_ota_partition', help='read contents of an ota partition', parents=[slot_or_name_parser]) 275 read_ota_partition_subparser.add_argument('--output', help='file to write the contents of the ota partition to', required=True) 276 277 write_ota_partition_subparser = subparsers.add_parser('write_ota_partition', help='write contents to an ota partition', parents=[slot_or_name_parser]) 278 write_ota_partition_subparser.add_argument('--input', help='file whose contents to write to the ota partition') 279 280 subparsers.add_parser('erase_ota_partition', help='erase contents of an ota partition', parents=[slot_or_name_parser]) 281 282 args = parser.parse_args() 283 284 quiet = args.quiet 285 286 # No operation specified, display help and exit 287 if args.operation is None: 288 if not quiet: 289 parser.print_help() 290 sys.exit(1) 291 292 target_args = {} 293 294 if args.port: 295 target_args['port'] = args.port 296 297 if args.partition_table_file: 298 target_args['partition_table_file'] = args.partition_table_file 299 300 if args.partition_table_offset: 301 target_args['partition_table_offset'] = int(args.partition_table_offset, 0) 302 303 try: 304 if args.spi_flash_sec_size: 305 target_args['spi_flash_sec_size'] = int(args.spi_flash_sec_size, 0) 306 except AttributeError: 307 pass 308 309 if args.esptool_args: 310 target_args['esptool_args'] = args.esptool_args 311 312 if args.esptool_write_args: 313 target_args['esptool_write_args'] = args.esptool_write_args 314 315 if args.esptool_read_args: 316 target_args['esptool_read_args'] = args.esptool_read_args 317 318 if args.esptool_erase_args: 319 target_args['esptool_erase_args'] = args.esptool_erase_args 320 321 if args.baud: 322 target_args['baud'] = args.baud 323 324 target = OtatoolTarget(**target_args) 325 326 # Create the operation table and execute the operation 327 common_args = {'target':target} 328 329 ota_id = [] 330 331 try: 332 if args.name is not None: 333 ota_id = ['name'] 334 else: 335 if args.slot is not None: 336 ota_id = ['slot'] 337 except AttributeError: 338 pass 339 340 otatool_ops = { 341 'read_otadata':(_read_otadata, []), 342 'erase_otadata':(_erase_otadata, []), 343 'switch_ota_partition':(_switch_ota_partition, ota_id), 344 'read_ota_partition':(_read_ota_partition, ['output'] + ota_id), 345 'write_ota_partition':(_write_ota_partition, ['input'] + ota_id), 346 'erase_ota_partition':(_erase_ota_partition, ota_id) 347 } 348 349 (op, op_args) = otatool_ops[args.operation] 350 351 for op_arg in op_args: 352 common_args.update({op_arg:vars(args)[op_arg]}) 353 354 try: 355 common_args['ota_id'] = common_args.pop('name') 356 except KeyError: 357 try: 358 common_args['ota_id'] = common_args.pop('slot') 359 except KeyError: 360 pass 361 362 if quiet: 363 # If exceptions occur, suppress and exit quietly 364 try: 365 op(**common_args) 366 except Exception: 367 sys.exit(2) 368 else: 369 op(**common_args) 370 371 372if __name__ == '__main__': 373 main() 374