1#!/usr/bin/env python
2#
3# Copyright 2019 Espressif Systems (Shanghai) PTE LTD
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17from __future__ import print_function
18
19import os
20import re
21import subprocess
22import threading
23import traceback
24
25try:
26    import Queue
27except ImportError:
28    import queue as Queue
29
30import ttfw_idf
31from ble import lib_ble_client
32from tiny_test_fw import Utility
33
34# When running on local machine execute the following before running this script
35# > make app bootloader
36# > make print_flash_cmd | tail -n 1 > build/download.config
37# > export TEST_FW_PATH=~/esp/esp-idf/tools/tiny-test-fw
38
39
40def bleprph_client_task(dut, dut_addr):
41    interface = 'hci0'
42    ble_devname = 'nimble-bleprph'
43    srv_uuid = '2f12'
44    write_value = b'A'
45
46    # Get BLE client module
47    ble_client_obj = lib_ble_client.BLE_Bluez_Client(iface=interface)
48
49    # Discover Bluetooth Adapter and power on
50    is_adapter_set = ble_client_obj.set_adapter()
51    if not is_adapter_set:
52        return
53
54    # Connect BLE Device
55    is_connected = ble_client_obj.connect(
56        devname=ble_devname,
57        devaddr=dut_addr)
58    if not is_connected:
59        return
60
61    # Get services of the connected device
62    services = ble_client_obj.get_services()
63    if not services:
64        ble_client_obj.disconnect()
65        return
66    # Verify service uuid exists
67    service_exists = ble_client_obj.get_service_if_exists(srv_uuid)
68    if not service_exists:
69        ble_client_obj.disconnect()
70        return
71
72    # Get characteristics of the connected device
73    ble_client_obj.get_chars()
74
75    # Read properties of characteristics (uuid, value and permissions)
76    ble_client_obj.read_chars()
77
78    # Write new value to characteristic having read and write permission
79    # and display updated value
80    write_char = ble_client_obj.write_chars(write_value)
81    if not write_char:
82        ble_client_obj.disconnect()
83        return
84
85    # Disconnect device
86    ble_client_obj.disconnect()
87
88    # Check dut responses
89    dut.expect('connection established; status=0', timeout=30)
90    dut.expect('disconnect;', timeout=30)
91
92
93class BlePrphThread(threading.Thread):
94    def __init__(self, dut, dut_addr, exceptions_queue):
95        threading.Thread.__init__(self)
96        self.dut = dut
97        self.dut_addr = dut_addr
98        self.exceptions_queue = exceptions_queue
99
100    def run(self):
101        try:
102            bleprph_client_task(self.dut, self.dut_addr)
103        except RuntimeError:
104            self.exceptions_queue.put(traceback.format_exc(), block=False)
105
106
107@ttfw_idf.idf_example_test(env_tag='Example_WIFI_BT')
108def test_example_app_ble_peripheral(env, extra_data):
109    """
110        Steps:
111            1. Discover Bluetooth Adapter and Power On
112            2. Connect BLE Device
113            3. Read Services
114            4. Read Characteristics
115            5. Write Characteristics
116    """
117    # Remove cached bluetooth devices of any previous connections
118    subprocess.check_output(['rm', '-rf', '/var/lib/bluetooth/*'])
119    subprocess.check_output(['hciconfig', 'hci0', 'reset'])
120
121    # Acquire DUT
122    dut = env.get_dut('bleprph', 'examples/bluetooth/nimble/bleprph', dut_class=ttfw_idf.ESP32DUT)
123
124    # Get binary file
125    binary_file = os.path.join(dut.app.binary_path, 'bleprph.bin')
126    bin_size = os.path.getsize(binary_file)
127    ttfw_idf.log_performance('bleprph_bin_size', '{}KB'.format(bin_size // 1024))
128
129    # Upload binary and start testing
130    Utility.console_log('Starting bleprph simple example test app')
131    dut.start_app()
132    dut.reset()
133
134    # Get device address from dut
135    dut_addr = dut.expect(re.compile(r'Device Address: ([a-fA-F0-9:]+)'), timeout=30)[0]
136
137    # Check dut responses
138    dut.expect('GAP procedure initiated: advertise;', timeout=30)
139
140    # Starting a py-client in a separate thread
141    exceptions_queue = Queue.Queue()
142    bleprph_thread_obj = BlePrphThread(dut, dut_addr, exceptions_queue)
143    bleprph_thread_obj.start()
144    bleprph_thread_obj.join()
145
146    exception_msg = None
147    while True:
148        try:
149            exception_msg = exceptions_queue.get(block=False)
150        except Queue.Empty:
151            break
152        else:
153            Utility.console_log('\n' + exception_msg)
154
155    if exception_msg:
156        raise Exception('BlePrph thread did not run successfully')
157
158
159if __name__ == '__main__':
160    test_example_app_ble_peripheral()
161