1#!/usr/bin/env python 2# 3# Copyright 2019 Espressif Systems (Shanghai) PTE LTD 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from __future__ import print_function 18 19import os 20import re 21import subprocess 22import threading 23import traceback 24 25try: 26 import Queue 27except ImportError: 28 import queue as Queue 29 30import ttfw_idf 31from ble import lib_ble_client 32from tiny_test_fw import Utility 33 34# When running on local machine execute the following before running this script 35# > make app bootloader 36# > make print_flash_cmd | tail -n 1 > build/download.config 37# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw 38 39 40def bleprph_client_task(dut, dut_addr): 41 interface = 'hci0' 42 ble_devname = 'nimble-bleprph' 43 srv_uuid = '2f12' 44 write_value = b'A' 45 46 # Get BLE client module 47 ble_client_obj = lib_ble_client.BLE_Bluez_Client(iface=interface) 48 49 # Discover Bluetooth Adapter and power on 50 is_adapter_set = ble_client_obj.set_adapter() 51 if not is_adapter_set: 52 return 53 54 # Connect BLE Device 55 is_connected = ble_client_obj.connect( 56 devname=ble_devname, 57 devaddr=dut_addr) 58 if not is_connected: 59 return 60 61 # Get services of the connected device 62 services = ble_client_obj.get_services() 63 if not services: 64 ble_client_obj.disconnect() 65 return 66 # Verify service uuid exists 67 service_exists = ble_client_obj.get_service_if_exists(srv_uuid) 68 if not service_exists: 69 ble_client_obj.disconnect() 70 return 71 72 # Get characteristics of the connected device 73 ble_client_obj.get_chars() 74 75 # Read properties of characteristics (uuid, value and permissions) 76 ble_client_obj.read_chars() 77 78 # Write new value to characteristic having read and write permission 79 # and display updated value 80 write_char = ble_client_obj.write_chars(write_value) 81 if not write_char: 82 ble_client_obj.disconnect() 83 return 84 85 # Disconnect device 86 ble_client_obj.disconnect() 87 88 # Check dut responses 89 dut.expect('connection established; status=0', timeout=30) 90 dut.expect('disconnect;', timeout=30) 91 92 93class BlePrphThread(threading.Thread): 94 def __init__(self, dut, dut_addr, exceptions_queue): 95 threading.Thread.__init__(self) 96 self.dut = dut 97 self.dut_addr = dut_addr 98 self.exceptions_queue = exceptions_queue 99 100 def run(self): 101 try: 102 bleprph_client_task(self.dut, self.dut_addr) 103 except RuntimeError: 104 self.exceptions_queue.put(traceback.format_exc(), block=False) 105 106 107@ttfw_idf.idf_example_test(env_tag='Example_WIFI_BT') 108def test_example_app_ble_peripheral(env, extra_data): 109 """ 110 Steps: 111 1. Discover Bluetooth Adapter and Power On 112 2. Connect BLE Device 113 3. Read Services 114 4. Read Characteristics 115 5. Write Characteristics 116 """ 117 # Remove cached bluetooth devices of any previous connections 118 subprocess.check_output(['rm', '-rf', '/var/lib/bluetooth/*']) 119 subprocess.check_output(['hciconfig', 'hci0', 'reset']) 120 121 # Acquire DUT 122 dut = env.get_dut('bleprph', 'examples/bluetooth/nimble/bleprph', dut_class=ttfw_idf.ESP32DUT) 123 124 # Get binary file 125 binary_file = os.path.join(dut.app.binary_path, 'bleprph.bin') 126 bin_size = os.path.getsize(binary_file) 127 ttfw_idf.log_performance('bleprph_bin_size', '{}KB'.format(bin_size // 1024)) 128 129 # Upload binary and start testing 130 Utility.console_log('Starting bleprph simple example test app') 131 dut.start_app() 132 dut.reset() 133 134 # Get device address from dut 135 dut_addr = dut.expect(re.compile(r'Device Address: ([a-fA-F0-9:]+)'), timeout=30)[0] 136 137 # Check dut responses 138 dut.expect('GAP procedure initiated: advertise;', timeout=30) 139 140 # Starting a py-client in a separate thread 141 exceptions_queue = Queue.Queue() 142 bleprph_thread_obj = BlePrphThread(dut, dut_addr, exceptions_queue) 143 bleprph_thread_obj.start() 144 bleprph_thread_obj.join() 145 146 exception_msg = None 147 while True: 148 try: 149 exception_msg = exceptions_queue.get(block=False) 150 except Queue.Empty: 151 break 152 else: 153 Utility.console_log('\n' + exception_msg) 154 155 if exception_msg: 156 raise Exception('BlePrph thread did not run successfully') 157 158 159if __name__ == '__main__': 160 test_example_app_ble_peripheral() 161