1# -*- coding: utf-8 -*- 2# Copyright 2019 Oticon A/S 3# SPDX-License-Identifier: Apache-2.0 4 5import math 6from components.utils import *; 7from components.basic_commands import *; 8from components.address import *; 9from components.events import *; 10from components.resolvable import *; 11from components.advertiser import *; 12from components.scanner import *; 13from components.initiator import *; 14from components.addata import *; 15from components.preambles import *; 16from components.dump import PacketType; 17from enum import IntEnum 18 19global lowerRandomAddress, upperRandomAddress; 20 21def verifyAndShowEvent(transport, idx, expectedEvent, trace, to=100): 22 23 event = get_event(transport, idx, to); 24 trace.trace(7, str(event)); 25 return event.event == expectedEvent; 26 27def verifyNumCompleteEvents(transport, idx, handle, count, trace, to=100): 28 29 success = True 30 while success and count > 0: 31 event = get_event(transport, idx, to) 32 trace.trace(7, str(event)) 33 numHandles, handles, packets = event.decode() 34 success = (event.event == Events.BT_HCI_EVT_NUM_COMPLETED_PACKETS and 35 numHandles == 1 and handles[0] == handle and success) 36 count -= packets[0] 37 38 return success 39 40def verifyAndShowMetaEvent(transport, idx, expectedEvent, trace): 41 42 event = get_event(transport, idx, 100); 43 trace.trace(7, str(event)); 44 return event.subEvent == expectedEvent; 45 46def verifyAndFetchEvent(transport, idx, expectedEvent, trace): 47 48 event = get_event(transport, idx, 100); 49 trace.trace(7, str(event)); 50 return event.event == expectedEvent, event; 51 52def verifyAndFetchMetaEvent(transport, idx, expectedEvent, trace, to=100): 53 54 event = get_event(transport, idx, to); 55 trace.trace(7, str(event)); 56 return event.subEvent == expectedEvent, event; 57 58def getCommandCompleteEvent(transport, idx, trace): 59 60 return verifyAndShowEvent(transport, idx, Events.BT_HCI_EVT_CMD_COMPLETE, trace); 61 62def readLocalResolvableAddress(transport, 
idx, identityAddress, trace): 63 64 status, resolvableAddress = le_read_local_resolvable_address(transport, idx, identityAddress.type, identityAddress.address, 100); 65 trace.trace(6, "LE Read Local Resolvable Address returns status: 0x%02X" % status); 66 return getCommandCompleteEvent(transport, idx, trace) and (status == 0), resolvableAddress; 67 68def readPeerResolvableAddress(transport, idx, identityAddress, trace): 69 70 status, resolvableAddress = le_read_peer_resolvable_address(transport, idx, identityAddress.type, identityAddress.address, 100); 71 trace.trace(6, "LE Read Peer Resolvable Address returns status: 0x%02X" % status); 72 return getCommandCompleteEvent(transport, idx, trace) and (status == 0), resolvableAddress; 73 74""" 75 Issue a channel Map Update 76""" 77def channelMapUpdate(transport, idx, channelMap, trace): 78 79 status = le_set_host_channel_classification(transport, idx, toArray(channelMap, 5), 100); 80 trace.trace(6, "LE Set Host Channel Classification returns status: 0x%02X" % status); 81 return getCommandCompleteEvent(transport, idx, trace) and (status == 0); 82 83def setLEEventMask(transport, idx, events, trace): 84 85 status = le_set_event_mask(transport, idx, events, 100); 86 trace.trace(6, "LE Set Event Mask returns status: 0x%02X" % status); 87 return getCommandCompleteEvent(transport, idx, trace) and (status == 0); 88 89def setPrivacyMode(transport, idx, address, mode, trace): 90 91 status = le_set_privacy_mode(transport, idx, address.type, address.address, mode, 100); 92 trace.trace(6, "LE Set Privacy Mode returns status: 0x%02X" % status); 93 return getCommandCompleteEvent(transport, idx, trace) and (status == 0); 94 95def setDataLength(transport, idx, handle, octets, time, trace): 96 97 status, handle = le_set_data_length(transport, idx, handle, octets, time, 100); 98 trace.trace(6, "LE Set Data Length returns status: 0x%02X handle: 0x%04X" % (status, handle)); 99 return getCommandCompleteEvent(transport, idx, trace) and 
(status == 0); 100 101def readBufferSize(transport, idx, trace): 102 103 status, maxPacketLength, maxPacketNumbers = le_read_buffer_size(transport, idx, 100); 104 trace.trace(6, "LE Read Buffer Size returns status: 0x%02X - Data Packet length %d, Number of Data Packets %d" % (status, maxPacketLength, maxPacketNumbers)); 105 return getCommandCompleteEvent(transport, idx, trace) and (status == 0), maxPacketLength, maxPacketNumbers; 106 107def readBufferSizeV2(transport, idx, trace): 108 109 status, maxPacketLength, maxPacketNumbers, maxIsoPacketLength, maxIsoPacketNumbers = le_read_buffer_size_v2(transport, idx, 100); 110 trace.trace(6, "LE Read Buffer Size V2 returns status: 0x%02X - Data Packet length %d, Number of Data Packets %d, ISO Data Packet length %d, Number of ISO Data Packets %d" % (status, maxPacketLength, maxPacketNumbers, maxIsoPacketLength, maxIsoPacketNumbers)); 111 return getCommandCompleteEvent(transport, idx, trace) and (status == 0), maxPacketLength, maxPacketNumbers, maxIsoPacketLength, maxIsoPacketNumbers; 112 113def readLocalFeatures(transport, idx, trace): 114 115 status, features = le_read_local_supported_features(transport, idx, 100) 116 trace.trace(6, "LE Read Local Supported Features returns status: 0x%02X" % status); 117 return getCommandCompleteEvent(transport, idx, trace) and (status == 0), features; 118 119def readRemoteFeatures(transport, idx, handle, trace): 120 121 status = le_read_remote_features(transport, idx, handle, 100); 122 trace.trace(6, "LE Read Remote Features returns status: 0x%02X" % status); 123 return verifyAndShowEvent(transport, idx, Events.BT_HCI_EVT_CMD_STATUS, trace) and (status == 0); 124 125def readRemoteVersionInformation(transport, idx, handle, trace): 126 127 status = read_remote_version_information(transport, idx, handle, 100); 128 trace.trace(6, "Read Remote Version Information returns status: 0x%02X" % status); 129 return verifyAndShowEvent(transport, idx, Events.BT_HCI_EVT_CMD_STATUS, trace) and (status 
== 0); 130 131def addAddressesToFilterAcceptList(transport, idx, addresses, trace): 132 133 _addresses = [ [ _.type, toNumber(_.address) ] for _ in addresses ]; 134 return preamble_specific_filter_accept_listed(transport, idx, _addresses, trace); 135 136""" 137 Send a DATA package... 138""" 139def writeData(transport, idx, handle, pbFlags, txData, trace): 140 141 status = le_data_write(transport, idx, handle, pbFlags, 0, txData, 100); 142 trace.trace(6, "LE Data Write returns status: 0x%02X" % status); 143 success = status == 0; 144 145 dataSent = has_event(transport, idx, 200)[0]; 146 success = success and dataSent; 147 if dataSent: 148 dataSent = verifyAndShowEvent(transport, idx, Events.BT_HCI_EVT_NUM_COMPLETED_PACKETS, trace); 149 success = success and dataSent; 150 151 return success; 152 153def encrypt(transport, idx, key, plaintext, trace): 154 155 status, encrypted = le_encrypt(transport, idx, key, plaintext, 2000); 156 trace.trace(6, "LE Encrypt Command returns status: 0x%02X" % status); 157 success = getCommandCompleteEvent(transport, idx, trace) and (status == 0); 158 return success, encrypted; 159 160""" 161 Read a single DATA Package... 162""" 163def readData(transport, idx, trace, timeout=200): 164 rxData = []; 165 166 dataReady = le_data_ready(transport, idx, timeout); 167 if dataReady: 168 rxPBFlags, rxBCFlags, rxDataPart = le_data_read(transport, idx, 100)[2:]; 169 trace.trace(6, "LE Data Read returns PB=%d BC=%d - %2d data bytes: %s" % \ 170 (rxPBFlags, rxBCFlags, len(rxDataPart), formatArray(rxDataPart))); 171 rxData = list(rxDataPart); 172 173 return (len(rxData) > 0), rxData; 174 175""" 176 Read and concatenate multiple DATA Packages... 
177""" 178def readDataFragments(transport, idx, trace, timeout=100): 179 success, rxData = True, []; 180 181 while success: 182 dataReady = le_data_ready(transport, idx, timeout); 183 success = success and dataReady; 184 if dataReady: 185 rxPBFlags, rxBCFlags, rxDataPart = le_data_read(transport, idx, 100)[2:]; 186 trace.trace(6, "LE Data Read returns PB=%d BC=%d - %2d data bytes: %s" % \ 187 (rxPBFlags, rxBCFlags, len(rxDataPart), formatArray(rxDataPart))); 188 rxData += rxDataPart; 189 timeout = 99; 190 191 return (len(rxData) > 0), rxData; 192 193def hasConnectionUpdateCompleteEvent(transport, idx, trace): 194 195 success, status = has_event(transport, idx, 100)[0], -1; 196 if success: 197 success, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_CONN_UPDATE_COMPLETE, trace); 198 if success: 199 status, handle, interval, latency, timeout = event.decode(); 200 success = status == 0; 201 return success, status; 202 203def hasChannelSelectionAlgorithmEvent(transport, idx, trace): 204 205 success, status, handle, chSelAlgorithm = has_event(transport, idx, 100)[0], -1, -1, -1; 206 if success: 207 success, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_CHAN_SEL_ALGO, trace); 208 if success: 209 handle, chSelAlgorithm = event.decode(); 210 return success, handle, chSelAlgorithm; 211 212def hasDataLengthChangedEvent(transport, idx, trace): 213 214 success, handle, maxTxOctets, maxTxTime, maxRxOctets, maxRxTime = has_event(transport, idx, 200)[0], -1, -1, -1, -1, -1; 215 if success: 216 success, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_DATA_LEN_CHANGE, trace); 217 if success: 218 handle, maxTxOctets, maxTxTime, maxRxOctets, maxRxTime = event.decode(); 219 return success, handle, maxTxOctets, maxTxTime, maxRxOctets, maxRxTime; 220 221def hasReadRemoteFeaturesCompleteEvent(transport, idx, trace): 222 223 success, handle, features = has_event(transport, idx, 100)[0], -1, []; 224 if success: 
225 success, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_REMOTE_FEAT_COMPLETE, trace); 226 if success: 227 status, handle, features = event.decode(); 228 success = status == 0; 229 return success, handle, toArray(features,8); 230 231def hasReadRemoteVersionInformationCompleteEvent(transport, idx, trace): 232 233 success, handle, version, manufacturer, subVersion = has_event(transport, idx, 100)[0], -1, -1, -1, -1; 234 if success: 235 success, event = verifyAndFetchEvent(transport, idx, Events.BT_HCI_EVT_REMOTE_VERSION_INFO, trace); 236 if success: 237 status, handle, version, manufacturer, subVersion = event.decode(); 238 success = status == 0; 239 return success, handle, version, manufacturer, subVersion; 240 241def hasLeCisRequestMetaEvent(transport, idx, trace, to): 242 243 success, aclConnectionHandle, cisConnectionHandle, cigId, cisId = has_event(transport, idx, to)[0], -1, -1, -1, -1 244 if success: 245 success, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_CIS_REQUEST, trace) 246 if success: 247 aclConnectionHandle, cisConnectionHandle, cigId, cisId = event.decode() 248 return success, aclConnectionHandle, cisConnectionHandle, cigId, cisId 249 250 251def hasLeLtkRequestMetaEvent(transport, idx, trace, to): 252 253 success, handle, rand, ediv = has_event(transport, idx, to)[0], -1, -1, -1 254 if success: 255 success, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_LTK_REQUEST, trace) 256 if success: 257 handle, rand, ediv = event.decode() 258 return success, handle, rand, ediv 259 260 261def hasEncryptionChangeEvent(transport, idx, trace, to): 262 success, status, handle, enabled, key_size = has_event(transport, idx, to)[0], -1, -1, -1, -1 263 if success: 264 event = get_event(transport, idx, to) 265 trace.trace(7, str(event)) 266 if event.event == Events.BT_HCI_EVT_ENCRYPT_CHANGE_V1: 267 status, handle, enabled = event.decode() 268 elif event.event == 
Events.BT_HCI_EVT_ENCRYPT_CHANGE_V2: 269 status, handle, enabled, key_size = event.decode() 270 271 success = status == 0 and success 272 273 return success, handle, enabled, key_size 274 275 276def calcMaxPacketTime(packetLength): 277 # (preamble + AA + header + packetLength + MIC + CRC) * us/byte 278 return (1 + 4 + 2 + packetLength + 4 + 3 ) * 8 279 280def calcMaxIsoSdu(Framing, BN, Max_PDU, ISO_Interval, SDU_Interval, Max_SDU_Supported): 281 if Framing == 0: 282 return calcUnframedMaxIsoSdu(BN, Max_PDU, ISO_Interval, SDU_Interval, Max_SDU_Supported) 283 elif Framing == 1: 284 return Max_SDU_Supported 285 else: 286 raise ValueError("Framing must be 0 or 1") 287 288def calcUnframedMaxIsoSdu(BN, Max_PDU, ISO_Interval, SDU_Interval, Max_SDU_Supported): 289 # BLUETOOTH CORE SPECIFICATION Version 5.2 | Vol 6, Part G, 2.1 UNFRAMED PDU: 290 # 291 # BN = ceil(Max_SDU / Max_PDU) * (ISO_Interval / SDU_Interval). 292 # (BN / (ISO_Interval / SDU_Interval)) = ceil(Max_SDU / Max_PDU) 293 # (BN / (ISO_Interval / SDU_Interval)) - 1 < Max_SDU / Max_PDU <= (BN / (ISO_Interval / SDU_Interval)) 294 # ((BN / (ISO_Interval / SDU_Interval)) - 1) * Max_PDU < Max_SDU <= (BN / (ISO_Interval / SDU_Interval)) * Max_PDU 295 # 296 # Max_SDU = Max_PDU * BN * (SDU_Interval / ISO_Interval) 297 298 Max_SDU = int(Max_PDU * BN * (SDU_Interval / ISO_Interval)) 299 300 # Clamp 301 return min(Max_SDU, Max_SDU_Supported) 302 303def matchingReportType(advertiseType): 304 305 if advertiseType == Advertising.CONNECTABLE_UNDIRECTED: 306 reportType = AdvertisingReport.ADV_IND; 307 elif advertiseType == Advertising.CONNECTABLE_HDC_DIRECTED or advertiseType == Advertising.CONNECTABLE_LDC_DIRECTED: 308 reportType = AdvertisingReport.ADV_DIRECT_IND; 309 elif advertiseType == Advertising.SCANNABLE_UNDIRECTED: 310 reportType = AdvertisingReport.ADV_SCAN_IND; 311 elif advertiseType == Advertising.NON_CONNECTABLE_UNDIRECTED: 312 reportType = AdvertisingReport.ADV_NONCONN_IND; 313 else: 314 reportType = 
AdvertisingReport.ADV_IND; 315 return reportType; 316 317def publicIdentityAddress(idx): 318 319 return Address( SimpleAddressType.PUBLIC, 0x123456789ABC if idx == 0 else 0x456789ABCDEF ); 320 321def randomIdentityAddress(idx): 322 global lowerRandomAddress, upperRandomAddress; 323 324 return Address( SimpleAddressType.RANDOM, upperRandomAddress if idx == 0 else lowerRandomAddress ); 325 326def resolvablePublicAddress(idx): 327 328 return Address( ExtendedAddressType.RESOLVABLE_OR_PUBLIC, 0x123456789ABC if idx == 0 else 0x456789ABCDEF ); 329 330def setStaticRandomAddress(transport, idx, trace): 331 global lowerRandomAddress, upperRandomAddress; 332 333 address = upperRandomAddress if idx == 0 else lowerRandomAddress; 334 if (toNumber(address)) & 0xC00000000000 != 0xC00000000000: 335 address = toArray(toNumber(address) | 0xC00000000000, 6); 336 preamble_set_random_address(transport, idx, toNumber(address), trace); 337 if idx == 0: 338 upperRandomAddress = address; 339 else: 340 lowerRandomAddress = address; 341 342def setNonResolvableRandomAddress(transport, idx, trace): 343 global lowerRandomAddress, upperRandomAddress; 344 345 address = upperRandomAddress if idx == 0 else lowerRandomAddress; 346 if (toNumber(address)) & 0xC00000000000 != 0x000000000000: 347 address = toArray(toNumber(address) & 0x3FFFFFFFFFFF, 6); 348 preamble_set_random_address(transport, idx, toNumber(address), trace); 349 if idx == 0: 350 upperRandomAddress = address; 351 else: 352 lowerRandomAddress = address; 353 354def setPassiveScanning(transport, scannerId, trace, advertiseType, advertiseReports=100, \ 355 advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE, advertiseChannels=AdvertiseChannel.ALL_CHANNELS, \ 356 scanFilter=ScanningFilterPolicy.FILTER_NONE): 357 358 advertiserId = scannerId ^ 1; 359 advertiserAddress = Address( ExtendedAddressType.PUBLIC ); 360 peerAddress = publicIdentityAddress( scannerId ); 361 362 advertiser = Advertiser(transport, advertiserId, trace, 
advertiseChannels, advertiseType, advertiserAddress, peerAddress, advertiseFilter); 363 364 scannerAddress = Address( ExtendedAddressType.PUBLIC ); 365 366 scanner = Scanner(transport, scannerId, trace, ScanType.PASSIVE, matchingReportType(advertiseType), scannerAddress, scanFilter, advertiseReports); 367 368 return advertiser, scanner; 369 370def setActiveScanning(transport, scannerId, trace, advertiseType, advertiseReports=1, advertiseResponses=1, \ 371 advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE, advertiseChannels=AdvertiseChannel.ALL_CHANNELS, \ 372 scanFilter=ScanningFilterPolicy.FILTER_NONE): 373 374 advertiserId = scannerId ^ 1; 375 advertiserAddress = Address( ExtendedAddressType.PUBLIC ); 376 peerAddress = publicIdentityAddress( scannerId ); 377 378 advertiser = Advertiser(transport, advertiserId, trace, advertiseChannels, advertiseType, advertiserAddress, peerAddress, advertiseFilter); 379 380 scannerAddress = Address( ExtendedAddressType.PUBLIC ); 381 382 scanner = Scanner(transport, scannerId, trace, ScanType.ACTIVE, matchingReportType(advertiseType), scannerAddress, scanFilter, advertiseReports, advertiseResponses); 383 384 return advertiser, scanner; 385 386def setPrivatePassiveScanning(transport, scannerId, trace, advertiseType, advertiseReports=100, \ 387 advertiserAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC, scannerAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC, \ 388 advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE, advertiseChannels=AdvertiseChannel.ALL_CHANNELS, \ 389 scanFilter=ScanningFilterPolicy.FILTER_NONE): 390 391 advertiserId = scannerId ^ 1; 392 advertiserAddress = Address( advertiserAddressType ); 393 if (advertiserAddressType == ExtendedAddressType.RANDOM) or (advertiserAddressType == ExtendedAddressType.RESOLVABLE_OR_RANDOM): 394 peerAddress = randomIdentityAddress( scannerId ); 395 else: 396 peerAddress = publicIdentityAddress( scannerId ); 397 398 advertiser = Advertiser(transport, advertiserId, trace, 
advertiseChannels, advertiseType, advertiserAddress, peerAddress, advertiseFilter); 399 400 scannerAddress = Address( scannerAddressType ); 401 402 scanner = Scanner(transport, scannerId, trace, ScanType.PASSIVE, matchingReportType(advertiseType), scannerAddress, scanFilter, advertiseReports); 403 404 return advertiser, scanner; 405 406def setPrivateActiveScanning(transport, scannerId, trace, advertiseType, advertiseReports=1, advertiseResponses=1, \ 407 advertiserAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC, scannerAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC, \ 408 advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE, advertiseChannels=AdvertiseChannel.ALL_CHANNELS, \ 409 scanFilter=ScanningFilterPolicy.FILTER_NONE): 410 411 advertiserId = scannerId ^ 1; 412 advertiserAddress = Address( advertiserAddressType ); 413 if (advertiserAddressType == ExtendedAddressType.RANDOM) or (advertiserAddressType == ExtendedAddressType.RESOLVABLE_OR_RANDOM): 414 peerAddress = randomIdentityAddress( scannerId ); 415 else: 416 peerAddress = publicIdentityAddress( scannerId ); 417 418 advertiser = Advertiser(transport, advertiserId, trace, advertiseChannels, advertiseType, advertiserAddress, peerAddress, advertiseFilter); 419 420 scannerAddress = Address( scannerAddressType ); 421 422 scanner = Scanner(transport, scannerId, trace, ScanType.ACTIVE, matchingReportType(advertiseType), scannerAddress, scanFilter, advertiseReports, advertiseResponses); 423 424 return advertiser, scanner; 425 426def setPublicInitiator(transport, initiatorId, trace, advertiseType, advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE, \ 427 advertiseChannels=AdvertiseChannel.ALL_CHANNELS, initiatorFilterPolicy=InitiatorFilterPolicy.FILTER_NONE): 428 429 advertiserId = initiatorId ^ 1; 430 advertiserAddress = Address( ExtendedAddressType.PUBLIC ); 431 peerAddress = publicIdentityAddress( initiatorId ); 432 433 advertiser = Advertiser(transport, advertiserId, trace, advertiseChannels, 
advertiseType, advertiserAddress, peerAddress, advertiseFilter); 434 435 initiatorAddress = Address( ExtendedAddressType.PUBLIC ); 436 responderAddress = publicIdentityAddress( advertiserId ); 437 438 initiator = Initiator(transport, initiatorId, advertiserId, trace, initiatorAddress, responderAddress, initiatorFilterPolicy); 439 440 return advertiser, initiator; 441 442def setPrivateInitiator(transport, initiatorId, trace, advertiseType, advertiserAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC, \ 443 initiatorAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC, advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE, \ 444 advertiseChannels=AdvertiseChannel.ALL_CHANNELS, initiatorFilterPolicy=InitiatorFilterPolicy.FILTER_NONE): 445 446 advertiserId = initiatorId ^ 1; 447 advertiserAddress = Address( advertiserAddressType ); 448 if (advertiserAddressType == ExtendedAddressType.RANDOM) or (advertiserAddressType == ExtendedAddressType.RESOLVABLE_OR_RANDOM): 449 peerAddress = randomIdentityAddress( initiatorId ); 450 else: 451 peerAddress = publicIdentityAddress( initiatorId ); 452 453 advertiser = Advertiser(transport, advertiserId, trace, advertiseChannels, advertiseType, advertiserAddress, peerAddress, advertiseFilter); 454 455 initiatorAddress = Address( initiatorAddressType ); 456 if (initiatorAddressType == ExtendedAddressType.RANDOM) or (initiatorAddressType == ExtendedAddressType.RESOLVABLE_OR_RANDOM): 457 responderAddress = randomIdentityAddress( advertiserId ); 458 else: 459 responderAddress = publicIdentityAddress( advertiserId ); 460 461 initiator = Initiator(transport, initiatorId, advertiserId, trace, initiatorAddress, responderAddress, initiatorFilterPolicy); 462 463 return advertiser, initiator; 464 465 466def le_iso_data_write_fragments(transport, idx, trace, conn_handle, data, iso_buffer_len): 467 TsFlag = 0 468 cont = False 469 offset = 0 470 fragments = 0 471 success = True 472 473 while success and offset < len(data): 474 fragment_len = 
min(iso_buffer_len, len(data[offset:])) 475 476 PbFlag = 0 477 if cont: 478 PbFlag |= 1 479 if len(data[offset:]) <= iso_buffer_len: 480 PbFlag |= 2 481 cont = True 482 483 status = le_iso_data_write(transport, idx, conn_handle, PbFlag, TsFlag, data[offset:offset+fragment_len], 0) 484 success = (status == 0) and success 485 486 offset += fragment_len 487 fragments += 1 488 489 return success, fragments 490 491 492def le_iso_data_write_complete(transport, idx, trace, number_of_packets_written, to): 493 success = True 494 for _ in range(number_of_packets_written): 495 success = le_iso_data_write_rsp(transport, idx, to) == 0 and success 496 497 return success 498 499 500def le_iso_data_write_nbytes(transport, idx, trace, conn_handle, nbytes, pkt_seq_num, iso_buffer_len): 501 iso_data_sdu = tuple([pkt_seq_num] * nbytes) 502 tx_iso_data_load = struct.pack(f'<HH{nbytes}B', pkt_seq_num, nbytes, *iso_data_sdu) 503 success, fragments = le_iso_data_write_fragments(transport, idx, trace, conn_handle, tx_iso_data_load, iso_buffer_len) 504 505 return success, iso_data_sdu 506 507 508def fetch_number_of_completed_packets(transport, idx, trace, number_of_packets_written, to): 509 number_of_completed_packets = {} 510 511 while sum(number_of_completed_packets.values()) < number_of_packets_written: 512 event = get_event(transport, idx, to) 513 if event.event != Events.BT_HCI_EVT_NUM_COMPLETED_PACKETS: 514 break 515 516 num_handles, conn_handles, num_packets = event.decode() 517 for i in range(num_handles): 518 if conn_handles[i] in number_of_completed_packets: 519 number_of_completed_packets[conn_handles[i]] += num_packets[i] 520 else: 521 number_of_completed_packets[conn_handles[i]] = num_packets[i] 522 523 return len(number_of_completed_packets), number_of_completed_packets.keys(), number_of_completed_packets.values() 524 525 526class PbFlags(IntEnum): 527 FIRST = 0 528 CONTINUATION = 1 529 COMPLETE = 2 530 LAST = 3 531 532 533def iso_receive_sdu(transport, idx, trace, 
sdu_interval): 534 iso_sdu = [] 535 iso_sdu_len = 0 536 success = True 537 while success: 538 success = le_iso_data_ready(transport, idx, sdu_interval * 2) and success 539 if not success: 540 return success, -1, tuple([]) 541 542 time, handle, pbflags, tsflag, rx_iso_data_load = le_iso_data_read(transport, idx, 100) 543 rx_iso_data_load = bytearray(rx_iso_data_load) 544 545 # Unpack ISO_Data_Load 546 rx_offset = 0 547 # a. Get Time_Stamp if present 548 if tsflag: 549 fmt = '<I' 550 (time_stamp,) = struct.unpack_from(fmt, rx_iso_data_load) 551 rx_offset += struct.calcsize(fmt) 552 553 # FIXME: Temporary workaround for Packetcraft not being along with Core version Sydney. 554 # Packet_Sequence_Number, Packet_Status_Flag and ISO_SDU_Length shall be sent for all fragments 555 if pbflags == PbFlags.FIRST or pbflags == PbFlags.COMPLETE: 556 # b. Get Packet_Sequence_Number, ISO_SDU_Length and Packet_Status_Flag 557 fmt = '<HH' 558 rx_packet_sequence_number, rx_iso_sdu_length = struct.unpack_from(fmt, rx_iso_data_load, rx_offset) 559 rx_offset += struct.calcsize(fmt) 560 rx_packet_status_flag = rx_iso_sdu_length >> 14 561 rx_iso_sdu_length &= 0xfff # 12 bits valid 562 563 success = (iso_sdu_len == 0) and success 564 iso_sdu_len = rx_iso_sdu_length 565 566 rx_iso_sdu = rx_iso_data_load[rx_offset:] 567 iso_sdu.extend(rx_iso_sdu) 568 569 # Valid data. The complete ISO_SDU was received correctly. 
570 success = (rx_packet_status_flag == 0x00) and success 571 572 if pbflags == PbFlags.LAST or pbflags == PbFlags.COMPLETE: 573 success = (len(iso_sdu) == iso_sdu_len) and success 574 break 575 576 success = (len(iso_sdu) < iso_sdu_len) and success 577 578 return success, handle, tuple(iso_sdu) 579 580 581def iso_send_payload_pdu(transport, transmitter, receiver, trace, conn_handle, max_data_size, sdu_interval, pkt_seq_num, 582 tx_iso_sdu=None): 583 # Create a ISO_SDU of sdu_size length 584 if not tx_iso_sdu: 585 tx_iso_sdu = tuple([(pkt_seq_num + x) % 255 for x in range(max_data_size)]) 586 587 # Pack the ISO_Data_Load (no Time_Stamp) of an HCI ISO Data packet 588 # <Packet_Sequence_Number, ISO_SDU_Length, ISO_SDU> 589 fmt = '<HH{ISO_SDU_Length}B'.format(ISO_SDU_Length=len(tx_iso_sdu)) 590 tx_iso_data_load = struct.pack(fmt, pkt_seq_num, len(tx_iso_sdu), *tx_iso_sdu) 591 592 # Transmitter: TX SDU 593 success, _, _, iso_buffer_len, _ = readBufferSizeV2(transport, transmitter, trace) 594 s, fragments = le_iso_data_write_fragments(transport, transmitter, trace, conn_handle, tx_iso_data_load, iso_buffer_len) 595 sucess = s and success 596 success = le_iso_data_write_complete(transport, transmitter, trace, fragments, 100) and success 597 success = verifyNumCompleteEvents(transport, transmitter, conn_handle, fragments, trace, sdu_interval * 2) and success 598 599 s, _, rx_iso_sdu = iso_receive_sdu(transport, receiver, trace, sdu_interval) 600 success = s and success 601 602 # Transmitter: No RX 603 success = not le_iso_data_ready(transport, transmitter, 100) and success 604 605 # Receiver: No more RX data 606 success = not le_iso_data_ready(transport, receiver, 100) and success 607 608 # TX and RX match 609 return (tx_iso_sdu == rx_iso_sdu) and success 610 611 612def iso_send_payload_pdu_parallel(transport, idx_1, idx_2, trace, conn_handle_1, conn_handle_2, max_data_size, 613 sdu_interval, pkt_seq_num): 614 # Create a ISO_SDU of sdu_size length 615 tx_iso_sdu = 
tuple([(pkt_seq_num + x) % 255 for x in range(max_data_size)]) 616 617 # Pack the ISO_Data_Load (no Time_Stamp) of an HCI ISO Data packet 618 # <Packet_Sequence_Number, ISO_SDU_Length, ISO_SDU> 619 fmt = '<HH{ISO_SDU_Length}B'.format(ISO_SDU_Length=len(tx_iso_sdu)) 620 tx_iso_data_load = struct.pack(fmt, pkt_seq_num, len(tx_iso_sdu), *tx_iso_sdu) 621 622 # Feed TX buffers 623 success, _, _, iso_buffer_len_1, _ = readBufferSizeV2(transport, idx_1, trace) 624 s, _, _, iso_buffer_len_2, _ = readBufferSizeV2(transport, idx_2, trace) 625 success = s and success 626 s, fragments_1 = le_iso_data_write_fragments(transport, idx_1, trace, conn_handle_1, tx_iso_data_load, iso_buffer_len_1) 627 success = s and success 628 s, fragments_2 = le_iso_data_write_fragments(transport, idx_2, trace, conn_handle_2, tx_iso_data_load, iso_buffer_len_2) 629 success = s and success 630 631 # Wait for data to be sent; fetch EDTT command response and Number of Completed packets event 632 success = le_iso_data_write_complete(transport, idx_1, trace, fragments_1, 100) and success 633 success = le_iso_data_write_complete(transport, idx_2, trace, fragments_2, 100) and success 634 success = verifyNumCompleteEvents(transport, idx_1, conn_handle_1, fragments_1, trace) and success 635 success = verifyNumCompleteEvents(transport, idx_2, conn_handle_2, fragments_2, trace) and success 636 637 # Check the data received 638 s, _, rx_iso_sdu = iso_receive_sdu(transport, idx_1, trace, sdu_interval) 639 success = s and tx_iso_sdu == rx_iso_sdu and success 640 641 s, _, rx_iso_sdu = iso_receive_sdu(transport, idx_2, trace, sdu_interval) 642 success = s and tx_iso_sdu == rx_iso_sdu and success 643 644 return success 645 646 647def set_isochronous_channels_host_support(transport, device, trace, value): 648 status = le_set_host_feature(transport, device, FeatureSupport.ISOCHRONOUS_CHANNELS, value, 100) 649 return getCommandCompleteEvent(transport, device, trace) and (status == 0x00) 650 651 652def 
establish_acl_connection(transport, central, peripheral, trace, interval=None, supervision_timeout=None): 653 advertiser, initiator = setPublicInitiator(transport, central, trace, Advertising.CONNECTABLE_UNDIRECTED) 654 if interval: 655 initiator.intervalMin = initiator.intervalMax = interval 656 657 if supervision_timeout: 658 initiator.supervisionTimeout = supervision_timeout 659 660 success = advertiser.enable() 661 connected = initiator.connect() 662 success = success and connected 663 664 if not connected: 665 success = advertiser.disable() and success 666 667 return success, advertiser, initiator 668 669 670def establish_cis_connection(transport, central, peripheral, trace, params, acl_conn_handle, setup_iso_data_path=True, 671 use_test_cmd=True): 672 success = True 673 674 # LT: Set CIG Parameters for Test 675 if use_test_cmd: 676 status, cigId, cisCount, central_cis_handles = \ 677 le_set_cig_parameters_test(transport, central, 0, *params.get_cig_parameters_test(), 100) 678 else: 679 status, cigId, cisCount, central_cis_handles = \ 680 le_set_cig_parameters(transport, central, 0, *params.get_cig_parameters(), 100) 681 682 success = getCommandCompleteEvent(transport, central, trace) and (status == 0x00) and success 683 central_acl_handles = [acl_conn_handle] * cisCount 684 peripheral_cis_handles = [-1] * cisCount 685 686 # LT: Create CIS 687 status = le_create_cis(transport, central, cisCount, central_cis_handles, central_acl_handles, 100) 688 success = verifyAndShowEvent(transport, central, Events.BT_HCI_EVT_CMD_STATUS, trace) and (status == 0) and success 689 690 for n in range(cisCount): 691 # UT: Wait for HCI_EVT_LE_CIS_REQUEST 692 s, event = verifyAndFetchMetaEvent(transport, peripheral, MetaEvents.BT_HCI_EVT_LE_CIS_REQUEST, trace) 693 success = s and success 694 _, peripheral_cis_handles[n], cigId, cisId = event.decode() 695 696 # UT: Accept CIS Request 697 status = le_accept_cis_request(transport, peripheral, peripheral_cis_handles[n], 100) 698 
success = verifyAndShowEvent(transport, peripheral, Events.BT_HCI_EVT_CMD_STATUS, trace) and (status == 0) and success 699 700 # LT: Wait for HCI_EVT_LE_CIS_ESTABLISHED 701 s, event = verifyAndFetchMetaEvent(transport, central, MetaEvents.BT_HCI_EVT_LE_CIS_ESTABLISHED, trace, 2000) 702 success = s and (event.decode()[0] == 0x00) and success 703 704 # UT: Wait for HCI_EVT_LE_CIS_ESTABLISHED 705 s, event = verifyAndFetchMetaEvent(transport, peripheral, MetaEvents.BT_HCI_EVT_LE_CIS_ESTABLISHED, trace) 706 success = s and (event.decode()[0] == 0x00) and success 707 708 if not setup_iso_data_path: 709 return success, central_cis_handles, peripheral_cis_handles 710 711 for n in range(cisCount): 712 if (params.Max_SDU_C_To_P != 0): 713 # LT: Setup Data Path - Data_Path_Direction=0 (Input) Data_Path_ID=1 (HCI) Codec_ID=0 Controller_Delay=0 Codec_Configuration_Length=0 Codec_Configuration=NULL 714 status, _ = le_setup_iso_data_path(transport, central, central_cis_handles[n], 0, 0, [0, 0, 0, 0, 0], 0, 0, [], 100) 715 success = getCommandCompleteEvent(transport, central, trace) and (status == 0x00) and success 716 717 # UT: Setup Data Path - Data_Path_Direction=1 (Output) Data_Path_ID=1 (HCI) Codec_ID=0 Controller_Delay=0 Codec_Configuration_Length=0 Codec_Configuration=NULL 718 status, _ = le_setup_iso_data_path(transport, peripheral, peripheral_cis_handles[n], 1, 0, [0, 0, 0, 0, 0], 0, 0, [], 100) 719 success = getCommandCompleteEvent(transport, peripheral, trace) and (status == 0x00) and success 720 721 if (params.Max_SDU_P_To_C != 0): 722 # LT: Setup Data Path - Data_Path_Direction=1 (Output) Data_Path_ID=1 (HCI) Codec_ID=0 Controller_Delay=0 Codec_Configuration_Length=0 Codec_Configuration=NULL 723 status, _ = le_setup_iso_data_path(transport, central, central_cis_handles[n], 1, 0, [0, 0, 0, 0, 0], 0, 0, [], 100) 724 success = getCommandCompleteEvent(transport, central, trace) and (status == 0x00) and success 725 726 # UT: Setup Data Path - Data_Path_Direction=0 (Input) 
def calc_supervision_timeout(iso_interval_ms):
    """
    Calculate the Supervision timeout for the LE Link that can be used to create a CIG with given ISO_Interval

    :param iso_interval_ms: ISO_Interval in milliseconds
    :return: Supervision timeout for the LE Link; the millisecond value divided by 10 (i.e. in 10 ms units)
    """
    # TS 4.10.1.2 Timing Requirements
    # "The connSupervisionTimeout for an ACL with associated CISes shall be greater than twice that of the
    #  ISO_Intervals of the associated CISes."
    # Twice the ISO_Interval plus a fixed 250 ms margin keeps the result strictly above that minimum.
    supervision_timeout_ms = int(iso_interval_ms * 2 + 250)
    assert supervision_timeout_ms < 32000, \
        'Supervision timeout {} ms is out of range'.format(supervision_timeout_ms)

    return int(supervision_timeout_ms / 10)


def enable_encryption(transport, central, peripheral, trace, conn_handle_c, keys):
    """
    Start encryption from the central and verify the command/event sequence on both devices.

    :param transport: transport passed through to the HCI helpers
    :param central: index of the device that issues LE Start Encryption
    :param peripheral: index of the device that answers the LTK request
    :param trace: trace object passed through to the event helpers
    :param conn_handle_c: connection handle as seen by the central
    :param keys: indexable holding (rand, ediv, ltk) at positions 0, 1 and 2
    :return: True only if every status was 0x00 and every awaited event matched, False otherwise
    """
    rand, ediv, ltk = keys[0], keys[1], keys[2]

    # Central: start encryption and expect a Command Status event
    status = le_start_encryption(transport, central, conn_handle_c, rand, ediv, ltk, 100)
    success = verifyAndShowEvent(transport, central, Events.BT_HCI_EVT_CMD_STATUS, trace, 1000) and status == 0x00

    # Peripheral: the LTK request must carry the same rand/ediv that the central used
    found, conn_handle_p, req_rand, req_ediv = hasLeLtkRequestMetaEvent(transport, peripheral, trace, 1000)
    success = found and req_rand == rand and req_ediv == ediv and success

    # Peripheral: reply with the LTK; the reply must echo the peripheral's connection handle
    status, handle = le_long_term_key_request_reply(transport, peripheral, conn_handle_p, ltk, 1000)
    success = getCommandCompleteEvent(transport, peripheral, trace) and status == 0x00 and \
        handle == conn_handle_p and success

    # Both sides must report an encryption change with enabled == 0x01 on their own handle
    # (the reported key size is not checked)
    found, handle, enabled, _ = hasEncryptionChangeEvent(transport, peripheral, trace, 1000)
    success = found and handle == conn_handle_p and enabled == 0x01 and success

    found, handle, enabled, _ = hasEncryptionChangeEvent(transport, central, trace, 1000)
    success = found and handle == conn_handle_c and enabled == 0x01 and success

    return success


def state_connected_isochronous_stream(transport, peripheral, central, trace, params,
                                       setup_iso_data_path=True, enc_keys=None, use_test_cmd=True,
                                       adjust_conn_interval=False):
    """
    Bring two devices into the "connected isochronous stream" state: enable ISO host support on both,
    establish the ACL connection, optionally enable encryption and finally establish the CIS connection(s).

    :param transport: transport passed through to the helpers
    :param peripheral: index of the peripheral device
    :param central: index of the central device
    :param trace: trace object passed through to the helpers
    :param params: CIG parameters; ISO_Interval and CIS_Count are read here, the object is forwarded to
                   establish_cis_connection
    :param setup_iso_data_path: forwarded to establish_cis_connection
    :param enc_keys: when truthy, passed as 'keys' to enable_encryption before the CIS is established
    :param use_test_cmd: forwarded to establish_cis_connection
    :param adjust_conn_interval: when True, params.ISO_Interval is used as the ACL connection interval
    :return: (success, initiator, peripheral_cis_handles, central_cis_handles); when no initiator could be
             obtained the result is (success, None, [0xFFFF] * CIS_Count, [0xFFFF] * CIS_Count)
    """
    # The Isochronous Channels (Host Support) FeatureSet bit is set.
    success = set_isochronous_channels_host_support(transport, peripheral, trace, 1)
    success = set_isochronous_channels_host_support(transport, central, trace, 1) and success

    # Adjust connection interval to avoid CIG and ACL events collision
    conn_interval = params.ISO_Interval if adjust_conn_interval else None

    ### ACL Connection Established. IUT (upperTester) is Peripheral. ###
    # ISO_Interval is multiplied by 1.25 to obtain milliseconds for calc_supervision_timeout
    s, advertiser, initiator = establish_acl_connection(transport, central, peripheral, trace, conn_interval,
                                                        calc_supervision_timeout(params.ISO_Interval * 1.25))
    success = s and success
    if not initiator:
        # Keep the arity identical to the normal return below so that callers unpacking four values do not
        # crash when the ACL connection could not be established; both handle lists get 0xFFFF placeholders.
        return success, None, [0xFFFF] * params.CIS_Count, [0xFFFF] * params.CIS_Count

    if enc_keys:
        success = enable_encryption(transport, central, peripheral, trace, initiator.handles[0], enc_keys) and success

    # NOTE: establish_cis_connection returns the central handles first, this function returns them last
    s, central_cis_handles, peripheral_cis_handles = \
        establish_cis_connection(transport, central, peripheral, trace, params, initiator.handles[0],
                                 setup_iso_data_path, use_test_cmd)

    return s and success, initiator, peripheral_cis_handles, central_cis_handles


# Helper function for set_complete_ext_adv_data()/set_complete_ext_scan_response_data()
def _set_complete_ad_sr_data(transport, idx, handle, fragmentPref, advData, set_data_function):
    """
    Send advData through set_data_function in fragments of at most 251 octets.

    A single fragment is sent as COMPLETE_FRAGMENT; otherwise the sequence is FIRST_FRAGMENT, zero or more
    INTERMEDIATE_FRAGMENT and LAST_FRAGMENT. If advData is empty nothing is sent at all.

    :param set_data_function: callable invoked as
                              set_data_function(transport, idx, handle, op, fragmentPref, fragment, 100)
                              and expected to return a status where 0 means success
    :return: True if every fragment returned status 0 (or there was nothing to send), False at the first failure
    """
    maxFragmentSize = 251
    remainingAdvData = advData[:]  # work on a copy, the caller's data is never modified
    firstFragment = True
    while len(remainingAdvData) > 0:
        fitsInOneFragment = len(remainingAdvData) <= maxFragmentSize
        if firstFragment:
            op = FragmentOperation.COMPLETE_FRAGMENT if fitsInOneFragment else FragmentOperation.FIRST_FRAGMENT
        else:
            op = FragmentOperation.LAST_FRAGMENT if fitsInOneFragment else FragmentOperation.INTERMEDIATE_FRAGMENT
        # Slicing clamps at the end of the data, so the last fragment is simply shorter
        status = set_data_function(transport, idx, handle, op, fragmentPref, remainingAdvData[:maxFragmentSize], 100)
        if status != 0:
            return False
        remainingAdvData = remainingAdvData[maxFragmentSize:]
        firstFragment = False
    return True


def set_complete_ext_adv_data(transport, idx, handle, fragmentPref, advData):
    """
    Sets extended advertising data handling fragmentation as needed. The number of fragments used is kept as low as possible.
    Returns true if succeeded, false otherwise
    """
    return _set_complete_ad_sr_data(transport, idx, handle, fragmentPref, advData, le_set_extended_advertising_data)
def set_complete_ext_scan_response_data(transport, idx, handle, fragmentPref, advData):
    """
    Sets extended advertising scan response data handling fragmentation as needed. The number of fragments used is kept as low as possible.
    Returns true if succeeded, false otherwise
    """
    return _set_complete_ad_sr_data(transport, idx, handle, fragmentPref, advData, le_set_extended_scan_response_data)


def wait_for_ADV_IND_end(transport, packets, timeout):
    """
    Wait for the backend of an ADV_IND packet - will advance the time to be just after the next ADV_IND packet
    (just after in this case meaning within T_IFS so a response can be sent)

    :param transport: object providing get_last_t() and wait_until_t()
    :param packets: packet store providing findLast(packet_filter=...)
    :param timeout: maximum time to wait, in milliseconds
    :return: the ADV_IND packet or None if it fails to find one before the timeout
    """
    checkInterval = 100  # Note: Has to be less than T_IFS
    remaining = timeout * 1000  # Convert to microseconds
    while remaining > 0:
        lastPacket = packets.findLast(packet_filter='ADV_IND')
        if lastPacket:
            # Check that simulation time is just after the ADV_IND has ended (so we can transmit a response)
            simulation_time = transport.get_last_t()
            if simulation_time < lastPacket.ts + get_packet_air_time(lastPacket) + 150:
                # Success, we can continue
                return lastPacket
        # No (usable) packet yet - wait a little and try again
        transport.wait_until_t(transport.get_last_t() + checkInterval)
        remaining -= checkInterval
    return None


def wait_for_AUX_ADV_IND_end(transport, packets):
    """
    Wait for the backend of an AUX_ADV_IND packet - will advance the time to be just after the next AUX_ADV_IND packet
    (just after in this case meaning within T_IFS so a response can be sent)

    There is no timeout: the function keeps polling until it succeeds.

    :param transport: object providing get_last_t(), wait() and wait_until_t()
    :param packets: packet store providing findLast(packet_filter=...)
    :return: the AUX_ADV_IND packet
    """
    # Get an ADV_EXT_IND with an aux ptr pointing to an AUX_ADV_IND packet that hasn't been sent yet
    auxAdvIndPacket = None
    auxAdvIndEndTs = 0
    while auxAdvIndPacket is None:
        while True:
            lastPacket = packets.findLast(packet_filter=('ADV_EXT_IND', 'AUX_ADV_IND'))
            if not auxAdvIndPacket:
                # An earlier AUX_ADV_IND is needed as a template for the expected air time
                auxAdvIndPacket = packets.findLast(packet_filter='AUX_ADV_IND')
            # findLast yields nothing while no matching packet has been captured; guard against that
            # instead of dereferencing it, so that we fall through to the "wait and try again" below
            if lastPacket is not None and auxAdvIndPacket and lastPacket.type == PacketType.ADV_EXT_IND:
                auxPtr = lastPacket.payload['AuxPtr']
                # Calculate end of offset window
                offsetEnd = (auxPtr.auxOffset + 1) * (30 if auxPtr.offsetUnits == 0 else 300)
                # Expected air time of the AUX_ADV_IND packet (assuming no changes from the last one)
                airTime = get_packet_air_time(auxAdvIndPacket)
                # Calculate expected (last possible) end time of the coming AUX_ADV_IND packet
                auxAdvIndEndTs = int(lastPacket.ts + offsetEnd + airTime)
                break
            # Don't have the needed packets yet or the last packet was not an ADV_EXT_IND; Wait a little and try again
            transport.wait(1)
        # Wait until the calculated end time (but make sure to always wait at least 1 us to avoid deadlocks)
        transport.wait_until_t(max(auxAdvIndEndTs + 1, transport.get_last_t() + 1))

        # Verify that the simulation time is within T_IFS of the end of the AUX_ADV_IND packet;
        # if it is not, the outer loop starts over with a fresh search
        auxAdvIndPacket = None
        simulationTime = transport.get_last_t()
        lastPacket = packets.findLast()
        if lastPacket is not None and lastPacket.type == PacketType.AUX_ADV_IND and \
                simulationTime < lastPacket.ts + get_packet_air_time(lastPacket) + 150:
            # Success, we can continue
            auxAdvIndPacket = lastPacket

    return auxAdvIndPacket


def get_packet_air_time(packet):
    """
    Get air time of given packet in microseconds

    :param packet: object exposing 'phy' and supporting len()
    :return: air time rounded up to a whole number of microseconds
    """
    # Note: Packet air length is: pre-amble + AA + header + payload + CRC
    # Every phy value other than '2M' is handled with the 1M figures (1 octet pre-amble, 1 bit per microsecond).
    # NOTE(review): the LE Coded PHY therefore gets the 1M air time - confirm this is intended.
    is2M = packet.phy == '2M'
    preambleOctets = 2 if is2M else 1
    bitsPerMicrosecond = 2 if is2M else 1
    return math.ceil((preambleOctets + 4 + 2 + len(packet) + 3) * 8 / bitsPerMicrosecond)
class SetCIGParameters:
    """
    LL.TS.p17
    4.10.1.3 Default Values for Set CIG Parameters Commands
    When using either the HCI_LE_Set_CIG_Parameters or HCI_LE_Set_CIG_Parameters_Test commands,
    the following table defines common default parameters for this section. The test case may specify
    different values.

    Every entry of 'data' becomes an instance attribute (and, when an alias is listed, a second attribute
    with the alias name holding the same value). Values may be overridden through keyword arguments using
    either the field name or its alias; the field name wins if both are given. "Per CIS" fields are always
    stored as lists with CIS_Count elements.
    """
    # Known parameter fields
    data = [
        # Name,                         Alias,        Per CIS, Default Value
        ('SDU_Interval_C_To_P',          'sdu_int_m2s', False, 20000),  # 20 ms
        ('SDU_Interval_P_To_C',          'sdu_int_s2m', False, 20000),  # 20 ms
        ('ISO_Interval',                 'iso_int',     False, int(20 // 1.25)),  # 20 ms
        ('CIS_Count',                    'cis_cnt',     False, 1),  # NOTE: Needs to be located before the first "Per CIS" field
        ('Worst_Case_SCA',               None,          False, 0),
        ('Packing',                      'packing',     False, 0),  # Sequential
        ('Framing',                      'framing',     False, 0),  # Unframed
        ('NSE',                          'nse',         True,  3),  # Note: Set to 3 or the Max Supported CIS NSE defined in IXIT, whichever is less.
        ('Max_SDU_C_To_P',               'mx_sdu_m2s',  True,  None),  # NOTE: Calculated in __init__
        ('Max_SDU_P_To_C',               'mx_sdu_s2m',  True,  None),  # NOTE: Calculated in __init__
        ('Max_PDU_C_To_P',               'mx_pdu_m2s',  True,  251),
        ('Max_PDU_P_To_C',               'mx_pdu_s2m',  True,  251),
        ('PHY_C_To_P',                   'phy_m2s',     True,  1),  # LE 1M PHY
        ('PHY_P_To_C',                   'phy_s2m',     True,  1),  # LE 1M PHY
        ('FT_C_To_P',                    'ft_m2s',      False, 1),
        ('FT_P_To_C',                    'ft_s2m',      False, 1),
        ('BN_C_To_P',                    'bn_m2s',      True,  1),
        ('BN_P_To_C',                    'bn_s2m',      True,  1),
        ('Max_Transport_Latency_C_To_P', None,          False, 40000),  # 40 ms
        ('Max_Transport_Latency_P_To_C', None,          False, 40000),  # 40 ms
        ('RTN_C_To_P',                   None,          True,  3),
        ('RTN_P_To_C',                   None,          True,  3),
        ('Max_SDU_Supported',            None,          False, 247),  # Maximum ISOAL SDU length
    ]

    def __init__(self, **kwargs):
        """
        Build the parameter set from the defaults in 'data', overridden by the given keyword arguments.

        :param kwargs: field names or aliases from 'data' mapped to their values; a "Per CIS" field accepts
                       either a single value (repeated for every CIS) or a list with CIS_Count elements
        :raises ValueError: on an unknown keyword or a per-CIS list whose length differs from CIS_Count
        """
        # Make a set of the known fields and aliases (fields without alias contribute only their name)
        known = set()
        for (field, alias, _, _) in self.data:
            known.add(field)
            if alias:
                known.add(alias)

        # Check for unknown fields
        for key in kwargs:
            if key not in known:
                raise ValueError('Unknown field {}'.format(key))

        # Dynamically set the attributes of the class instance
        for (field, alias, per_cis, default) in self.data:
            # Get supplied value (by name or alias) or default
            value = self._supplied_or_default(kwargs, field, alias, default)

            # Per CIS field? (relies on CIS_Count having been set by an earlier entry of 'data')
            if per_cis:
                value = self.per_cis_value(value, field)

            # Create field attribute
            setattr(self, field, value)

            # Create alias attribute
            if alias:
                setattr(self, alias, value)

        # Calculate the Max_SDU_C_To_P and Max_SDU_P_To_C unless given
        iso_interval_us = self.ISO_Interval * 1.25 * 1000
        Max_SDU_C_To_P = [None] * self.CIS_Count
        Max_SDU_P_To_C = [None] * self.CIS_Count

        for n in range(self.CIS_Count):
            # Calculate default values
            Max_SDU_C_To_P[n] = calcMaxIsoSdu(self.Framing, self.BN_C_To_P[n], self.Max_PDU_C_To_P[n],
                                              iso_interval_us, self.SDU_Interval_C_To_P,
                                              self.Max_SDU_Supported)
            Max_SDU_P_To_C[n] = calcMaxIsoSdu(self.Framing, self.BN_P_To_C[n], self.Max_PDU_P_To_C[n],
                                              iso_interval_us, self.SDU_Interval_P_To_C,
                                              self.Max_SDU_Supported)

        # Override with supplied values; the alias attributes are refreshed too so that they never keep
        # the [None, ...] placeholder assigned by the loop above
        for (field, alias, calculated) in (('Max_SDU_C_To_P', 'mx_sdu_m2s', Max_SDU_C_To_P),
                                           ('Max_SDU_P_To_C', 'mx_sdu_s2m', Max_SDU_P_To_C)):
            value = self.per_cis_value(self._supplied_or_default(kwargs, field, alias, calculated), field)
            setattr(self, field, value)
            setattr(self, alias, value)

    @staticmethod
    def _supplied_or_default(kwargs, field, alias, default):
        # Return the value supplied under the field name, else under its alias, else the default
        if field in kwargs:
            return kwargs[field]
        if alias and alias in kwargs:
            return kwargs[alias]
        return default

    def per_cis_value(self, value, field=None):
        """
        Expand a value to one entry per CIS.

        :param value: a list with CIS_Count elements (returned as is) or a single value to be repeated
        :param field: optional name of the field being processed; only used in the error message
        :return: list with CIS_Count elements
        :raises ValueError: if a list is given whose length differs from CIS_Count
        """
        if isinstance(value, list):
            # List value given, so must match CIS_Count
            if len(value) != self.CIS_Count:
                raise ValueError('Field {} has wrong length {}, expected {}'.format(
                    field if field is not None else '<unknown>', len(value), self.CIS_Count))
        else:
            # Single value given, so multiply by CIS_Count
            value = [value] * self.CIS_Count
        return value

    def get_cig_parameters_test(self):
        """
        Return the parameters as a 19-element tuple, including FT, ISO_Interval, NSE, Max_PDU and BN.

        The CIS IDs are generated as 0 .. CIS_Count-1.
        NOTE(review): presumably ordered as the argument list of the HCI_LE_Set_CIG_Parameters_Test
        helper - confirm against the caller.
        """
        return (self.SDU_Interval_C_To_P, self.SDU_Interval_P_To_C, self.FT_C_To_P, self.FT_P_To_C, self.ISO_Interval,
                self.Worst_Case_SCA, self.Packing, self.Framing, self.CIS_Count, list(range(self.CIS_Count)), self.NSE,
                self.Max_SDU_C_To_P, self.Max_SDU_P_To_C, self.Max_PDU_C_To_P, self.Max_PDU_P_To_C, self.PHY_C_To_P,
                self.PHY_P_To_C, self.BN_C_To_P, self.BN_P_To_C)

    def get_cig_parameters(self):
        """
        Return the parameters as a 15-element tuple, including Max_Transport_Latency and RTN.

        The CIS IDs are generated as 0 .. CIS_Count-1.
        NOTE(review): presumably ordered as the argument list of the HCI_LE_Set_CIG_Parameters
        helper - confirm against the caller.
        """
        return (self.SDU_Interval_C_To_P, self.SDU_Interval_P_To_C, self.Worst_Case_SCA, self.Packing, self.Framing,
                self.Max_Transport_Latency_C_To_P, self.Max_Transport_Latency_P_To_C, self.CIS_Count,
                list(range(self.CIS_Count)), self.Max_SDU_C_To_P, self.Max_SDU_P_To_C, self.PHY_C_To_P, self.PHY_P_To_C,
                self.RTN_C_To_P, self.RTN_P_To_C)