#!/usr/bin/env python
#
# Copyright 2018 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function

import argparse
import json
import os
import sys
import textwrap
import time
from builtins import input as binput
from getpass import getpass

try:
    import prov
    import security
    import transport

except ImportError:
    idf_path = os.environ['IDF_PATH']
    sys.path.insert(0, idf_path + '/components/protocomm/python')
    sys.path.insert(1, idf_path + '/tools/esp_prov')

    import prov
    import security
    import transport

# Set this to true to allow exceptions to be thrown
config_throw_except = False


def on_except(err):
    """
    Report an error according to `config_throw_except`

    Raises RuntimeError(err) when `config_throw_except` is true, otherwise
    just prints `err`.
    """
    if config_throw_except:
        raise RuntimeError(err)
    else:
        print(err)


def get_security(secver, pop='', verbose=False):
    """
    Create the security object for the given security version

    Returns security.Security1(pop, verbose) for version 1,
    security.Security0(verbose) for version 0 and None for anything else.
    """
    if secver == 1:
        return security.Security1(pop, verbose)
    elif secver == 0:
        return security.Security0(verbose)
    return None


def get_transport(sel_transport, service_name):
    """
    Create the transport object for 'softap', 'ble' or 'console'

    Returns the transport object, or None if `sel_transport` is not one of
    the known names or if a RuntimeError was reported through on_except().
    """
    try:
        tp = None
        if sel_transport == 'softap':
            if service_name is None:
                service_name = '192.168.4.1:80'
            tp = transport.Transport_HTTP(service_name)
        elif sel_transport == 'ble':
            if service_name is None:
                raise RuntimeError('"--service_name" must be specified for ble transport')
            # BLE client is now capable of automatically figuring out
            # the primary service from the advertisement data and the
            # characteristics corresponding to each endpoint.
            # Below, the service_uuid field and 16bit UUIDs in the nu_lookup
            # table are provided only to support devices running older firmware,
            # in which case, the automated discovery will fail and the client
            # will fallback to using the provided UUIDs instead
            nu_lookup = {'prov-session': 'ff51', 'prov-config': 'ff52', 'proto-ver': 'ff53'}
            tp = transport.Transport_BLE(devname=service_name,
                                         service_uuid='021a9004-0382-4aea-bff4-6b3f1c5adfb4',
                                         nu_lookup=nu_lookup)
        elif sel_transport == 'console':
            tp = transport.Transport_Console()
        return tp
    except RuntimeError as e:
        on_except(e)
        return None


def version_match(tp, protover, verbose=False):
    """
    Check that the 'proto-ver' endpoint reports version `protover`

    Returns True on a (case-insensitive) match, False on a mismatch or when
    the response is neither the plain version string nor decodable JSON, and
    None if any exception was reported through on_except().
    """
    try:
        response = tp.send_data('proto-ver', protover)

        if verbose:
            print('proto-ver response : ', response)

        # First assume this to be a simple version string
        if response.lower() == protover.lower():
            return True

        try:
            # Else interpret this as JSON structure containing
            # information with versions and capabilities of both
            # provisioning service and application
            info = json.loads(response)
        except ValueError:
            # If decoding as JSON fails, it means that capabilities
            # are not supported
            return False

        # Explicit boolean so that a mismatch is not confused with the
        # `None` returned on error
        return info['prov']['ver'].lower() == protover.lower()

    except Exception as e:
        on_except(e)
        return None


def has_capability(tp, capability='none', verbose=False):
    """
    Check whether the 'proto-ver' response lists `capability`

    With the default 'none', returns True as soon as a capabilities list is
    present at all. Returns False when the response cannot be interpreted as
    a JSON object holding info['prov']['cap'], when the capability is not
    listed, or when a RuntimeError was reported through on_except().
    """
    # Note : default value of `capability` argument cannot be empty string
    # because protocomm_httpd expects non zero content lengths
    try:
        response = tp.send_data('proto-ver', capability)

        if verbose:
            print('proto-ver response : ', response)

        try:
            # Interpret this as JSON structure containing
            # information with versions and capabilities of both
            # provisioning service and application
            info = json.loads(response)
            supported_capabilities = info['prov']['cap']
        except (ValueError, KeyError, TypeError):
            # If decoding as JSON fails, or the decoded value does not
            # have the expected structure (e.g. a bare version number, or
            # no 'prov'/'cap' entry), capabilities are not supported
            return False

        if capability.lower() == 'none':
            # No specific capability to check, but capabilities
            # feature is present so return True
            return True
        return capability in supported_capabilities

    except RuntimeError as e:
        on_except(e)

    return False


def get_version(tp):
    """
    Return the raw 'proto-ver' response, or '' if a RuntimeError was reported
    """
    response = None
    try:
        response = tp.send_data('proto-ver', '---')
    except RuntimeError as e:
        on_except(e)
        response = ''
    return response


def establish_session(tp, sec):
    """
    Run the security handshake over the 'prov-session' endpoint

    Keeps feeding each response back into sec.security_session() until it
    yields no further request. Returns True on completion, False if the
    transport returned None, and None if a RuntimeError was reported.
    """
    try:
        response = None
        while True:
            request = sec.security_session(response)
            if request is None:
                break
            response = tp.send_data('prov-session', request)
            if response is None:
                return False
        return True
    except RuntimeError as e:
        on_except(e)
        return None


def custom_config(tp, sec, custom_info, custom_ver):
    """
    Send custom config over the 'custom-config' endpoint

    Returns True if the decoded response status is 0, False otherwise, and
    None if a RuntimeError was reported.
    """
    try:
        message = prov.custom_config_request(sec, custom_info, custom_ver)
        response = tp.send_data('custom-config', message)
        return (prov.custom_config_response(sec, response) == 0)
    except RuntimeError as e:
        on_except(e)
        return None


def custom_data(tp, sec, custom_data):
    """
    Send custom data over the 'custom-data' endpoint

    Returns True if the decoded response status is 0, False otherwise, and
    None if a RuntimeError was reported.
    """
    try:
        message = prov.custom_data_request(sec, custom_data)
        response = tp.send_data('custom-data', message)
        return (prov.custom_data_response(sec, response) == 0)
    except RuntimeError as e:
        on_except(e)
        return None


def scan_wifi_APs(sel_transport, tp, sec):
    """
    Run a blocking Wi-Fi scan via the 'prov-scan' endpoint and fetch results

    Returns the list built from the prov.scan_result_response() chunks
    (empty if the reported count is 0), or None if a RuntimeError was
    reported.
    """
    APs = []
    group_channels = 0
    readlen = 100
    if sel_transport == 'softap':
        # In case of softAP we must perform the scan on individual channels, one by one,
        # so that the Wi-Fi controller gets ample time to send out beacons (necessary to
        # maintain connectivity with authenticated stations). As scanning one channel at a
        # time will be slow, we can group more than one channels to be scanned in quick
        # succession, hence speeding up the scan process. Though if too many channels are
        # present in a group, the controller may again miss out on sending beacons. Hence,
        # the application should use an optimum value. The following value usually
        # works out in most cases
        group_channels = 5
    elif sel_transport == 'ble':
        # Read at most 4 entries at a time. This is because if we are using BLE transport
        # then the response packet size should not exceed the present limit of 256 bytes of
        # characteristic value imposed by protocomm_ble. This limit may be removed in the
        # future
        readlen = 4
    try:
        message = prov.scan_start_request(sec, blocking=True, group_channels=group_channels)
        start_time = time.time()
        response = tp.send_data('prov-scan', message)
        stop_time = time.time()
        print('++++ Scan process executed in ' + str(stop_time - start_time) + ' sec')
        prov.scan_start_response(sec, response)

        message = prov.scan_status_request(sec)
        response = tp.send_data('prov-scan', message)
        result = prov.scan_status_response(sec, response)
        print('++++ Scan results : ' + str(result['count']))

        # Fetch the results in chunks of at most `readlen` entries
        index = 0
        remaining = result['count']
        while remaining:
            count = min(remaining, readlen)
            message = prov.scan_result_request(sec, index, count)
            response = tp.send_data('prov-scan', message)
            APs += prov.scan_result_response(sec, response)
            remaining -= count
            index += count

    except RuntimeError as e:
        on_except(e)
        return None

    return APs


def send_wifi_config(tp, sec, ssid, passphrase):
    """
    Send Wi-Fi credentials over the 'prov-config' endpoint

    Returns True if the decoded response status is 0, False otherwise, and
    None if a RuntimeError was reported.
    """
    try:
        message = prov.config_set_config_request(sec, ssid, passphrase)
        response = tp.send_data('prov-config', message)
        return (prov.config_set_config_response(sec, response) == 0)
    except RuntimeError as e:
        on_except(e)
        return None


def apply_wifi_config(tp, sec):
    """
    Ask the device to apply the Wi-Fi config over the 'prov-config' endpoint

    Returns True if the decoded response status is 0, False otherwise, and
    None if a RuntimeError was reported.
    """
    try:
        message = prov.config_apply_config_request(sec)
        response = tp.send_data('prov-config', message)
        return (prov.config_apply_config_response(sec, response) == 0)
    except RuntimeError as e:
        on_except(e)
        return None


def get_wifi_config(tp, sec):
    """
    Query the Wi-Fi status over the 'prov-config' endpoint

    Returns whatever prov.config_get_status_response() yields, or None if a
    RuntimeError was reported.
    """
    try:
        message = prov.config_get_status_request(sec)
        response = tp.send_data('prov-config', message)
        return prov.config_get_status_response(sec, response)
    except RuntimeError as e:
        on_except(e)
        return None


def wait_wifi_connected(tp, sec):
    """
    Wait for provisioning to report Wi-Fi is connected

    Returns True if Wi-Fi connection succeeded, False if connection consistently failed
    """
    TIME_PER_POLL = 5
    retry = 3

    while True:
        time.sleep(TIME_PER_POLL)
        print('\n==== Wi-Fi connection state ====')
        ret = get_wifi_config(tp, sec)
        if ret == 'connecting':
            # Still in progress; this does not consume a retry
            continue
        elif ret == 'connected':
            print('==== Provisioning was successful ====')
            return True
        elif retry > 0:
            retry -= 1
            print('Waiting to poll status again (status %s, %d tries left)...' % (ret, retry))
        else:
            print('---- Provisioning failed ----')
            return False


def desc_format(*args):
    """
    Wrap each argument with textwrap.fill() and join them, one per line

    Existing whitespace (e.g. tabs) inside each argument is preserved, and
    every wrapped argument is followed by a newline.
    """
    return ''.join(textwrap.fill(replace_whitespace=False, text=arg) + '\n'
                   for arg in args)


def main():
    """
    Command line entry point of the provisioning tool

    Exits with a non-zero status on failure: 1 transport, 2 security version
    or proof of possession, 3 protocol version mismatch or Wi-Fi scan not
    supported, 4 session, 5 custom config / custom data, 6 sending Wi-Fi
    config, 7 applying Wi-Fi config, 8 scan error, 9 no APs found.
    The result of the final connection wait does not affect the exit status.
    """
    parser = argparse.ArgumentParser(description=desc_format(
                                     'ESP Provisioning tool for configuring devices '
                                     'running protocomm based provisioning service.',
                                     'See esp-idf/examples/provisioning for sample applications'),
                                     formatter_class=argparse.RawTextHelpFormatter)

    parser.add_argument('--transport', required=True, dest='mode', type=str,
                        help=desc_format(
                            'Mode of transport over which provisioning is to be performed.',
                            'This should be one of "softap", "ble" or "console"'))

    parser.add_argument('--service_name', dest='name', type=str,
                        help=desc_format(
                            'This specifies the name of the provisioning service to connect to, '
                            'depending upon the mode of transport :',
                            '\t- transport "ble" : The BLE Device Name',
                            '\t- transport "softap" : HTTP Server hostname or IP',
                            '\t (default "192.168.4.1:80")'))

    parser.add_argument('--proto_ver', dest='version', type=str, default='',
                        help=desc_format(
                            'This checks the protocol version of the provisioning service running '
                            'on the device before initiating Wi-Fi configuration'))

    parser.add_argument('--sec_ver', dest='secver', type=int, default=None,
                        help=desc_format(
                            'Protocomm security scheme used by the provisioning service for secure '
                            'session establishment. Accepted values are :',
                            '\t- 0 : No security',
                            '\t- 1 : X25519 key exchange + AES-CTR encryption',
                            '\t + Authentication using Proof of Possession (PoP)',
                            'In case device side application uses IDF\'s provisioning manager, '
                            'the compatible security version is automatically determined from '
                            'capabilities retrieved via the version endpoint'))

    parser.add_argument('--pop', dest='pop', type=str, default='',
                        help=desc_format(
                            'This specifies the Proof of possession (PoP) when security scheme 1 '
                            'is used'))

    parser.add_argument('--ssid', dest='ssid', type=str, default='',
                        help=desc_format(
                            'This configures the device to use SSID of the Wi-Fi network to which '
                            'we would like it to connect to permanently, once provisioning is complete. '
                            'If Wi-Fi scanning is supported by the provisioning service, this need not '
                            'be specified'))

    parser.add_argument('--passphrase', dest='passphrase', type=str, default='',
                        help=desc_format(
                            'This configures the device to use Passphrase for the Wi-Fi network to which '
                            'we would like it to connect to permanently, once provisioning is complete. '
                            'If Wi-Fi scanning is supported by the provisioning service, this need not '
                            'be specified'))

    parser.add_argument('--custom_data', dest='custom_data', type=str, default='',
                        help=desc_format(
                            'This is an optional parameter, only intended for use with '
                            '"examples/provisioning/wifi_prov_mgr_custom_data"'))

    parser.add_argument('--custom_config', action='store_true',
                        help=desc_format(
                            'This is an optional parameter, only intended for use with '
                            '"examples/provisioning/custom_config"'))
    parser.add_argument('--custom_info', dest='custom_info', type=str, default='<some custom info string>',
                        help=desc_format(
                            'Custom Config Info String. "--custom_config" must be specified for using this'))
    parser.add_argument('--custom_ver', dest='custom_ver', type=int, default=2,
                        help=desc_format(
                            'Custom Config Version Number. "--custom_config" must be specified for using this'))

    parser.add_argument('-v','--verbose', help='Increase output verbosity', action='store_true')

    args = parser.parse_args()

    obj_transport = get_transport(args.mode.lower(), args.name)
    if obj_transport is None:
        print('---- Failed to establish connection ----')
        sys.exit(1)

    # If security version not specified check in capabilities
    if args.secver is None:
        # First check if capabilities are supported or not
        if not has_capability(obj_transport):
            print('Security capabilities could not be determined. Please specify "--sec_ver" explicitly')
            print('---- Invalid Security Version ----')
            sys.exit(2)

        # When no_sec is present, use security 0, else security 1
        args.secver = int(not has_capability(obj_transport, 'no_sec'))
        print('Security scheme determined to be :', args.secver)

    if (args.secver != 0) and not has_capability(obj_transport, 'no_pop'):
        if len(args.pop) == 0:
            print('---- Proof of Possession argument not provided ----')
            sys.exit(2)
    elif len(args.pop) != 0:
        print('---- Proof of Possession will be ignored ----')
        args.pop = ''

    obj_security = get_security(args.secver, args.pop, args.verbose)
    if obj_security is None:
        print('---- Invalid Security Version ----')
        sys.exit(2)

    if args.version != '':
        print('\n==== Verifying protocol version ====')
        if not version_match(obj_transport, args.version, args.verbose):
            print('---- Error in protocol version matching ----')
            sys.exit(3)
        print('==== Verified protocol version successfully ====')

    print('\n==== Starting Session ====')
    if not establish_session(obj_transport, obj_security):
        print('Failed to establish session. Ensure that security scheme and proof of possession are correct')
        print('---- Error in establishing session ----')
        sys.exit(4)
    print('==== Session Established ====')

    if args.custom_config:
        print('\n==== Sending Custom config to esp32 ====')
        if not custom_config(obj_transport, obj_security, args.custom_info, args.custom_ver):
            print('---- Error in custom config ----')
            sys.exit(5)
        print('==== Custom config sent successfully ====')

    if args.custom_data != '':
        print('\n==== Sending Custom data to esp32 ====')
        if not custom_data(obj_transport, obj_security, args.custom_data):
            print('---- Error in custom data ----')
            sys.exit(5)
        print('==== Custom data sent successfully ====')

    if args.ssid == '':
        if not has_capability(obj_transport, 'wifi_scan'):
            print('---- Wi-Fi Scan List is not supported by provisioning service ----')
            print('---- Rerun esp_prov with SSID and Passphrase as argument ----')
            sys.exit(3)

        while True:
            print('\n==== Scanning Wi-Fi APs ====')
            start_time = time.time()
            APs = scan_wifi_APs(args.mode.lower(), obj_transport, obj_security)
            end_time = time.time()
            print('\n++++ Scan finished in ' + str(end_time - start_time) + ' sec')
            if APs is None:
                print('---- Error in scanning Wi-Fi APs ----')
                sys.exit(8)

            if len(APs) == 0:
                print('No APs found!')
                sys.exit(9)

            print('==== Wi-Fi Scan results ====')
            print('{0: >4} {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}'.format(
                'S.N.', 'SSID', 'BSSID', 'CHN', 'RSSI', 'AUTH'))
            # Entries are numbered from 1 because 0 is reserved for "rescan"
            for serial, ap in enumerate(APs, 1):
                print('[{0: >2}] {1: <33} {2: <12} {3: >4} {4: <4} {5: <16}'.format(
                    serial, ap['ssid'], ap['bssid'], ap['channel'], ap['rssi'], ap['auth']))

            while True:
                try:
                    select = int(binput('Select AP by number (0 to rescan) : '))
                    if select < 0 or select > len(APs):
                        raise ValueError
                    break
                except ValueError:
                    print('Invalid input! Retry')

            if select != 0:
                break

        args.ssid = APs[select - 1]['ssid']
        prompt_str = 'Enter passphrase for {0} : '.format(args.ssid)
        args.passphrase = getpass(prompt_str)

    print('\n==== Sending Wi-Fi credential to esp32 ====')
    if not send_wifi_config(obj_transport, obj_security, args.ssid, args.passphrase):
        print('---- Error in send Wi-Fi config ----')
        sys.exit(6)
    print('==== Wi-Fi Credentials sent successfully ====')

    print('\n==== Applying config to esp32 ====')
    if not apply_wifi_config(obj_transport, obj_security):
        print('---- Error in apply Wi-Fi config ----')
        sys.exit(7)
    print('==== Apply config sent successfully ====')

    wait_wifi_connected(obj_transport, obj_security)


if __name__ == '__main__':
    main()