1# -*- coding: utf-8 -*-
2# Copyright 2019 Oticon A/S
3# SPDX-License-Identifier: Apache-2.0
4
5import statistics;
6from enum import IntEnum;
7from components.utils import *;
8from components.basic_commands import *;
9from .ll_verification import *;
10from components.test_spec import TestSpec;
11
12"""
13    ll_multiple_connections
14"""
15def ll_multiple_connections(transport, trace):
16    trace.trace(3, "ll_multiple_connections");
17
18    try:
19        """
20            Scan interval should be three times the average Advertise interval. Scan window should be the maximum possible.
21        """
22        ownAddress = Address( ExtendedAddressType.PUBLIC );
23        rawPeerAddress = 0x456789ABCDEF
24        peerAddress = Address( SimpleAddressType.PUBLIC, rawPeerAddress );
25        advertiser = Advertiser(transport, 0, trace, AdvertiseChannel.ALL_CHANNELS, Advertising.CONNECTABLE_UNDIRECTED, ownAddress, peerAddress, AdvertisingFilterPolicy.FILTER_NONE);
26        advertiser.responseData = [ 0x04, 0x09 ] + [ ord(char) for char in "IUT" ];
27
28        ownAddress = Address( ExtendedAddressType.PUBLIC );
29        scanner1 = Scanner(transport, 1, trace, ScanType.PASSIVE, AdvertisingReport.ADV_IND, ownAddress, ScanningFilterPolicy.FILTER_NONE, 1);
30        ownAddress = Address( ExtendedAddressType.PUBLIC );
31        scanner2 = Scanner(transport, 2, trace, ScanType.PASSIVE, AdvertisingReport.ADV_IND, ownAddress, ScanningFilterPolicy.FILTER_NONE, 1);
32
33        trace.trace(1, '-'*80);
34
35        success = advertiser.enable();
36
37        trace.trace(1, "\nUsing initiator address: %s\n" % formatAddress( toArray(rawPeerAddress, 6), SimpleAddressType.PUBLIC));
38        success = success and preamble_set_public_address( transport, 1, rawPeerAddress, trace );
39        success = success and preamble_set_public_address( transport, 2, rawPeerAddress, trace );
40
41        success = success and scanner1.enable();
42        scanner1.monitor();
43        success = success and scanner1.disable();
44        success = success and scanner1.qualifyReports( 1 );
45
46        initiatorAddress = Address( ExtendedAddressType.PUBLIC );
47        initiator1 = Initiator(transport, 1, 0, trace, initiatorAddress, Address( ExtendedAddressType.PUBLIC, 0x123456789ABC ));
48        connected = initiator1.connect();
49        success = success and connected;
50
51        initiatorAddress = Address( ExtendedAddressType.PUBLIC );
52        initiator2 = Initiator(transport, 2, 0, trace, initiatorAddress, Address( ExtendedAddressType.PUBLIC, 0x123456789ABC ));
53
54        if connected:
55            print("\nStarting peripheral advertising...");
56            success = advertiser.enable();
57
58            print("\nStarting central 2 scanning...");
59            success = success and scanner2.enable();
60            scanner2.monitor();
61            success = success and scanner2.disable();
62            success = success and scanner2.qualifyReports( 1 );
63
64            print("\nStarting central 2 connection...");
65            connected = initiator2.connect();
66            success = success and connected;
67
68            print("\nDisconnecting...");
69            disconnected = initiator1.disconnect(0x3E);
70            success = success and disconnected;
71            disconnected = initiator2.disconnect(0x3E);
72            success = success and disconnected;
73
74    except Exception as e:
75        trace.trace(1, "Connection Request test failed: %s" % str(e));
76        success = False;
77
78    trace.trace(3, "Connection Request test " + ("PASSED" if success else "FAILED"));
79    return success
80
81
82_spec = {};
83_spec["Echo_test"] = \
84    TestSpec(name = "Multiple Connections Test Suite",
85             number_devices = 3,
86             description = "Verify ability to have multiple BLE connections.");
87
88"""
89    Return the test spec which contains info about all the tests
90    this test module provides
91"""
92def get_tests_specs():
93    return _spec;
94
95"""
96    Run the command...
97"""
98def main(transport, trace):
99    success = True
100    print(("preamble Standby Peripheral "    + ("PASS" if preamble_standby(transport, 0, trace) else "FAIL")));
101    print(("preamble Standby Central 1 " + ("PASS" if preamble_standby(transport, 1, trace) else "FAIL")));
102    print(("preamble Standby Central 2 " + ("PASS" if preamble_standby(transport, 2, trace) else "FAIL")));
103    print(("preamble Device Address Set Peripheral "    + ("PASS" if preamble_device_address_set(transport, 0, trace) else "FAIL")));
104    print(("preamble Device Address Set Central 1 " + ("PASS" if preamble_device_address_set(transport, 1, trace) else "FAIL")));
105    print(("preamble Device Address Set Central 2 " + ("PASS" if preamble_device_address_set(transport, 2, trace) else "FAIL")));
106    print();
107    success = ll_multiple_connections(transport, trace);
108    return (0 if success else 1)
109
110"""
111    Run a test given its test_spec
112"""
113def run_a_test(args, transport, trace, test_spec):
114    return main(transport, trace);
115