1#!/usr/bin/env python
3# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
9#     http://www.apache.org/licenses/LICENSE-2.0
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
18from __future__ import print_function
20import argparse
21import json
22import os
23import sys
24import textwrap
25import time
26from builtins import input as binput
27from getpass import getpass
30    import prov
31    import security
32    import transport
34except ImportError:
35    idf_path = os.environ['IDF_PATH']
36    sys.path.insert(0, idf_path + '/components/protocomm/python')
37    sys.path.insert(1, idf_path + '/tools/esp_prov')
39    import prov
40    import security
41    import transport
# When True, on_except() raises RuntimeError instead of printing the error
config_throw_except = False


def on_except(err):
    """
    Report an error according to the `config_throw_except` setting

    Raises RuntimeError(err) when `config_throw_except` is set,
    otherwise prints `err` and returns None
    """
    if not config_throw_except:
        print(err)
        return
    raise RuntimeError(err)
def get_security(secver, pop='', verbose=False):
    """
    Create the security object for the requested security version

    secver 1 gives security.Security1(pop, verbose) and secver 0 gives
    security.Security0(verbose), in which case `pop` is not used.

    Returns None for any other security version
    """
    if secver == 1:
        scheme = security.Security1(pop, verbose)
        return scheme
    if secver == 0:
        scheme = security.Security0(verbose)
        return scheme
    # Unknown security version
    return None
def get_transport(sel_transport, service_name):
    """
    Create the transport object for the selected mode of transport

    sel_transport is matched (case sensitively) against 'softap', 'ble'
    and 'console'. For 'softap' a missing service_name is replaced by an
    empty string, whereas for 'ble' the service_name is mandatory.

    Returns the transport object, or None if the transport is unknown or
    a RuntimeError was reported through on_except()
    """
    try:
        if sel_transport == 'softap':
            hostname = '' if service_name is None else service_name
            return transport.Transport_HTTP(hostname)

        if sel_transport == 'ble':
            if service_name is None:
                raise RuntimeError('"--service_name" must be specified for ble transport')
            # The BLE client can discover the primary service and the
            # characteristic of each endpoint from the advertisement data
            # on its own. The service UUID and the 16 bit UUIDs given here
            # are only a fallback for devices running older firmware, on
            # which that automated discovery fails.
            legacy_uuids = {'prov-session': 'ff51', 'prov-config': 'ff52', 'proto-ver': 'ff53'}
            return transport.Transport_BLE(devname=service_name,
                                           service_uuid='021a9004-0382-4aea-bff4-6b3f1c5adfb4',
                                           nu_lookup=legacy_uuids)

        if sel_transport == 'console':
            return transport.Transport_Console()

        # Unsupported mode of transport
        return None
    except RuntimeError as err:
        on_except(err)
        return None
def version_match(tp, protover, verbose=False):
    """
    Check if the protocol version reported by the device matches `protover`

    `protover` is sent to the 'proto-ver' endpoint. The response is first
    compared (case insensitively) as a plain version string, and if that
    does not match it is decoded as JSON and info['prov']['ver'] is compared.

    Returns True on a match, False on a mismatch (including a response
    which is neither the version string nor valid JSON), and None if any
    exception was raised and reported through on_except()
    """
    try:
        response = tp.send_data('proto-ver', protover)

        if verbose:
            print('proto-ver response : ', response)

        expected = protover.lower()

        # First assume this to be a simple version string
        if response.lower() == expected:
            return True

        try:
            # Else interpret this as JSON structure containing
            # information with versions and capabilities of both
            # provisioning service and application
            info = json.loads(response)
        except ValueError:
            # If decoding as JSON fails, it means that capabilities
            # are not supported
            return False

        # Always return a bool here, so that a version mismatch (False)
        # can be told apart from an error (None)
        return info['prov']['ver'].lower() == expected

    except Exception as e:
        on_except(e)
        return None
def has_capability(tp, capability='none', verbose=False):
    """
    Check if the provisioning service reports the given capability

    `capability` is sent to the 'proto-ver' endpoint and the response is
    decoded as JSON, with the capabilities read from info['prov']['cap'].
    With the default 'none' only the presence of the capabilities list
    is checked.

    Returns True if the capability (or, for 'none', the capabilities list)
    is present. Returns False otherwise, which includes a response that is
    not JSON or that lacks the capabilities list, and a RuntimeError
    reported through on_except()
    """
    # Note : default value of `capability` argument cannot be empty string
    # because protocomm_httpd expects non zero content lengths
    try:
        response = tp.send_data('proto-ver', capability)

        if verbose:
            print('proto-ver response : ', response)

        try:
            # Interpret this as JSON structure containing
            # information with versions and capabilities of both
            # provisioning service and application
            info = json.loads(response)
            supported_capabilities = info['prov']['cap']
            if capability.lower() == 'none':
                # No specific capability to check, but capabilities
                # feature is present so return True
                return True
            return capability in supported_capabilities

        except (ValueError, KeyError, TypeError):
            # ValueError : the response is not JSON
            # KeyError   : the JSON carries no info['prov']['cap'] entry
            # TypeError  : the response is not a string, or the decoded
            #              JSON is not of the expected shape
            # In all these cases capabilities are not supported
            return False

    except RuntimeError as e:
        on_except(e)

    return False
def get_version(tp):
    """
    Read the raw response of the 'proto-ver' endpoint

    Returns whatever tp.send_data() returns for the placeholder payload
    '---', or an empty string if a RuntimeError was reported through
    on_except()
    """
    try:
        return tp.send_data('proto-ver', '---')
    except RuntimeError as err:
        on_except(err)
        return ''
def establish_session(tp, sec):
    """
    Run the security handshake over the 'prov-session' endpoint

    sec.security_session() is fed the previous response (None at first)
    and each request it produces is sent to the device, until it returns
    None to signal that there is nothing more to send.

    Returns True once the handshake is complete, False if the transport
    gave no response, and None if a RuntimeError was reported through
    on_except()
    """
    try:
        reply = None
        request = sec.security_session(reply)
        while request is not None:
            reply = tp.send_data('prov-session', request)
            if reply is None:
                return False
            request = sec.security_session(reply)
        return True
    except RuntimeError as err:
        on_except(err)
        return None
def custom_config(tp, sec, custom_info, custom_ver):
    """
    Send the custom config request to the 'custom-config' endpoint

    Returns True if the status decoded by prov.custom_config_response()
    is 0, False for any other status, and None if a RuntimeError was
    reported through on_except()
    """
    try:
        request = prov.custom_config_request(sec, custom_info, custom_ver)
        reply = tp.send_data('custom-config', request)
        status = prov.custom_config_response(sec, reply)
    except RuntimeError as err:
        on_except(err)
        return None
    return status == 0
def custom_data(tp, sec, custom_data):
    """
    Send the custom data request to the 'custom-data' endpoint

    Returns True if the status decoded by prov.custom_data_response()
    is 0, False for any other status, and None if a RuntimeError was
    reported through on_except()
    """
    try:
        request = prov.custom_data_request(sec, custom_data)
        reply = tp.send_data('custom-data', request)
        status = prov.custom_data_response(sec, reply)
    except RuntimeError as err:
        on_except(err)
        return None
    return status == 0
def scan_wifi_APs(sel_transport, tp, sec):
    """
    Make the device scan for Wi-Fi access points and fetch the results

    A blocking scan is started over the 'prov-scan' endpoint, then the
    number of results is queried and the entries are read in batches.

    Returns the list of scan entries (empty if nothing was found), or
    None if a RuntimeError was reported through on_except()
    """
    # In case of softAP the scan must be performed on individual channels,
    # one by one, so that the Wi-Fi controller gets ample time to send out
    # beacons (necessary to maintain connectivity with authenticated
    # stations). As scanning one channel at a time is slow, several channels
    # can be grouped and scanned in quick succession to speed up the scan.
    # With too many channels in a group the controller may again miss out on
    # sending beacons, so an optimum value must be used. The value below
    # usually works out in most cases. For other transports no grouping (0)
    # is requested.
    group_channels = 5 if sel_transport == 'softap' else 0

    # Over BLE at most 4 entries are read at a time, because the response
    # packet must not exceed the present limit of 256 bytes of characteristic
    # value imposed by protocomm_ble. This limit may be removed in the future.
    readlen = 4 if sel_transport == 'ble' else 100

    found = []
    try:
        request = prov.scan_start_request(sec, blocking=True, group_channels=group_channels)
        started = time.time()
        reply = tp.send_data('prov-scan', request)
        finished = time.time()
        print('++++ Scan process executed in ' + str(finished - started) + ' sec')
        prov.scan_start_response(sec, reply)

        request = prov.scan_status_request(sec)
        reply = tp.send_data('prov-scan', request)
        status = prov.scan_status_response(sec, reply)
        print('++++ Scan results : ' + str(status['count']))

        # Fetch the entries batch by batch, starting at entry 0
        index = 0
        remaining = status['count']
        while remaining:
            count = min(remaining, readlen)
            request = prov.scan_result_request(sec, index, count)
            reply = tp.send_data('prov-scan', request)
            found += prov.scan_result_response(sec, reply)
            remaining -= count
            index += count

    except RuntimeError as err:
        on_except(err)
        return None

    return found
def send_wifi_config(tp, sec, ssid, passphrase):
    """
    Send the Wi-Fi credentials to the 'prov-config' endpoint

    Returns True if the status decoded by prov.config_set_config_response()
    is 0, False for any other status, and None if a RuntimeError was
    reported through on_except()
    """
    try:
        request = prov.config_set_config_request(sec, ssid, passphrase)
        reply = tp.send_data('prov-config', request)
        status = prov.config_set_config_response(sec, reply)
    except RuntimeError as err:
        on_except(err)
        return None
    return status == 0
def apply_wifi_config(tp, sec):
    """
    Send the apply config request to the 'prov-config' endpoint

    Returns True if the status decoded by prov.config_apply_config_response()
    is 0, False for any other status, and None if a RuntimeError was
    reported through on_except()
    """
    try:
        request = prov.config_apply_config_request(sec)
        reply = tp.send_data('prov-config', request)
        status = prov.config_apply_config_response(sec, reply)
    except RuntimeError as err:
        on_except(err)
        return None
    return status == 0
def get_wifi_config(tp, sec):
    """
    Query the Wi-Fi status through the 'prov-config' endpoint

    Returns the value decoded by prov.config_get_status_response(), or
    None if a RuntimeError was reported through on_except()
    """
    try:
        request = prov.config_get_status_request(sec)
        reply = tp.send_data('prov-config', request)
        status = prov.config_get_status_response(sec, reply)
    except RuntimeError as err:
        on_except(err)
        return None
    return status
def wait_wifi_connected(tp, sec):
    """
    Poll the Wi-Fi status until provisioning reports a final outcome

    The status is read with get_wifi_config() every 5 seconds. While it
    is 'connecting' polling simply goes on. Any status other than
    'connecting' or 'connected' uses up one of 3 retries.

    Returns True if Wi-Fi connection succeeded, False if connection consistently failed
    """
    poll_interval = 5
    tries_left = 3

    while True:
        time.sleep(poll_interval)
        print('\n==== Wi-Fi connection state  ====')
        status = get_wifi_config(tp, sec)

        if status == 'connected':
            print('==== Provisioning was successful ====')
            return True

        if status == 'connecting':
            # Still in progress, this does not count as a failed try
            continue

        if tries_left <= 0:
            print('---- Provisioning failed ----')
            return False

        tries_left -= 1
        print('Waiting to poll status again (status %s, %d tries left)...' % (status, tries_left))
def desc_format(*args):
    """
    Build a help text out of the given paragraphs

    Each argument is wrapped separately by textwrap.fill(), keeping its
    own whitespace characters, and is terminated by a newline.

    Returns the concatenated text (empty string when called without arguments)
    """
    return ''.join(textwrap.fill(text=paragraph, replace_whitespace=False) + '\n'
                   for paragraph in args)
if __name__ == '__main__':
    # Command line entry point. The steps performed are, in order :
    #   parse arguments -> create transport -> determine security scheme ->
    #   (optional) verify protocol version -> establish session ->
    #   (optional) custom config / custom data -> (optional) Wi-Fi scan ->
    #   send and apply Wi-Fi config -> wait for the connection result
    #
    # Exit status :
    #   1 : transport failure          2 : security version / PoP problem
    #   3 : version mismatch or Wi-Fi scan not supported
    #   4 : session failure            5 : custom config / custom data failure
    #   6 : sending Wi-Fi config failed
    #   7 : applying Wi-Fi config failed
    #   8 : Wi-Fi scan failed          9 : no APs found
    #   10 : device failed to connect to the Wi-Fi network
    #
    # sys.exit() is used throughout, since the exit() builtin is injected
    # by the `site` module for interactive use and is not guaranteed to exist
    parser = argparse.ArgumentParser(description=desc_format(
                                     'ESP Provisioning tool for configuring devices '
                                     'running protocomm based provisioning service.',
                                     'See esp-idf/examples/provisioning for sample applications'),
                                     formatter_class=argparse.RawTextHelpFormatter)

    parser.add_argument('--transport', required=True, dest='mode', type=str,
                        help=desc_format(
                            'Mode of transport over which provisioning is to be performed.',
                            'This should be one of "softap", "ble" or "console"'))

    parser.add_argument('--service_name', dest='name', type=str,
                        help=desc_format(
                            'This specifies the name of the provisioning service to connect to, '
                            'depending upon the mode of transport :',
                            '\t- transport "ble"    : The BLE Device Name',
                            '\t- transport "softap" : HTTP Server hostname or IP',
                            '\t                       (default "")'))

    parser.add_argument('--proto_ver', dest='version', type=str, default='',
                        help=desc_format(
                            'This checks the protocol version of the provisioning service running '
                            'on the device before initiating Wi-Fi configuration'))

    parser.add_argument('--sec_ver', dest='secver', type=int, default=None,
                        help=desc_format(
                            'Protocomm security scheme used by the provisioning service for secure '
                            'session establishment. Accepted values are :',
                            '\t- 0 : No security',
                            '\t- 1 : X25519 key exchange + AES-CTR encryption',
                            '\t      + Authentication using Proof of Possession (PoP)',
                            'In case device side application uses IDF\'s provisioning manager, '
                            'the compatible security version is automatically determined from '
                            'capabilities retrieved via the version endpoint'))

    parser.add_argument('--pop', dest='pop', type=str, default='',
                        help=desc_format(
                            'This specifies the Proof of possession (PoP) when security scheme 1 '
                            'is used'))

    parser.add_argument('--ssid', dest='ssid', type=str, default='',
                        help=desc_format(
                            'This configures the device to use SSID of the Wi-Fi network to which '
                            'we would like it to connect to permanently, once provisioning is complete. '
                            'If Wi-Fi scanning is supported by the provisioning service, this need not '
                            'be specified'))

    parser.add_argument('--passphrase', dest='passphrase', type=str, default='',
                        help=desc_format(
                            'This configures the device to use Passphrase for the Wi-Fi network to which '
                            'we would like it to connect to permanently, once provisioning is complete. '
                            'If Wi-Fi scanning is supported by the provisioning service, this need not '
                            'be specified'))

    parser.add_argument('--custom_data', dest='custom_data', type=str, default='',
                        help=desc_format(
                            'This is an optional parameter, only intended for use with '
                            '"examples/provisioning/wifi_prov_mgr_custom_data"'))

    parser.add_argument('--custom_config', action='store_true',
                        help=desc_format(
                            'This is an optional parameter, only intended for use with '
                            '"examples/provisioning/custom_config"'))
    parser.add_argument('--custom_info', dest='custom_info', type=str, default='<some custom info string>',
                        help=desc_format(
                            'Custom Config Info String. "--custom_config" must be specified for using this'))
    parser.add_argument('--custom_ver', dest='custom_ver', type=int, default=2,
                        help=desc_format(
                            'Custom Config Version Number. "--custom_config" must be specified for using this'))

    parser.add_argument('-v', '--verbose', help='Increase output verbosity', action='store_true')

    args = parser.parse_args()

    obj_transport = get_transport(args.mode.lower(), args.name)
    if obj_transport is None:
        print('---- Failed to establish connection ----')
        sys.exit(1)

    # If security version not specified check in capabilities
    if args.secver is None:
        # First check if capabilities are supported or not
        if not has_capability(obj_transport):
            print('Security capabilities could not be determined. Please specify "--sec_ver" explicitly')
            print('---- Invalid Security Version ----')
            sys.exit(2)

        # When no_sec is present, use security 0, else security 1
        args.secver = int(not has_capability(obj_transport, 'no_sec'))
        print('Security scheme determined to be :', args.secver)

        # PoP is mandatory for security 1, unless the device reports no_pop
        if (args.secver != 0) and not has_capability(obj_transport, 'no_pop'):
            if len(args.pop) == 0:
                print('---- Proof of Possession argument not provided ----')
                sys.exit(2)
        elif len(args.pop) != 0:
            print('---- Proof of Possession will be ignored ----')
            args.pop = ''

    obj_security = get_security(args.secver, args.pop, args.verbose)
    if obj_security is None:
        print('---- Invalid Security Version ----')
        sys.exit(2)

    if args.version != '':
        print('\n==== Verifying protocol version ====')
        if not version_match(obj_transport, args.version, args.verbose):
            print('---- Error in protocol version matching ----')
            sys.exit(3)
        print('==== Verified protocol version successfully ====')

    print('\n==== Starting Session ====')
    if not establish_session(obj_transport, obj_security):
        print('Failed to establish session. Ensure that security scheme and proof of possession are correct')
        print('---- Error in establishing session ----')
        sys.exit(4)
    print('==== Session Established ====')

    if args.custom_config:
        print('\n==== Sending Custom config to esp32 ====')
        if not custom_config(obj_transport, obj_security, args.custom_info, args.custom_ver):
            print('---- Error in custom config ----')
            sys.exit(5)
        print('==== Custom config sent successfully ====')

    if args.custom_data != '':
        print('\n==== Sending Custom data to esp32 ====')
        if not custom_data(obj_transport, obj_security, args.custom_data):
            print('---- Error in custom data ----')
            sys.exit(5)
        print('==== Custom data sent successfully ====')

    # Without an SSID on the command line, let the user pick one from
    # the list of APs scanned by the device
    if args.ssid == '':
        if not has_capability(obj_transport, 'wifi_scan'):
            print('---- Wi-Fi Scan List is not supported by provisioning service ----')
            print('---- Rerun esp_prov with SSID and Passphrase as argument ----')
            sys.exit(3)

        while True:
            print('\n==== Scanning Wi-Fi APs ====')
            start_time = time.time()
            APs = scan_wifi_APs(args.mode.lower(), obj_transport, obj_security)
            end_time = time.time()
            print('\n++++ Scan finished in ' + str(end_time - start_time) + ' sec')
            if APs is None:
                print('---- Error in scanning Wi-Fi APs ----')
                sys.exit(8)

            if len(APs) == 0:
                print('No APs found!')
                sys.exit(9)

            print('==== Wi-Fi Scan results ====')
            print('{0: >4} {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}'.format(
                'S.N.', 'SSID', 'BSSID', 'CHN', 'RSSI', 'AUTH'))
            for serial, ap in enumerate(APs, 1):
                print('[{0: >2}] {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}'.format(
                    serial, ap['ssid'], ap['bssid'], ap['channel'], ap['rssi'], ap['auth']))

            # Keep asking till a number within 0..len(APs) is entered
            while True:
                try:
                    select = int(binput('Select AP by number (0 to rescan) : '))
                    if select < 0 or select > len(APs):
                        raise ValueError
                    break
                except ValueError:
                    print('Invalid input! Retry')

            # 0 means rescan, anything else is the serial number of an AP
            if select != 0:
                break

        args.ssid = APs[select - 1]['ssid']
        prompt_str = 'Enter passphrase for {0} : '.format(args.ssid)
        args.passphrase = getpass(prompt_str)

    print('\n==== Sending Wi-Fi credential to esp32 ====')
    if not send_wifi_config(obj_transport, obj_security, args.ssid, args.passphrase):
        print('---- Error in send Wi-Fi config ----')
        sys.exit(6)
    print('==== Wi-Fi Credentials sent successfully ====')

    print('\n==== Applying config to esp32 ====')
    if not apply_wifi_config(obj_transport, obj_security):
        print('---- Error in apply Wi-Fi config ----')
        sys.exit(7)
    print('==== Apply config sent successfully ====')

    # wait_wifi_connected() prints the outcome itself. Its result is turned
    # into the exit status, so that a failed provisioning is not reported
    # as success to the calling shell or script
    if not wait_wifi_connected(obj_transport, obj_security):
        sys.exit(10)