# -*- coding: utf-8 -*-
# Copyright 2019 Oticon A/S
# SPDX-License-Identifier: Apache-2.0

import statistics
from enum import IntEnum
from components.utils import *
from components.basic_commands import *
from components.address import *
from components.addata import *
from components.events import *


class ScanningFilterPolicy(IntEnum):
    """Scanning filter policy values handed to the LE Set Scan Parameters command.

    FILTER_NONE
        Accept all advertising packets except directed advertising packets
        not addressed to this device (default).
    FILTER_WHITE_LIST
        Accept only advertising packets from devices where the advertiser's
        address is in the White List. Directed advertising packets which are
        not addressed to this device shall be ignored.
    FILTER_ID_DIRECTED
        Accept all advertising packets except directed advertising packets
        where the initiator's identity address does not address this device.
        Note: Directed advertising packets where the initiator's address is a
        resolvable private address that cannot be resolved are also accepted.
    FILTER_ID_WHITE_LIST
        Accept all advertising packets except:
          - advertising packets where the advertiser's identity address is
            not in the White List; and
          - directed advertising packets where the initiator's identity
            address does not address this device.
        Note: Directed advertising packets where the initiator's address is a
        resolvable private address that cannot be resolved are also accepted.
    """

    FILTER_NONE = 0           # Accept everything except foreign directed advertising (default)
    FILTER_WHITE_LIST = 1     # Accept only advertisers found in the White List
    FILTER_ID_DIRECTED = 2    # As FILTER_NONE, but matching on the initiator's identity address
    FILTER_ID_WHITE_LIST = 3  # As FILTER_WHITE_LIST, but matching on identity addresses
class ScanType(IntEnum):
    """Kind of scanning to perform."""

    PASSIVE = 0  # Use PASSIVE Scanning
    ACTIVE = 1   # Use ACTIVE Scanning


class ScanFilterDuplicate(IntEnum):
    """Duplicate filtering setting for the LE Set Scan Enable command."""

    DISABLE = 0  # Don't filter duplicate Advertisers
    ENABLE = 1   # Do filter duplicate Advertisers


class Scan(IntEnum):
    """Scanning on/off switch for the LE Set Scan Enable command."""

    DISABLE = 0  # Disable Scanning
    ENABLE = 1   # Enable Scanning


class AdvertisingReport(IntEnum):
    """Event types carried in advertising reports."""

    ADV_IND = 0          # Connectable undirected advertising
    ADV_DIRECT_IND = 1   # Connectable directed advertising
    ADV_SCAN_IND = 2     # Scannable undirected advertising
    ADV_NONCONN_IND = 3  # Non connectable undirected advertising
    SCAN_RSP = 4         # Scan Response


class Scanner:
    """
    A Scanner handles all aspects of Scanning.
    - Set Scan parameters.
    - Enable Scanning.
    - Disable Scanning.
    - Monitor Advertising reports and Scan responses.
    - Monitor Advertising timing (for High Duty-Cycle directed Advertising)
    """

    def __init__(self, transport, idx, trace, scanType, reportType, ownAddress,
                 filterPolicy=ScanningFilterPolicy.FILTER_NONE, expectedReports=20, expectedResponses=None):
        """
        Constructor:
            transport         - PTTT_nwtsim object
            idx               - Number; Device identifier
            trace             - Trace object
            scanType          - ScanType enum; determining whether to perform PASSIVE or ACTIVE scanning
            reportType        - AdvertisingReport enum holding the type of advertising reports to expect
            ownAddress        - Address object with an ExtendedAddressType address (only the address type is used)
            filterPolicy      - ScanningFilterPolicy enum holding the type of scanning filter to apply
            expectedReports   - Number; holding the number of advertising reports to expect
            expectedResponses - Number; holding the number of advertising responses to expect
                                (None selects report-only monitoring in monitor())
        """
        self.transport = transport
        self.idx = idx
        self.trace = trace
        self.scanType = scanType
        self.reportType = reportType
        self.ownAddress = ownAddress
        self.filterPolicy = filterPolicy
        self.expectedReports = expectedReports
        self.expectedResponses = expectedResponses

        # The LE_Scan_Interval and LE_Scan_Window parameters are recommendations from the Host on how long
        # (LE_Scan_Window) and how frequently (LE_Scan_Interval) the Controller should scan
        # (See [Vol 6] Part B, Section 4.5.3). The LE_Scan_Window parameter shall always be set to a value
        # smaller or equal to the value set for the LE_Scan_Interval parameter.
        # If they are set to the same value scanning should be run continuously.
        self.scanInterval = 120  # Scan Interval = 120 x 0.625 ms = 75.0 ms
        self.scanWindow = 120    # Scan Window   = 120 x 0.625 ms = 75.0 ms

        # Counters maintained while monitoring
        self.counts = 0          # number of polls that found no pending event
        self.reports = 0         # advertising reports matching reportType
        self.directReports = 0   # directed advertising reports matching reportType
        self.responses = 0       # SCAN_RSP reports
        self.deltas = []         # time differences between consecutive matching reports

        # Payload / addresses of the most recent report and response
        self.reportData = []
        self.reportAddress = None
        self.responseData = []
        self.responseAddress = None
        self.targetAddress = None

        self.firstTime = 0       # event time of the first matching report
        self.lastTime = 0        # time of the last report seen before the first idle poll
        self.pivot = 0

    def __verifyAndShowEvent(self, expectedEvent):
        """Fetch the next event, trace it, and report whether it is of the expected kind."""
        event = get_event(self.transport, self.idx, 200)
        self.trace.trace(7, str(event))
        return event.event == expectedEvent

    def __commandCompleteEvent(self):
        """Return True when the next event is a Command Complete event."""
        return self.__verifyAndShowEvent(Events.BT_HCI_EVT_CMD_COMPLETE)

    def __scan_parameters(self):
        """Send the LE Set Scan Parameters command; True when it completed with status 0."""
        status = le_set_scan_parameters(self.transport, self.idx, self.scanType, self.scanInterval,
                                        self.scanWindow, self.ownAddress.type, self.filterPolicy, 200)
        self.trace.trace(6, "LE Set Scan Parameters Command returns status: 0x%02X" % status)
        return self.__commandCompleteEvent() and (status == 0)

    def __scan_enable(self, enable):
        """Send the LE Set Scan Enable command (duplicate filtering off); True when status is 0."""
        status = le_set_scan_enable(self.transport, self.idx, enable, ScanFilterDuplicate.DISABLE, 200)
        self.trace.trace(6, "LE Set Scan Enable Command (%s) returns status: 0x%02X" % ("Enabling" if enable else "Disabling", status))
        # Other events may be queued ahead of the Command Complete event;
        # consume (and trace) them until the Command Complete event shows up.
        while not self.__commandCompleteEvent():
            pass
        return status == 0

    def clear(self):
        """Flush all pending events for this device."""
        flush_events(self.transport, self.idx, 200)

    def enable(self):
        """Enable scanning: set the scan parameters, then switch scanning on."""
        success = self.__scan_parameters()
        return success and self.__scan_enable(Scan.ENABLE)

    def disable(self):
        """Disable scanning, after flushing any pending events."""
        self.clear()
        return self.__scan_enable(Scan.DISABLE)

    def __updateDeltas(self, count, thisTime, prevTime):
        """Record the spacing to the previous report, or the start time for the first report."""
        if count > 1:
            self.deltas.append(thisTime - prevTime)
        else:
            self.firstTime = thisTime

    def __handleReport(self, prevTime):
        """
        Process the pending events, updating counters, last seen data and addresses.

            prevTime - event time of the previous matching report

        Returns the event time of the latest matching report (prevTime when none matched).
        """
        for event in get_event(self.transport, self.idx, 200, True):

            if event.subEvent == MetaEvents.BT_HCI_EVT_LE_ADVERTISING_REPORT:

                eventType, address, data = event.decode()[0:3]
                if eventType == self.reportType:
                    self.reports += 1
                    self.reportData = data[:]
                    self.reportAddress = address
                    self.__updateDeltas(self.reports, event.time, prevTime)
                    prevTime = event.time
                elif eventType == AdvertisingReport.SCAN_RSP:
                    self.responses += 1
                    self.responseData = data[:]
                    self.responseAddress = address

            elif event.subEvent == MetaEvents.BT_HCI_EVT_LE_DIRECT_ADV_REPORT:

                eventType, address, targetAddress = event.decode()[0:3]
                if eventType == self.reportType:
                    self.directReports += 1
                    self.reportData = []
                    self.reportAddress = address
                    self.targetAddress = targetAddress
                    self.__updateDeltas(self.directReports, event.time, prevTime)
                    prevTime = event.time

        return prevTime

    def __monitorReports(self):
        """Collect reports until enough were seen, or enough polls came back empty."""
        prevTime = 0
        # counts/2 bounds the wait: the loop ends after 2 x expectedReports idle polls.
        while max(self.reports, self.directReports, self.counts/2) < self.expectedReports:

            if has_event(self.transport, self.idx, 200)[0]:
                prevTime = self.__handleReport(prevTime)
            else:
                if self.lastTime == 0:
                    self.lastTime = prevTime
                self.counts += 1

    def __monitorResponses(self):
        """Collect reports and scan responses until enough of both were seen, or polling ran dry."""
        prevTime = 0
        while (max(self.reports, self.directReports, self.counts/2) < self.expectedReports) or \
              (max(self.responses, self.reports/5, self.counts) < self.expectedResponses):

            if has_event(self.transport, self.idx, 200)[0]:
                prevTime = self.__handleReport(prevTime)
            else:
                if self.lastTime == 0:
                    self.lastTime = prevTime
                self.counts += 1

    def __monitorReportTime(self):
        """
        Advertising with connectable high duty cycle directed advertising packages (ADV_DIRECT_IND, high duty cycle) is time limited.
        Advertising should stop after approx. 1280 ms.
        When Advertising stops a LE Connection Complete Event with status := 0x3C is generated on the Advertiser side.
        Status 0x3C means 'directed advertising timeout'.
        """
        prevTime = 0
        # NOTE(review): lastTime only becomes non-zero after at least one matching
        # report was handled; if none ever arrives this loop keeps polling.
        while self.lastTime == 0:

            if has_event(self.transport, self.idx, 200)[0]:
                prevTime = self.__handleReport(prevTime)
            else:
                self.lastTime = prevTime

    def monitor(self, timeBased=None):
        """
        Monitor advertising reports / responses.

            timeBased - when not None, monitor until advertising stops (time limited advertising);
                        otherwise monitor until the expected number of reports (and responses, when
                        expectedResponses is set) were received.
        """
        self.deltas = []
        self.responses = 0
        self.reports = 0
        # Directed reports must start from zero too; a stale count would end the
        # monitoring loop prematurely and distort the delta bookkeeping.
        self.directReports = 0
        self.counts = 0

        self.firstTime = 0
        self.lastTime = 0

        if timeBased is not None:
            self.__monitorReportTime()
        elif self.expectedResponses is None:
            self.__monitorReports()
        else:
            self.__monitorResponses()

        self.clear()

    def qualifyReports(self, count, address=None, data=None):
        """
        Qualify advertising reports received; count, from address and content.

            count   - minimum number of reports required
            address - expected advertiser address (None: don't check)
            data    - expected advertising data (None: don't check)
        """
        if self.reports > 0:
            self.trace.trace(7, "Received %d %s Advertise reports." % (self.reports, self.reportType.name))
            # deltas may be empty when the reports were counted by discover() rather than monitor()
            if self.reports > 1 and self.deltas:
                self.trace.trace(7, "Advertise Events spacing in range [%d, %d] ms. with mean value %d ms. and std. deviation %.1f ms." %
                                 (min(self.deltas), max(self.deltas), statistics.mean(self.deltas), statistics.pstdev(self.deltas)))
            success = True
            if address is not None:
                self.trace.trace(5, "Reported address %s / Expected address %s" % (str(self.reportAddress), str(address)))
                success = success and (self.reportAddress == address)
            if data is not None:
                dataMatches = self.reportData == data
                success = success and dataMatches
                if not dataMatches:
                    self.trace.trace(5, "Reported data: %s / Expected data: %s" % (self.reportData, data))
            self.clear()
        else:
            self.trace.trace(7, "Received no %s Advertise reports." % self.reportType.name)
            success = data is None
        return success and (self.reports >= count)

    def qualifyDirectedReports(self, count, address=None, directAddress=None):
        """
        Qualify directed advertising reports received; count, from address and target address.

            count         - minimum number of directed reports required
            address       - expected advertiser address (None: don't check)
            directAddress - expected target address (None: don't check)
        """
        if self.directReports > 0:
            self.trace.trace(7, "Received %d %s directed Advertise reports." % (self.directReports, self.reportType.name))
            if self.directReports > 1 and self.deltas:
                self.trace.trace(7, "Mean distance between directed Advertise Events %d ms., std. deviation %.1f ms." %
                                 (statistics.mean(self.deltas), statistics.pstdev(self.deltas)))
            success = True
            if address is not None:
                self.trace.trace(5, "Reported address %s / Expected address %s" % (str(self.reportAddress), str(address)))
                success = success and (self.reportAddress == address)
            if directAddress is not None:
                # Trace the target address - that is the value being compared below.
                self.trace.trace(5, "Reported direct address %s / Expected direct address %s" % (str(self.targetAddress), str(directAddress)))
                success = success and (self.targetAddress == directAddress)
            self.clear()
        else:
            self.trace.trace(7, "Received no %s directed Advertise reports." % self.reportType.name)
            success = address is None and directAddress is None
        return success and (self.directReports >= count)

    def qualifyResponses(self, count, data=None):
        """
        Qualify advertising responses received; count and content.

            count - minimum number of scan responses required
            data  - expected scan response data (None: don't check)
        """
        if self.responses > 0:
            self.trace.trace(7, "Received %d SCAN_RSP Advertise reports." % self.responses)
            success = True if data is None else (self.responseData == data)
            if not success:
                self.trace.trace(5, "Data MisMatch:")
                self.trace.trace(5, self.responseData)
                self.trace.trace(5, data)
            self.clear()
        else:
            self.trace.trace(7, "Received no SCAN_RSP Advertise reports.")
            success = data is None
        return success and (self.responses >= count)

    def qualifyReportTime(self, count, time):
        """
        Qualify the distribution of advertising reports over time...

            count - minimum number of reports required
            time  - maximum allowed span between the first and the last report
        """
        if self.reports > 0:
            self.trace.trace(7, "Received %d %s Advertise reports." % (self.reports, self.reportType.name))
            if self.reports > 1 and self.deltas:
                self.trace.trace(7, "Mean distance between Advertise Events %d ms., std. deviation %.1f ms." %
                                 (statistics.mean(self.deltas), statistics.pstdev(self.deltas)))
            elapsed = self.lastTime - self.firstTime
            self.trace.trace(7, "Advertising stopped after %d ms." % elapsed)
            success = time >= elapsed
            self.clear()
        else:
            self.trace.trace(7, "Received no %s Advertise reports." % self.reportType.name)
            success = True
        return success and (self.reports >= count)

    def discover(self, time, flags=None):
        """
        Scan for advertisers and collect what they report.

            time  - amount of (event) time to keep scanning
            flags - bit mask; when given, only devices whose advertising FLAGS element has one of
                    these bits set are collected (None: collect every device)

        Returns a tuple (success, devices) where devices maps the numeric advertiser address to a
        dictionary with the keys "type", "rssi", "name" and - when seen - "data" and "resp".
        """
        devices = {}
        adData = ADData()
        success = self.enable()
        if success:
            prevTime = deltaTime = 0
            while deltaTime < time:
                if not has_event(self.transport, self.idx, 200)[0]:
                    # Nothing pending; account for the idle poll
                    deltaTime += 100
                    continue

                event = get_event(self.transport, self.idx, 200)

                if event.subEvent == MetaEvents.BT_HCI_EVT_LE_ADVERTISING_REPORT:
                    eventType, address, data, rssi = event.decode()
                    addressNo = toNumber(address.address)
                    # eventType - can be any one of the following:
                    #   ADV_IND         - Connectable undirected advertising event     { AdvA, AdvData }
                    #   ADV_DIRECT_IND  - Connectable directed advertising event       { AdvA, TargetA }
                    #   ADV_SCAN_IND    - Scannable undirected advertising event       { AdvA, AdvData }
                    #   ADV_NONCONN_IND - Non connectable undirected advertising event { AdvA, AdvData }
                    #   SCAN_RSP        - Scan response event                          { AdvA, ScanRspData }
                    if prevTime > 0:
                        deltaTime += event.time - prevTime

                    if eventType == AdvertisingReport.ADV_DIRECT_IND:
                        # Directed advertising carries no AD elements, so a flags filter can never match
                        if addressNo not in devices and flags is None:
                            devices[addressNo] = { "type": address.type, "rssi": rssi, "name": '?' }
                    else:
                        elements = adData.decode(data)

                        if addressNo not in devices:
                            if (flags is None) or ((ADType.FLAGS in elements) and (elements[ADType.FLAGS] & flags)):
                                devices[addressNo] = { "type": address.type, "rssi": rssi, "name": '?' }

                        if addressNo in devices:
                            if eventType == AdvertisingReport.SCAN_RSP:
                                devices[addressNo]["resp"] = data
                            else:
                                devices[addressNo]["data"] = data
                            # Prefer the complete local name over the shortened one
                            if ADType.COMPLETE_LOCAL_NAME in elements:
                                devices[addressNo]["name"] = elements[ADType.COMPLETE_LOCAL_NAME]
                            elif ADType.SHORTENED_LOCAL_NAME in elements:
                                devices[addressNo]["name"] = elements[ADType.SHORTENED_LOCAL_NAME]

                    if eventType == AdvertisingReport.SCAN_RSP:
                        self.responses += 1
                    else:
                        self.reports += 1

                    prevTime = event.time

            success = self.disable()

        return success and (len(devices) > 0), devices