1#!/usr/bin/env python
2#
3# Copyright 2019 Espressif Systems (Shanghai) PTE LTD
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17from __future__ import print_function
18
19import os
20import re
21import subprocess
22import threading
23import traceback
24
25try:
26    import Queue
27except ImportError:
28    import queue as Queue
29
30import ttfw_idf
31from ble import lib_ble_client
32from tiny_test_fw import Utility
33
34# When running on local machine execute the following before running this script
35# > make app bootloader
36# > make print_flash_cmd | tail -n 1 > build/download.config
37
38
39def blehr_client_task(hr_obj, dut, dut_addr):
40    interface = 'hci0'
41    ble_devname = 'blehr_sensor_1.0'
42    hr_srv_uuid = '180d'
43    hr_char_uuid = '2a37'
44
45    # Get BLE client module
46    ble_client_obj = lib_ble_client.BLE_Bluez_Client(iface=interface)
47
48    # Discover Bluetooth Adapter and power on
49    is_adapter_set = ble_client_obj.set_adapter()
50    if not is_adapter_set:
51        return
52
53    # Connect BLE Device
54    is_connected = ble_client_obj.connect(
55        devname=ble_devname,
56        devaddr=dut_addr)
57    if not is_connected:
58        return
59
60    # Get services of the connected device
61    services = ble_client_obj.get_services()
62    if not services:
63        ble_client_obj.disconnect()
64        return
65
66    # Get characteristics of the connected device
67    ble_client_obj.get_chars()
68
69    '''
70    Blehr application run:
71        Start Notifications
72        Retrieve updated value
73        Stop Notifications
74    '''
75    # Get service if exists
76    service = ble_client_obj.get_service_if_exists(hr_srv_uuid)
77    if service:
78        # Get characteristic if exists
79        char = ble_client_obj.get_char_if_exists(hr_char_uuid)
80        if not char:
81            ble_client_obj.disconnect()
82            return
83    else:
84        ble_client_obj.disconnect()
85        return
86
87    # Start Notify
88    # Read updated value
89    # Stop Notify
90    notify = ble_client_obj.start_notify(char)
91    if not notify:
92        ble_client_obj.disconnect()
93        return
94
95    # Check dut responses
96    dut.expect('subscribe event; cur_notify=1', timeout=30)
97    dut.expect('subscribe event; cur_notify=0', timeout=30)
98
99    # Call disconnect to perform cleanup operations before exiting application
100    ble_client_obj.disconnect()
101
102    # Check dut responses
103    dut.expect('disconnect;', timeout=30)
104
105
106class BleHRThread(threading.Thread):
107    def __init__(self, dut, dut_addr, exceptions_queue):
108        threading.Thread.__init__(self)
109        self.dut = dut
110        self.dut_addr = dut_addr
111        self.exceptions_queue = exceptions_queue
112
113    def run(self):
114        try:
115            blehr_client_task(self, self.dut, self.dut_addr)
116        except RuntimeError:
117            self.exceptions_queue.put(traceback.format_exc(), block=False)
118
119
120@ttfw_idf.idf_example_test(env_tag='Example_WIFI_BT')
121def test_example_app_ble_hr(env, extra_data):
122    """
123        Steps:
124            1. Discover Bluetooth Adapter and Power On
125            2. Connect BLE Device
126            3. Start Notifications
127            4. Updated value is retrieved
128            5. Stop Notifications
129    """
130    # Remove cached bluetooth devices of any previous connections
131    subprocess.check_output(['rm', '-rf', '/var/lib/bluetooth/*'])
132    subprocess.check_output(['hciconfig', 'hci0', 'reset'])
133
134    # Acquire DUT
135    dut = env.get_dut('blehr', 'examples/bluetooth/nimble/blehr', dut_class=ttfw_idf.ESP32DUT)
136
137    # Get binary file
138    binary_file = os.path.join(dut.app.binary_path, 'blehr.bin')
139    bin_size = os.path.getsize(binary_file)
140    ttfw_idf.log_performance('blehr_bin_size', '{}KB'.format(bin_size // 1024))
141
142    # Upload binary and start testing
143    Utility.console_log('Starting blehr simple example test app')
144    dut.start_app()
145    dut.reset()
146
147    # Get device address from dut
148    dut_addr = dut.expect(re.compile(r'Device Address: ([a-fA-F0-9:]+)'), timeout=30)[0]
149    exceptions_queue = Queue.Queue()
150    # Starting a py-client in a separate thread
151    blehr_thread_obj = BleHRThread(dut, dut_addr, exceptions_queue)
152    blehr_thread_obj.start()
153    blehr_thread_obj.join()
154
155    exception_msg = None
156    while True:
157        try:
158            exception_msg = exceptions_queue.get(block=False)
159        except Queue.Empty:
160            break
161        else:
162            Utility.console_log('\n' + exception_msg)
163
164    if exception_msg:
165        raise Exception('Blehr thread did not run successfully')
166
167
168if __name__ == '__main__':
169    test_example_app_ble_hr()
170