#!/usr/bin/env python
#
# Copyright 2019 Espressif Systems (Shanghai) PTE LTD
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import glob
import os
import re
import subprocess
import threading
import traceback

try:
    import Queue
except ImportError:
    import queue as Queue

import ttfw_idf
from ble import lib_ble_client
from tiny_test_fw import Utility

# When running on local machine execute the following before running this script
# > make app bootloader
# > make print_flash_cmd | tail -n 1 > build/download.config


def blehr_client_task(hr_obj, dut, dut_addr):
    """Run the host-side BLE client sequence against the blehr example.

    The sequence is: power on the adapter, connect to the device, look up the
    heart-rate service and characteristic, start notifications, then check the
    DUT output for the subscribe/unsubscribe and disconnect events.

    If a client step reports failure through its return value, the function
    disconnects (when a connection was already made) and returns early
    without raising.

    Args:
        hr_obj: Not used by this function; kept so existing callers keep
            working.
        dut: Device-under-test handle; only its ``expect()`` method is used.
        dut_addr: Device address handed to the BLE client as ``devaddr``.
    """
    interface = 'hci0'
    ble_devname = 'blehr_sensor_1.0'
    hr_srv_uuid = '180d'
    hr_char_uuid = '2a37'

    # Get BLE client module
    ble_client_obj = lib_ble_client.BLE_Bluez_Client(iface=interface)

    # Discover Bluetooth Adapter and power on
    is_adapter_set = ble_client_obj.set_adapter()
    if not is_adapter_set:
        return

    # Connect BLE Device
    is_connected = ble_client_obj.connect(
        devname=ble_devname,
        devaddr=dut_addr)
    if not is_connected:
        return

    # Get services of the connected device
    services = ble_client_obj.get_services()
    if not services:
        ble_client_obj.disconnect()
        return

    # Get characteristics of the connected device
    ble_client_obj.get_chars()

    # Blehr application run:
    #     Start Notifications
    #     Retrieve updated value
    #     Stop Notifications

    # Get service if exists
    service = ble_client_obj.get_service_if_exists(hr_srv_uuid)
    if not service:
        ble_client_obj.disconnect()
        return

    # Get characteristic if exists
    char = ble_client_obj.get_char_if_exists(hr_char_uuid)
    if not char:
        ble_client_obj.disconnect()
        return

    # Start Notify
    # Read updated value
    # Stop Notify
    notify = ble_client_obj.start_notify(char)
    if not notify:
        ble_client_obj.disconnect()
        return

    # Check dut responses
    dut.expect('subscribe event; cur_notify=1', timeout=30)
    dut.expect('subscribe event; cur_notify=0', timeout=30)

    # Call disconnect to perform cleanup operations before exiting application
    ble_client_obj.disconnect()

    # Check dut responses
    dut.expect('disconnect;', timeout=30)


class BleHRThread(threading.Thread):
    """Worker thread that runs ``blehr_client_task`` and reports failures.

    A formatted traceback is put on ``exceptions_queue`` when the task raises,
    so the thread that started the worker can detect the failure after
    ``join()``.
    """

    def __init__(self, dut, dut_addr, exceptions_queue):
        threading.Thread.__init__(self)
        self.dut = dut
        self.dut_addr = dut_addr
        self.exceptions_queue = exceptions_queue

    def run(self):
        try:
            blehr_client_task(self, self.dut, self.dut_addr)
        except Exception:
            # Catch every ordinary exception, not just RuntimeError: an
            # exception that escapes run() only terminates this thread, so the
            # main thread would find an empty queue and report success.
            self.exceptions_queue.put(traceback.format_exc(), block=False)


def _clear_bluetooth_cache(cache_dir='/var/lib/bluetooth'):
    """Remove every entry found directly under ``cache_dir``."""
    # subprocess does not expand wildcards when given an argument list, so the
    # pattern is expanded here and the resulting paths are passed to rm.
    cached_entries = glob.glob(os.path.join(cache_dir, '*'))
    if cached_entries:
        subprocess.check_output(['rm', '-rf'] + cached_entries)


@ttfw_idf.idf_example_test(env_tag='Example_WIFI_BT')
def test_example_app_ble_hr(env, extra_data):
    """
    Steps:
        1. Discover Bluetooth Adapter and Power On
        2. Connect BLE Device
        3. Start Notifications
        4. Updated value is retrieved
        5. Stop Notifications
    """
    # Remove cached bluetooth devices of any previous connections
    _clear_bluetooth_cache()
    subprocess.check_output(['hciconfig', 'hci0', 'reset'])

    # Acquire DUT
    dut = env.get_dut('blehr', 'examples/bluetooth/nimble/blehr', dut_class=ttfw_idf.ESP32DUT)

    # Get binary file
    binary_file = os.path.join(dut.app.binary_path, 'blehr.bin')
    bin_size = os.path.getsize(binary_file)
    ttfw_idf.log_performance('blehr_bin_size', '{}KB'.format(bin_size // 1024))

    # Upload binary and start testing
    Utility.console_log('Starting blehr simple example test app')
    dut.start_app()
    dut.reset()

    # Get device address from dut
    dut_addr = dut.expect(re.compile(r'Device Address: ([a-fA-F0-9:]+)'), timeout=30)[0]
    exceptions_queue = Queue.Queue()
    # Starting a py-client in a separate thread
    blehr_thread_obj = BleHRThread(dut, dut_addr, exceptions_queue)
    blehr_thread_obj.start()
    blehr_thread_obj.join()

    # Drain everything the worker reported; the thread has already finished,
    # so a non-blocking get() that raises Empty means nothing is left.
    failures = []
    while True:
        try:
            failures.append(exceptions_queue.get(block=False))
        except Queue.Empty:
            break

    for exception_msg in failures:
        Utility.console_log('\n' + exception_msg)

    if failures:
        raise Exception('Blehr thread did not run successfully')


if __name__ == '__main__':
    test_example_app_ble_hr()