1# -*- coding: utf-8 -*- 2# Copyright 2021 Oticon A/S 3# SPDX-License-Identifier: Apache-2.0 4import io 5import math 6import sys 7from collections import namedtuple 8from enum import IntEnum 9 10 11class DeviceDumpFile: 12 def __init__(self, idx, file_path): 13 self.idx = idx 14 self.file_path = file_path 15 self.f = None 16 self.skip_first_line = True 17 18 def __str__(self): 19 return self.file_path 20 21 def open(self): 22 self.f = io.open(self.file_path, 'r') 23 24 def close(self): 25 self.f.close() 26 27 def decode(self, line): 28 raise NotImplementedError 29 30 def fetch(self, cnt=-1): 31 n = 0 32 self.skip_first_line = True 33 while True: 34 offset = self.f.tell() 35 line = self.f.readline() 36 if line == '': 37 # EOF read, return what was collected 38 yield None 39 elif not line.endswith('\n'): 40 # Partial line read, rewind and return what was collected 41 self.f.seek(offset, io.SEEK_SET) 42 yield None 43 else: 44 # Full line read, continue unless a specific number of lines have been collected 45 if self.skip_first_line: 46 self.skip_first_line = False 47 continue 48 49 yield self.decode(line.strip()) 50 n += 1 51 52 if n == cnt: 53 return 54 55 56class DeviceDumpFileTx(DeviceDumpFile): 57 Tx = namedtuple('Tx', 'idx, ts, aa, freq, mod, packet') 58 59 def decode(self, line): 60 try: 61 start_time, end_time, center_freq, phy_address, modulation, power_level, abort_time, \ 62 recheck_time, packet_size, packet, *_ = line.split(',') 63 except ValueError: 64 return 65 66 start_time = int(start_time) 67 end_time = int(end_time) 68 center_freq = float(center_freq) 69 phy_address = int(phy_address, 16) 70 modulation = int(modulation) 71 power_level = float(power_level) 72 abort_time = int(abort_time) 73 recheck_time = int(recheck_time) 74 packet_size = int(packet_size) 75 packet = bytearray(int(v, 16) for v in packet.split()) 76 77 return self.Tx._make((self.idx, start_time, phy_address, center_freq, modulation, memoryview(packet))) 78 79 80class DeviceDumpFileRx(DeviceDumpFile): 81 Rx = namedtuple('Rx', 'idx, ts, aa, freq, mod, status, packet') 82 83 def decode(self, line): 84 try: 85 start_time, scan_duration, phy_address, modulation, center_freq, antenna_gain, \ 86 sync_threshold, header_threshold, pream_and_addr_duration, header_duration, \ 87 bps, abort_time, recheck_time, tx_nbr, biterrors, sync_end, header_end, \ 88 payload_end, rx_time_stamp, status, RSSI, packet_size, packet, *_ = line.split(',') 89 except ValueError: 90 return 91 92 start_time = int(start_time) 93 scan_duration = int(scan_duration) 94 phy_address = int(phy_address, 16) 95 modulation = int(modulation) 96 center_freq = float(center_freq) 97 antenna_gain = float(antenna_gain) 98 sync_threshold = int(sync_threshold) 99 header_threshold = int(header_threshold) 100 pream_and_addr_duration = int(pream_and_addr_duration) 101 header_duration = int(header_duration) 102 bps = int(bps) 103 abort_time = int(abort_time) 104 recheck_time = int(recheck_time) 105 tx_nbr = int(tx_nbr) 106 biterrors = int(biterrors) 107 sync_end = int(sync_end) 108 header_end = int(header_end) 109 payload_end = int(payload_end) 110 rx_time_stamp = int(rx_time_stamp) 111 status = int(status) 112 RSSI = float(RSSI) 113 packet_size = int(packet_size) 114 packet = bytearray(int(v, 16) for v in packet.split()) 115 116 return self.Rx._make((self.idx, rx_time_stamp, phy_address, center_freq, modulation, status, 117 memoryview(packet))) 118 119 120def unpack_bitfield(fmt, value): 121 result = [] 122 for length in fmt.split(","): 123 length = int(length, 10) 124 result.append(value & abs(1 - (1 << length))) 125 value = value >> length 126 return tuple(result) 127 128# Convert RF Channel number to Physical Channel Index (see Bluetooth Core Specification v5.3, vol 6, part B, section 1.4.1) 129def channel_num_to_index(channel_num): 130 ch_num_ch_idx = [ 131 37, 132 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 133 38, 134 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 135 39 136 ] 137 return ch_num_ch_idx[channel_num] 138 139AdvPdu = ('ADV_UNKNOWN_PDU', 'ADV_IND', 'ADV_DIRECT_IND', 'CONNECT_IND', 'ADV_EXT_UNKNOWN_PDU', 'ADV_EXT_IND', 'AUX_ADV_IND', 'AUX_SCAN_RSP', 'AUX_SYNC_IND', 'AUX_CHAIN_IND', 'AUX_CONNECT_REQ', 'AUX_CONNECT_RSP') 140LlControlPdu = ('LL_CONTROL_UNKNOWN_PDU', 'LL_TERMINATE_IND', 'LL_CIS_REQ', 'LL_CIS_RSP', 'LL_CIS_IND', 141 'LL_CIS_TERMINATE_IND') 142LlDataPdu = ('LL_DATA_PDU',) 143IsoPdu = ('ISOC_UNKNOWN_PDU', 'ISOC_UNFRAMED_PDU', 'ISOC_FRAMED_PDU') 144PacketType = IntEnum('PacketType', ','.join(AdvPdu + LlControlPdu + LlDataPdu + IsoPdu)) 145 146 147class Packet: 148 def __init__(self, direction, idx, ts, aa, channel_num, phy, data, payload_type, header, payload): 149 self.direction = direction 150 self.idx = idx 151 self.ts = ts 152 self.aa = aa 153 self.channel_num = channel_num 154 self.phy = phy 155 self.data = data 156 self.type = payload_type 157 self.header = header 158 self.payload = payload 159 160 def __len__(self): 161 return len(self.data) 162 163 def __repr__(self): 164 return f"{self.direction}, {self.type.name}, Idx:{self.idx}, StartTime:{self.ts}, " \ 165 f"AccessAddress:{hex(self.aa)}, ChannelNumber:{self.channel_num}, PHY:{self.phy}, Length:{len(self)}, " \ 166 f"{self.header}, {self.payload}" 167 168 169def adv_ind(data): 170 Payload = namedtuple('Payload', 'AdvA, AdvData') 171 adv_a = data[:6] 172 adv_data = data[6:] 173 return Payload(adv_a, adv_data) 174 175 176def adv_direct_ind(data): 177 Payload = namedtuple('Payload', 'AdvA, TargetA') 178 adv_a = data[:6] 179 target_a = data[6:12] 180 return Payload(adv_a, target_a) 181 182 183def connect_ind(payload): 184 Payload = namedtuple('Payload', 'InitA, AdvA, LLData') 185 LLData = namedtuple('LLData', 'AA, CRCInit, WinSize, WinOffset, Interval, Latency, Timeout, ChM, Hop, SCA') 186 init_a = payload[:6] 187 adv_a = payload[6:12] 188 aa = int.from_bytes(payload[12:16], 'little', signed=False) 189 crc_init = int.from_bytes(payload[16:19], 'little', signed=False) 190 win_size = payload[19] 191 win_offset = int.from_bytes(payload[20:22], 'little', signed=False) 192 interval = int.from_bytes(payload[22:24], 'little', signed=False) 193 latency = int.from_bytes(payload[24:26], 'little', signed=False) 194 timeout = int.from_bytes(payload[26:28], 'little', signed=False) 195 ch_m = int.from_bytes(payload[28:33], 'little', signed=False) 196 hop, sca = unpack_bitfield('5,3', payload[33]) 197 ll_data = LLData(aa, crc_init, win_size, win_offset, interval, latency, timeout, ch_m, hop, sca) 198 return Payload(init_a, adv_a, ll_data) 199 200 201adv_legacy_pdu_dict = { 202 0b0000: (PacketType.ADV_IND, adv_ind), 203 0b0001: (PacketType.ADV_DIRECT_IND, adv_direct_ind), 204 0b0101: (PacketType.CONNECT_IND, connect_ind), 205} 206 207 208def parse_adv_pdu(direction, idx, ts, aa, channel_num, phy, data, aux_ptr_packets): 209 Header = namedtuple('Header', 'PDU_Type, ChSel, TxAdd, RxAdd, Length') 210 pdu_type, _, ch_sel, tx_add, rx_add, payload_len = \ 211 unpack_bitfield('4,1,1,1,1,8', int.from_bytes(data[:2], 'little', signed=False)) 212 header = Header(pdu_type, ch_sel, tx_add, rx_add, payload_len) 213 data = data[2:2 + payload_len] 214 if pdu_type in adv_legacy_pdu_dict and channel_num in [0, 12, 39]: 215 payload_type, func = adv_legacy_pdu_dict[pdu_type] 216 payload = func(data) 217 else: 218 return parse_ext_adv_pdu(direction, idx, ts, aa, channel_num, phy, header, data, aux_ptr_packets); 219 return Packet(direction, idx, ts, aa, channel_num, phy, data, payload_type, header, payload) 220 221 222def parse_common_ext_adv_payload(data): 223 payload = dict() 224 extHeaderLength, AdvMode = unpack_bitfield('6,2', int.from_bytes(data[:1], 'little', signed=False)) 225 data = data[1:] 226 payload['AdvMode'] = AdvMode 227 dataPtr = 0 228 if extHeaderLength > 0: 229 extHeaderFlags = bytes(data[dataPtr:dataPtr+1])[0] 230 dataPtr += 1 231 if extHeaderFlags & 0x01: # AdvA present 232 payload['AdvA'] = int.from_bytes(data[dataPtr:dataPtr+6], 'little', signed=False) 233 dataPtr += 6 234 if extHeaderFlags & 0x02: # TargetA present 235 payload['TargetA'] = int.from_bytes(data[dataPtr:dataPtr+6], 'little', signed=False) 236 dataPtr += 6 237 if extHeaderFlags & 0x04: # CTEInfo present 238 # TODO - decode further 239 payload['CTEInfo'] = bytes(data[dataPtr:dataPtr+1])[0] 240 dataPtr += 1 241 if extHeaderFlags & 0x08: # AdvDataInfo present 242 ADI = namedtuple('ADI', 'DID, SID') 243 did, sid = unpack_bitfield('12,4', int.from_bytes(data[dataPtr:dataPtr+2], 'little', signed=False)) 244 dataPtr += 2 245 payload['ADI'] = ADI(did, sid) 246 if extHeaderFlags & 0x10: # AuxPtr present 247 chIdx, clockAcc, offsetUnits, auxOffset, auxPHY = unpack_bitfield('6,1,1,13,3', int.from_bytes(data[dataPtr:dataPtr+3], 'little', signed=False)) 248 dataPtr += 3 249 AuxPtr = namedtuple('AuxPtr', 'chIdx, CA, offsetUnits, auxOffset, auxPHY') 250 payload['AuxPtr'] = AuxPtr(chIdx, clockAcc, offsetUnits, auxOffset, auxPHY) 251 if extHeaderFlags & 0x20: # SyncInfo present 252 # TODO - decode further 253 payload['SyncInfo'] = bytes(data[dataPtr:dataPtr+18]) 254 dataPtr += 18 255 if extHeaderFlags & 0x40: 256 payload['TxPower'] = bytes(data[dataPtr:dataPtr+1])[0] 257 dataPtr += 1 258 if dataPtr < extHeaderLength: 259 payload['ACAD'] = bytes(data[dataPtr:]) 260 261 data = data[extHeaderLength:] 262 if len(data): 263 payload['AD'] = bytes(data) 264 265 return payload 266 267def parse_ext_adv_pdu(direction, idx, ts, aa, channel_num, phy, header, data, aux_ptr_packets): 268 if channel_num in [0, 12, 39]: 269 if header.PDU_Type == 0b0111: 270 payload_type = PacketType.ADV_EXT_IND 271 payload = parse_common_ext_adv_payload(data) 272 else: 273 payload_type = PacketType.ADV_UNKNOWN_PDU 274 payload = data 275 else: 276 if header.PDU_Type == 0b0011: 277 payload_type = PacketType.AUX_SCAN_REQ 278 Payload = namedtuple('Payload', 'AdvA, TargetA') 279 advA = int.from_bytes(data[:6], 'little', signed=False) 280 targetA = int.from_bytes(data[6:], 'little', signed=False) 281 payload = Payload(advA, targetA) 282 elif header.PDU_Type == 0b0101: 283 payload_type = PacketType.AUX_CONNECT_REQ 284 # Payload is the same as CONNECT_IND 285 payload = connect_ind(data) 286 elif header.PDU_Type == 0b0111: 287 payload = parse_common_ext_adv_payload(data) 288 payload['SuperiorPackets'] = find_superior_packets(ts, channel_num, phy, aux_ptr_packets) 289 payload_type = determine_ext_adv_pdu_type(payload) 290 elif header.PDU_Type == 0b1000: 291 payload_type = PacketType.AUX_CONNECT_RSP 292 payload = parse_common_ext_adv_payload(data) 293 else: 294 payload_type = PacketType.ADV_UNKNOWN_PDU 295 payload = data 296 297 return Packet(direction, idx, ts, aa, channel_num, phy, data, payload_type, header, payload) 298 299def find_superior_packets(ts, channel_num, phy, aux_ptr_packets): 300 superiorPackets = [] 301 for packet in aux_ptr_packets: 302 if aux_ptr_matches(packet.payload['AuxPtr'], packet.ts, ts, channel_num, phy): 303 superiorPackets += [packet] 304 return superiorPackets 305 306def determine_ext_adv_pdu_type(payload): 307 if payload['SuperiorPackets']: 308 if payload['SuperiorPackets'][0].type == PacketType.ADV_EXT_IND: 309 return PacketType.AUX_ADV_IND 310 else: 311 return PacketType.AUX_CHAIN_IND 312 313 # No matches - we could have missed the packet with an aux_ptr, but assume a AUX_SCAN_RSP provided it matches the known limitations 314 if payload['AdvMode'] == 0 and 'AdvA' in payload and 'TargetA' not in payload and 'CTEInfo' not in payload and 'SyncInfo' not in payload and 'AD' in payload: 315 return PacketType.AUX_SCAN_RSP 316 317 # No matches and does not look like a AUX_SCAN_RSP 318 return PacketType.ADV_EXT_UNKNOWN_PDU 319 320def aux_ptr_matches(aux_ptr, superior_packet_ts, ts, channel_num, phy): 321 ch = channel_num_to_index(channel_num) 322 if ch == aux_ptr.chIdx and ((phy == '1M' and (aux_ptr.auxPHY == 0x00 or aux_ptr.auxPHY == 0x02)) or (phy == '2M' and aux_ptr.auxPHY == 0x01)): 323 if (aux_ptr.auxOffset > 0): # AuxOffset == 0 means no auxillary packet will be transmitted 324 t_start_offset = (300 if aux_ptr.offsetUnits == 1 else 30) * aux_ptr.auxOffset 325 t_end_offset = (300 if aux_ptr.offsetUnits == 1 else 30) * (aux_ptr.auxOffset + 1) 326 # Adjust according to clock accuracy value 327 ca_adjustment = math.ceil(t_end_offset * ((50 if aux_ptr.CA == 1 else 500)/1000000)) 328 t_start = superior_packet_ts + (t_start_offset - ca_adjustment) 329 t_end = superior_packet_ts + (t_end_offset + ca_adjustment) 330 if ts >= t_start and ts <= t_end: 331 return True 332 return False 333 334def ll_terminate_ind(data): 335 CtrData = namedtuple('CtrData', 'ErrorCode') 336 return CtrData(data[0]) 337 338 339def ll_cis_req(data): 340 CtrData = namedtuple('CtrData', 'CIG_ID, CIS_ID, PHY_C_To_P, PHY_P_To_C, Max_SDU_C_To_P, Framed, Max_SDU_P_To_C,' 341 'SDU_Interval_C_To_P, SDU_Interval_P_To_C, Max_PDU_C_To_P, Max_PDU_P_To_C, NSE,' 342 'Sub_Interval, BN_C_To_P, BN_P_To_C, FT_C_To_P, FT_P_To_C, ISO_Interval,' 343 'CIS_Offset_Min, CIS_Offset_Max, connEventCount') 344 cig_id, cis_id, phy_c_to_p, phy_p_to_c = data[:4] 345 max_sdu_c_to_p, _, framed = unpack_bitfield('12,3,1', int.from_bytes(data[4:6], 'little', signed=False)) 346 max_sdu_p_to_c, _ = unpack_bitfield('12,4', int.from_bytes(data[6:8], 'little', signed=False)) 347 sdu_interval_c_to_p, _ = unpack_bitfield('20,4', int.from_bytes(data[8:11], 'little', signed=False)) 348 sdu_interval_p_to_c, _ = unpack_bitfield('20,4', int.from_bytes(data[11:14], 'little', signed=False)) 349 max_pdu_c_to_p = int.from_bytes(data[14:16], 'little', signed=False) 350 max_pdu_p_to_c = int.from_bytes(data[16:18], 'little', signed=False) 351 nse = data[19] 352 sub_interval = int.from_bytes(data[19:22], 'little', signed=False) 353 bn_c_to_p, bn_p_to_c = unpack_bitfield('4,4', data[22]) 354 ft_c_to_p, ft_p_to_c = data[23:25] 355 iso_interval = int.from_bytes(data[25:27], 'little', signed=False) 356 cis_offset_min = int.from_bytes(data[27:30], 'little', signed=False) 357 cis_offset_max = int.from_bytes(data[30:33], 'little', signed=False) 358 conn_event_count = int.from_bytes(data[33:35], 'little', signed=False) 359 return CtrData(cig_id, cis_id, phy_c_to_p, phy_p_to_c, max_sdu_c_to_p, framed, max_sdu_p_to_c, sdu_interval_c_to_p, 360 sdu_interval_p_to_c, max_pdu_c_to_p, max_pdu_p_to_c, nse, sub_interval, bn_c_to_p, bn_p_to_c, 361 ft_c_to_p, ft_p_to_c, iso_interval, cis_offset_min, cis_offset_max, conn_event_count) 362 363 364def ll_cis_rsp(data): 365 CtrData = namedtuple('CtrData', 'CIS_Offset_Min, CIS_Offset_Max, connEventCount') 366 cis_offset_min = int.from_bytes(data[:3], 'little', signed=False) 367 cis_offset_max = int.from_bytes(data[3:6], 'little', signed=False) 368 conn_event_count = int.from_bytes(data[6:8], 'little', signed=False) 369 return CtrData(cis_offset_min, cis_offset_max, conn_event_count) 370 371 372def ll_cis_ind(data): 373 CtrData = namedtuple('CtrData', 'AA, CIS_Offset, CIG_Sync_Delay, CIS_Sync_Delay, connEventCount') 374 aa = int.from_bytes(data[0:4], 'little', signed=False) 375 cis_offset = int.from_bytes(data[4:7], 'little', signed=False) 376 cig_sync_delay = int.from_bytes(data[7:10], 'little', signed=False) 377 cis_sync_delay = int.from_bytes(data[10:13], 'little', signed=False) 378 conn_event_counter = int.from_bytes(data[13:15], 'little', signed=False) 379 return CtrData(aa, cis_offset, cig_sync_delay, cis_sync_delay, conn_event_counter) 380 381 382def ll_cis_terminate_ind(data): 383 CtrData = namedtuple('CtrData', 'CIG_ID, CIS_ID, ErrorCode') 384 return CtrData(*data[:3]) 385 386 387ll_control_pdu_dict = { 388 0x02: (PacketType.LL_TERMINATE_IND, ll_terminate_ind), 389 0x1F: (PacketType.LL_CIS_REQ, ll_cis_req), 390 0x20: (PacketType.LL_CIS_RSP, ll_cis_rsp), 391 0x21: (PacketType.LL_CIS_IND, ll_cis_ind), 392 0x22: (PacketType.LL_CIS_TERMINATE_IND, ll_cis_terminate_ind), 393} 394 395 396def parse_data_pdu(direction, idx, ts, aa, channel_num, phy, data): 397 Header = namedtuple('Header', 'LLID, NESN, SN, MD, CP, Length, CTEInfo') 398 llid, nesn, sn, md, cp, rfu, payload_length = \ 399 unpack_bitfield('2,1,1,1,1,2,8', int.from_bytes(data[:2], 'little', signed=False)) 400 cte_info = data[2] if cp else None 401 header = Header(llid, nesn, sn, md, cp, payload_length, cte_info) 402 pdu_offset = 3 if cp else 2 403 data = data[pdu_offset:pdu_offset + payload_length] 404 if llid == 0b11: 405 Payload = namedtuple('Payload', 'Opcode, CtrData') 406 opcode = data[0] 407 if opcode in ll_control_pdu_dict: 408 payload_type, func = ll_control_pdu_dict[opcode] 409 payload = Payload(opcode, func(data[1:])) 410 else: 411 payload_type, payload = PacketType.LL_CONTROL_UNKNOWN_PDU, Payload(opcode, data[1:]) 412 elif llid == 0b10 or llid == 0b01: 413 payload_type = PacketType.LL_DATA_PDU 414 payload = data 415 else: 416 return None 417 return Packet(direction, idx, ts, aa, channel_num, phy, data, payload_type, header, payload) 418 419 420def parse_isoc_pdu(direction, idx, ts, aa, channel_num, phy, data): 421 Header = namedtuple('Header', 'LLID, NESN, SN, CIE, NPI, Length') 422 llid, nesn, sn, cie, rfu_1, npi, rfu_2, payload_length = \ 423 unpack_bitfield('2,1,1,1,1,1,1,8', int.from_bytes(data[:2], 'little', signed=False)) 424 header = Header(llid, nesn, sn, cie, npi, payload_length) 425 if llid == 0b00 or llid == 0b01: 426 payload_type = PacketType.ISOC_UNFRAMED_PDU 427 payload = data[2:2 + payload_length] 428 elif llid == 0b10: 429 payload_type = PacketType.ISOC_FRAMED_PDU 430 SegmentationHeader = namedtuple('SegmentationHeader', 'SC, CMPLT, RFU, Length') 431 sc, cmplt, rfu, payload_length = unpack_bitfield('1,1,6,8', int.from_bytes(data[2:4], 'little', signed=False)) 432 segmentation_header = SegmentationHeader(sc, cmplt, rfu, payload_length) 433 Payload = namedtuple('Payload', 'SegmentationHeader, Payload') 434 payload = Payload(segmentation_header, data[4:4 + payload_length]) 435 else: 436 payload_type = PacketType.ISOC_UNKNOWN_PDU 437 payload = data[2:2 + payload_length] 438 return Packet(direction, idx, ts, aa, channel_num, phy, data, payload_type, header, payload) 439 440 441class ConnectionData: 442 def __init__(self, interval): 443 self.connection_interval = interval 444 self.cis_to_aa_map = {} 445 446 447class PacketParser: 448 def __init__(self): 449 self.__conn_data_by_aa = {} 450 self.__func_by_aa = { 451 0x8E89BED6: parse_adv_pdu, 452 } 453 self.__aux_ptr_packets = []; 454 455 def __get_packet(self, direction, idx, ts, aa, channel_num, phy, packet): 456 if aa in self.__func_by_aa: 457 if self.__func_by_aa[aa] == parse_adv_pdu: 458 return self.__func_by_aa[aa](direction, idx, ts, aa, channel_num, phy, packet, self.__aux_ptr_packets) 459 return self.__func_by_aa[aa](direction, idx, ts, aa, channel_num, phy, packet) 460 return None 461 462 def parse(self, dump): 463 packet = self.__get_packet(type(dump).__name__, dump.idx, dump.ts, dump.aa, 464 self.__get_phy_channel_num(dump.freq), self.__get_phy(dump.mod), dump.packet) 465 if packet: 466 self.__run_hook(packet) 467 468 return packet 469 470 def __run_hook(self, packet): 471 if packet.type in self.__hooks__: 472 self.__hooks__[packet.type](self, packet, self.__conn_data_by_aa.get(packet.aa)) 473 474 @staticmethod 475 def __get_phy_channel_num(center_freq): 476 return int((center_freq - 2) / 2) 477 478 @staticmethod 479 def __get_phy(modulation): 480 if modulation == 16: 481 return '1M' 482 if modulation == 32 or modulation == 33: 483 return '2M' 484 return 'unknown' 485 486 def __on_ext_adv_packet(self, packet, _): 487 # TODO - handle SyncInfo packets as well 488 if 'AuxPtr' in packet.payload: 489 self.__aux_ptr_packets.append(packet) 490 491 def __on_connect_ind(self, packet, _): 492 self.__func_by_aa[packet.payload.LLData.AA] = parse_data_pdu 493 self.__conn_data_by_aa[packet.payload.LLData.AA] = ConnectionData(packet.payload.LLData.Interval) 494 495 def __on_terminate_ind(self, packet, conn_data): 496 del self.__func_by_aa[packet.aa] 497 for _, cis_aa in conn_data.cis_to_aa_map.items(): 498 del self.__func_by_aa[cis_aa] 499 del self.__conn_data_by_aa[packet.aa] 500 501 def __on_cis_req(self, packet, conn_data): 502 conn_data.pending_cis_req_ctr_data = packet.payload.CtrData 503 504 def __on_cis_ind(self, packet, conn_data): 505 cis_id = (conn_data.pending_cis_req_ctr_data.CIG_ID, conn_data.pending_cis_req_ctr_data.CIS_ID) 506 cis_aa = packet.payload.CtrData.AA 507 self.__func_by_aa[cis_aa] = parse_isoc_pdu 508 conn_data.cis_to_aa_map[cis_id] = cis_aa 509 del conn_data.pending_cis_req_ctr_data 510 511 def __on_cis_terminate_ind(self, packet, conn_data): 512 cis_id = (packet.payload.CtrData.CIG_ID, packet.payload.CtrData.CIS_ID) 513 cis_aa = conn_data.cis_to_aa_map.pop(cis_id, None) 514 if cis_aa: 515 del self.__func_by_aa[cis_aa] 516 517 __hooks__ = { 518 PacketType.ADV_EXT_IND: __on_ext_adv_packet, 519 PacketType.AUX_ADV_IND: __on_ext_adv_packet, 520 PacketType.AUX_SYNC_IND: __on_ext_adv_packet, 521 PacketType.AUX_CHAIN_IND: __on_ext_adv_packet, 522 PacketType.AUX_SCAN_RSP: __on_ext_adv_packet, 523 PacketType.CONNECT_IND: __on_connect_ind, 524 PacketType.AUX_CONNECT_REQ: __on_connect_ind, 525 PacketType.LL_TERMINATE_IND: __on_terminate_ind, 526 PacketType.LL_CIS_REQ: __on_cis_req, 527 PacketType.LL_CIS_IND: __on_cis_ind, 528 PacketType.LL_CIS_TERMINATE_IND: __on_cis_terminate_ind, 529 } 530 531 532class SortedDumps: 533 class Dump: 534 def __init__(self, file): 535 self.file = file 536 self.generator = self.file.fetch() 537 self.dump = None 538 539 def __init__(self): 540 self.dumps = [] 541 542 def __del__(self): 543 for dump in self.dumps: 544 dump.file.close() 545 546 def __open(self, file): 547 try: 548 file.open() 549 self.dumps.append(self.Dump(file)) 550 except FileNotFoundError: 551 print('{} so such file'.format(file)) 552 return 553 554 def add_tx(self, idx, file_path): 555 self.__open(DeviceDumpFileTx(idx, file_path)) 556 557 def add_rx(self, idx, file_path): 558 # TODO 559 pass 560 561 def flush(self): 562 for dump in self.dumps: 563 while next(dump.generator): 564 pass 565 566 dump.dump = None 567 568 def fetch(self): 569 while True: 570 for dump in self.dumps: 571 if not dump.dump: 572 try: 573 dump.dump = next(dump.generator) 574 except StopIteration: 575 pass 576 577 # sort by timestamp 578 self.dumps.sort(key=lambda entry: entry.dump.ts if entry.dump else sys.maxsize) 579 dump = self.dumps[0] 580 if not dump.dump: 581 return 582 583 yield dump.dump 584 dump.dump = None 585 586 587class Packets: 588 def __init__(self, dumps): 589 self.__dumps = dumps 590 self.__dumps.flush() 591 self.__parser = PacketParser() 592 self.__packets = [] 593 594 def fetch(self, packet_filter=()): 595 try: 596 iter(packet_filter) 597 except TypeError: 598 packet_filter = (packet_filter,) 599 else: 600 packet_filter = packet_filter 601 602 i = 0 603 while True: 604 # Append new packets 605 for dump in self.__dumps.fetch(): 606 self.__packets.append(self.__parser.parse(dump)) 607 608 if i < len(self.__packets): 609 if self.__packets[i] != None and (not packet_filter or self.__packets[i].type.name in packet_filter): 610 yield self.__packets[i] 611 i += 1 612 else: 613 return 614 615 def find(self, packet_type=None): 616 try: 617 return next(self.fetch(packet_type)) 618 except StopIteration: 619 return None 620 621 def findLast(self, packet_filter=()): 622 try: 623 iter(packet_filter) 624 except TypeError: 625 packet_filter = (packet_filter,) 626 else: 627 packet_filter = packet_filter 628 629 # Append new packets 630 for dump in self.__dumps.fetch(): 631 self.__packets.append(self.__parser.parse(dump)) 632 633 for i in reversed(range(len(self.__packets))): 634 if self.__packets[i] != None and (not packet_filter or self.__packets[i].type.name in packet_filter): 635 return self.__packets[i] 636 return None 637 638 def flush(self): 639 for dump in self.__dumps.fetch(): 640 pass 641 self.__parser = PacketParser() 642 self.__packets = [] 643