1#!/usr/bin/env python
2#
3# otatool is used to perform ota-level operations - flashing ota partition
4# erasing ota partition and switching ota partition
5#
6# SPDX-FileCopyrightText: 2018-2021 Espressif Systems (Shanghai) CO LTD
7# SPDX-License-Identifier: Apache-2.0
8from __future__ import division, print_function
9
10import argparse
11import binascii
12import collections
13import os
14import struct
15import sys
16import tempfile
17
18try:
19    from parttool import PARTITION_TABLE_OFFSET, PartitionName, PartitionType, ParttoolTarget
20except ImportError:
21    COMPONENTS_PATH = os.path.expandvars(os.path.join('$IDF_PATH', 'components'))
22    PARTTOOL_DIR = os.path.join(COMPONENTS_PATH, 'partition_table')
23    sys.path.append(PARTTOOL_DIR)
24    from parttool import PARTITION_TABLE_OFFSET, PartitionName, PartitionType, ParttoolTarget
25
26__version__ = '2.0'
27
28SPI_FLASH_SEC_SIZE = 0x2000
29
30quiet = False
31
32
33def status(msg):
34    if not quiet:
35        print(msg)
36
37
38class OtatoolTarget():
39
40    OTADATA_PARTITION = PartitionType('data', 'ota')
41
42    def __init__(self, port=None, baud=None, partition_table_offset=PARTITION_TABLE_OFFSET, partition_table_file=None,
43                 spi_flash_sec_size=SPI_FLASH_SEC_SIZE, esptool_args=[], esptool_write_args=[],
44                 esptool_read_args=[], esptool_erase_args=[]):
45        self.target = ParttoolTarget(port, baud, partition_table_offset, partition_table_file, esptool_args,
46                                     esptool_write_args, esptool_read_args, esptool_erase_args)
47        self.spi_flash_sec_size = spi_flash_sec_size
48
49        temp_file = tempfile.NamedTemporaryFile(delete=False)
50        temp_file.close()
51        try:
52            self.target.read_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
53            with open(temp_file.name, 'rb') as f:
54                self.otadata = f.read()
55        finally:
56            os.unlink(temp_file.name)
57
58    def _check_otadata_partition(self):
59        if not self.otadata:
60            raise Exception('No otadata partition found')
61
62    def erase_otadata(self):
63        self._check_otadata_partition()
64        self.target.erase_partition(OtatoolTarget.OTADATA_PARTITION)
65
66    def _get_otadata_info(self):
67        info = []
68
69        otadata_info = collections.namedtuple('otadata_info', 'seq crc')
70
71        for i in range(2):
72            start = i * (self.spi_flash_sec_size >> 1)
73
74            seq = bytearray(self.otadata[start:start + 4])
75            crc = bytearray(self.otadata[start + 28:start + 32])
76
77            seq = struct.unpack('I', seq)
78            crc = struct.unpack('I', crc)
79            info.append(otadata_info(seq[0], crc[0]))
80
81        return info
82
83    def _get_partition_id_from_ota_id(self, ota_id):
84        if isinstance(ota_id, int):
85            return PartitionType('app', 'ota_' + str(ota_id))
86        else:
87            return PartitionName(ota_id)
88
89    def switch_ota_partition(self, ota_id):
90        self._check_otadata_partition()
91
92        import gen_esp32part as gen
93
94        def is_otadata_info_valid(status):
95            seq = status.seq % (1 << 32)
96            crc = binascii.crc32(struct.pack('I', seq), 0xFFFFFFFF) % (1 << 32)
97            return seq < (int('0xFFFFFFFF', 16) % (1 << 32)) and status.crc == crc
98
99        partition_table = self.target.partition_table
100
101        ota_partitions = list()
102
103        for i in range(gen.NUM_PARTITION_SUBTYPE_APP_OTA):
104            ota_partition = filter(lambda p: p.subtype == (gen.MIN_PARTITION_SUBTYPE_APP_OTA + i), partition_table)
105
106            try:
107                ota_partitions.append(list(ota_partition)[0])
108            except IndexError:
109                break
110
111        ota_partitions = sorted(ota_partitions, key=lambda p: p.subtype)
112
113        if not ota_partitions:
114            raise Exception('No ota app partitions found')
115
116        # Look for the app partition to switch to
117        ota_partition_next = None
118
119        try:
120            if isinstance(ota_id, int):
121                ota_partition_next = filter(lambda p: p.subtype - gen.MIN_PARTITION_SUBTYPE_APP_OTA  == ota_id, ota_partitions)
122            else:
123                ota_partition_next = filter(lambda p: p.name == ota_id, ota_partitions)
124
125            ota_partition_next = list(ota_partition_next)[0]
126        except IndexError:
127            raise Exception('Partition to switch to not found')
128
129        otadata_info = self._get_otadata_info()
130
131        # Find the copy to base the computation for ota sequence number on
132        otadata_compute_base = -1
133
134        # Both are valid, take the max as computation base
135        if is_otadata_info_valid(otadata_info[0]) and is_otadata_info_valid(otadata_info[1]):
136            if otadata_info[0].seq >= otadata_info[1].seq:
137                otadata_compute_base = 0
138            else:
139                otadata_compute_base = 1
140        # Only one copy is valid, use that
141        elif is_otadata_info_valid(otadata_info[0]):
142            otadata_compute_base = 0
143        elif is_otadata_info_valid(otadata_info[1]):
144            otadata_compute_base = 1
145        # Both are invalid (could be initial state - all 0xFF's)
146        else:
147            pass
148
149        ota_seq_next = 0
150        ota_partitions_num = len(ota_partitions)
151
152        target_seq = (ota_partition_next.subtype & 0x0F) + 1
153
154        # Find the next ota sequence number
155        if otadata_compute_base == 0 or otadata_compute_base == 1:
156            base_seq = otadata_info[otadata_compute_base].seq % (1 << 32)
157
158            i = 0
159            while base_seq > target_seq % ota_partitions_num + i * ota_partitions_num:
160                i += 1
161
162            ota_seq_next = target_seq % ota_partitions_num + i * ota_partitions_num
163        else:
164            ota_seq_next = target_seq
165
166        # Create binary data from computed values
167        ota_seq_next = struct.pack('I', ota_seq_next)
168        ota_seq_crc_next = binascii.crc32(ota_seq_next, 0xFFFFFFFF) % (1 << 32)
169        ota_seq_crc_next = struct.pack('I', ota_seq_crc_next)
170
171        temp_file = tempfile.NamedTemporaryFile(delete=False)
172        temp_file.close()
173
174        try:
175            with open(temp_file.name, 'wb') as otadata_next_file:
176                start = (1 if otadata_compute_base == 0 else 0) * (self.spi_flash_sec_size >> 1)
177
178                otadata_next_file.write(self.otadata)
179
180                otadata_next_file.seek(start)
181                otadata_next_file.write(ota_seq_next)
182
183                otadata_next_file.seek(start + 28)
184                otadata_next_file.write(ota_seq_crc_next)
185
186                otadata_next_file.flush()
187
188            self.target.write_partition(OtatoolTarget.OTADATA_PARTITION, temp_file.name)
189        finally:
190            os.unlink(temp_file.name)
191
192    def read_ota_partition(self, ota_id, output):
193        self.target.read_partition(self._get_partition_id_from_ota_id(ota_id), output)
194
195    def write_ota_partition(self, ota_id, input):
196        self.target.write_partition(self._get_partition_id_from_ota_id(ota_id), input)
197
198    def erase_ota_partition(self, ota_id):
199        self.target.erase_partition(self._get_partition_id_from_ota_id(ota_id))
200
201
202def _read_otadata(target):
203    target._check_otadata_partition()
204
205    otadata_info = target._get_otadata_info()
206
207    print('             {:8s} \t  {:8s} | \t  {:8s} \t   {:8s}'.format('OTA_SEQ', 'CRC', 'OTA_SEQ', 'CRC'))
208    print('Firmware:  0x{:08x} \t0x{:08x} | \t0x{:08x} \t 0x{:08x}'.format(otadata_info[0].seq, otadata_info[0].crc,
209          otadata_info[1].seq, otadata_info[1].crc))
210
211
212def _erase_otadata(target):
213    target.erase_otadata()
214    status('Erased ota_data partition contents')
215
216
217def _switch_ota_partition(target, ota_id):
218    target.switch_ota_partition(ota_id)
219
220
221def _read_ota_partition(target, ota_id, output):
222    target.read_ota_partition(ota_id, output)
223    status('Read ota partition contents to file {}'.format(output))
224
225
226def _write_ota_partition(target, ota_id, input):
227    target.write_ota_partition(ota_id, input)
228    status('Written contents of file {} to ota partition'.format(input))
229
230
231def _erase_ota_partition(target, ota_id):
232    target.erase_ota_partition(ota_id)
233    status('Erased contents of ota partition')
234
235
236def main():
237    global quiet
238
239    parser = argparse.ArgumentParser('ESP-IDF OTA Partitions Tool')
240
241    parser.add_argument('--quiet', '-q', help='suppress stderr messages', action='store_true')
242    parser.add_argument('--esptool-args', help='additional main arguments for esptool', nargs='+')
243    parser.add_argument('--esptool-write-args', help='additional subcommand arguments for esptool write_flash', nargs='+')
244    parser.add_argument('--esptool-read-args', help='additional subcommand arguments for esptool read_flash', nargs='+')
245    parser.add_argument('--esptool-erase-args', help='additional subcommand arguments for esptool erase_region', nargs='+')
246
247    # There are two possible sources for the partition table: a device attached to the host
248    # or a partition table CSV/binary file. These sources are mutually exclusive.
249    parser.add_argument('--port', '-p', help='port where the device to read the partition table from is attached')
250
251    parser.add_argument('--baud', '-b', help='baudrate to use', type=int)
252
253    parser.add_argument('--partition-table-offset', '-o', help='offset to read the partition table from',  type=str)
254
255    parser.add_argument('--partition-table-file', '-f', help='file (CSV/binary) to read the partition table from; \
256                                                            overrides device attached to specified port as the partition table source when defined')
257
258    subparsers = parser.add_subparsers(dest='operation', help='run otatool -h for additional help')
259
260    spi_flash_sec_size = argparse.ArgumentParser(add_help=False)
261    spi_flash_sec_size.add_argument('--spi-flash-sec-size', help='value of SPI_FLASH_SEC_SIZE macro', type=str)
262
263    # Specify the supported operations
264    subparsers.add_parser('read_otadata', help='read otadata partition', parents=[spi_flash_sec_size])
265    subparsers.add_parser('erase_otadata', help='erase otadata partition')
266
267    slot_or_name_parser = argparse.ArgumentParser(add_help=False)
268    slot_or_name_parser_args = slot_or_name_parser.add_mutually_exclusive_group()
269    slot_or_name_parser_args.add_argument('--slot', help='slot number of the ota partition', type=int)
270    slot_or_name_parser_args.add_argument('--name', help='name of the ota partition')
271
272    subparsers.add_parser('switch_ota_partition', help='switch otadata partition', parents=[slot_or_name_parser, spi_flash_sec_size])
273
274    read_ota_partition_subparser = subparsers.add_parser('read_ota_partition', help='read contents of an ota partition', parents=[slot_or_name_parser])
275    read_ota_partition_subparser.add_argument('--output', help='file to write the contents of the ota partition to', required=True)
276
277    write_ota_partition_subparser = subparsers.add_parser('write_ota_partition', help='write contents to an ota partition', parents=[slot_or_name_parser])
278    write_ota_partition_subparser.add_argument('--input', help='file whose contents to write to the ota partition')
279
280    subparsers.add_parser('erase_ota_partition', help='erase contents of an ota partition', parents=[slot_or_name_parser])
281
282    args = parser.parse_args()
283
284    quiet = args.quiet
285
286    # No operation specified, display help and exit
287    if args.operation is None:
288        if not quiet:
289            parser.print_help()
290        sys.exit(1)
291
292    target_args = {}
293
294    if args.port:
295        target_args['port'] = args.port
296
297    if args.partition_table_file:
298        target_args['partition_table_file'] = args.partition_table_file
299
300    if args.partition_table_offset:
301        target_args['partition_table_offset'] = int(args.partition_table_offset, 0)
302
303    try:
304        if args.spi_flash_sec_size:
305            target_args['spi_flash_sec_size'] = int(args.spi_flash_sec_size, 0)
306    except AttributeError:
307        pass
308
309    if args.esptool_args:
310        target_args['esptool_args'] = args.esptool_args
311
312    if args.esptool_write_args:
313        target_args['esptool_write_args'] = args.esptool_write_args
314
315    if args.esptool_read_args:
316        target_args['esptool_read_args'] = args.esptool_read_args
317
318    if args.esptool_erase_args:
319        target_args['esptool_erase_args'] = args.esptool_erase_args
320
321    if args.baud:
322        target_args['baud'] = args.baud
323
324    target = OtatoolTarget(**target_args)
325
326    # Create the operation table and execute the operation
327    common_args = {'target':target}
328
329    ota_id = []
330
331    try:
332        if args.name is not None:
333            ota_id = ['name']
334        else:
335            if args.slot is not None:
336                ota_id = ['slot']
337    except AttributeError:
338        pass
339
340    otatool_ops = {
341        'read_otadata':(_read_otadata, []),
342        'erase_otadata':(_erase_otadata, []),
343        'switch_ota_partition':(_switch_ota_partition, ota_id),
344        'read_ota_partition':(_read_ota_partition, ['output'] + ota_id),
345        'write_ota_partition':(_write_ota_partition, ['input'] + ota_id),
346        'erase_ota_partition':(_erase_ota_partition, ota_id)
347    }
348
349    (op, op_args) = otatool_ops[args.operation]
350
351    for op_arg in op_args:
352        common_args.update({op_arg:vars(args)[op_arg]})
353
354    try:
355        common_args['ota_id'] = common_args.pop('name')
356    except KeyError:
357        try:
358            common_args['ota_id'] = common_args.pop('slot')
359        except KeyError:
360            pass
361
362    if quiet:
363        # If exceptions occur, suppress and exit quietly
364        try:
365            op(**common_args)
366        except Exception:
367            sys.exit(2)
368    else:
369        op(**common_args)
370
371
372if __name__ == '__main__':
373    main()
374