1# -*- coding: utf-8 -*-
2# Copyright 2019 Oticon A/S
3# SPDX-License-Identifier: Apache-2.0
4
5import math
6from components.utils import *;
7from components.basic_commands import *;
8from components.address import *;
9from components.events import *;
10from components.resolvable import *;
11from components.advertiser import *;
12from components.scanner import *;
13from components.initiator import *;
14from components.addata import *;
15from components.preambles import *;
16from components.dump import PacketType;
17from enum import IntEnum
18
# lowerRandomAddress / upperRandomAddress are read and rebound by randomIdentityAddress(),
# setStaticRandomAddress() and setNonResolvableRandomAddress() further down in this module.
# NOTE(review): a 'global' statement at module scope has no effect in Python; the two names are
# presumably provided by one of the star imports above - confirm.
global lowerRandomAddress, upperRandomAddress;
20
def verifyAndShowEvent(transport, idx, expectedEvent, trace, to=100):
    """
    Fetch one event through get_event(), trace its string form and report
    whether its 'event' attribute equals expectedEvent.

    'to' is forwarded to get_event() unchanged (presumably a timeout - confirm).
    """
    received = get_event(transport, idx, to)
    trace.trace(7, str(received))
    isExpected = received.event == expectedEvent
    return isExpected
26
def verifyNumCompleteEvents(transport, idx, handle, count, trace, to=100):
    """
    Consume events until 'count' completed packets have been reported for 'handle'.

    Each fetched event is traced and must be a Number Of Completed Packets
    event that reports exactly one handle, equal to 'handle'. The packet count
    of every accepted event is subtracted from 'count'; the loop stops once
    'count' is no longer positive or as soon as one event fails the check.

    Returns True when every fetched event passed the check (also when 'count'
    is not positive to begin with, in which case no event is fetched),
    otherwise False.
    """
    success = True
    while success and count > 0:
        event = get_event(transport, idx, to)
        trace.trace(7, str(event))

        # Check the event code BEFORE decoding: decode() of another event type
        # need not yield the (numHandles, handles, packets) triple unpacked below.
        if event.event != Events.BT_HCI_EVT_NUM_COMPLETED_PACKETS:
            return False

        numHandles, handles, packets = event.decode()
        success = numHandles == 1 and handles[0] == handle
        if success:
            # Only index packets[] once we know exactly one entry is reported
            count -= packets[0]

    return success
39
def verifyAndShowMetaEvent(transport, idx, expectedEvent, trace):
    """
    Fetch one event through get_event() (third argument fixed to 100), trace
    its string form and report whether its 'subEvent' attribute equals
    expectedEvent.
    """
    received = get_event(transport, idx, 100)
    trace.trace(7, str(received))
    isExpected = received.subEvent == expectedEvent
    return isExpected
45
def verifyAndFetchEvent(transport, idx, expectedEvent, trace):
    """
    Fetch one event through get_event() (third argument fixed to 100) and trace it.

    Returns a pair: whether the event's 'event' attribute equals
    expectedEvent, and the event object itself.
    """
    received = get_event(transport, idx, 100)
    trace.trace(7, str(received))
    isExpected = received.event == expectedEvent
    return isExpected, received
51
def verifyAndFetchMetaEvent(transport, idx, expectedEvent, trace, to=100):
    """
    Fetch one event through get_event() and trace it.

    Returns a pair: whether the event's 'subEvent' attribute equals
    expectedEvent, and the event object itself. 'to' is forwarded to
    get_event() unchanged (presumably a timeout - confirm).
    """
    received = get_event(transport, idx, to)
    trace.trace(7, str(received))
    isExpected = received.subEvent == expectedEvent
    return isExpected, received
57
def getCommandCompleteEvent(transport, idx, trace):
    """
    Fetch and trace the next event via verifyAndShowEvent() and report whether
    it is a BT_HCI_EVT_CMD_COMPLETE event.
    """
    expected = Events.BT_HCI_EVT_CMD_COMPLETE
    return verifyAndShowEvent(transport, idx, expected, trace)
61
def readLocalResolvableAddress(transport, idx, identityAddress, trace):
    """
    Issue LE Read Local Resolvable Address for identityAddress (its 'type' and
    'address' attributes are passed on) and trace the returned status.

    Returns (success, resolvableAddress); success requires both a Command
    Complete event and a zero status.
    """
    addressType = identityAddress.type
    addressValue = identityAddress.address
    status, resolvableAddress = le_read_local_resolvable_address(transport, idx, addressType, addressValue, 100)
    trace.trace(6, "LE Read Local Resolvable Address returns status: 0x%02X" % status)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0), resolvableAddress
67
def readPeerResolvableAddress(transport, idx, identityAddress, trace):
    """
    Issue LE Read Peer Resolvable Address for identityAddress (its 'type' and
    'address' attributes are passed on) and trace the returned status.

    Returns (success, resolvableAddress); success requires both a Command
    Complete event and a zero status.
    """
    addressType = identityAddress.type
    addressValue = identityAddress.address
    status, resolvableAddress = le_read_peer_resolvable_address(transport, idx, addressType, addressValue, 100)
    trace.trace(6, "LE Read Peer Resolvable Address returns status: 0x%02X" % status)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0), resolvableAddress
73
74"""
75    Issue a channel Map Update
76"""
def channelMapUpdate(transport, idx, channelMap, trace):
    """
    Issue LE Set Host Channel Classification with channelMap converted by
    toArray(channelMap, 5), and trace the returned status.

    Returns True only when a Command Complete event follows and the status is zero.
    """
    classification = toArray(channelMap, 5)
    status = le_set_host_channel_classification(transport, idx, classification, 100)
    trace.trace(6, "LE Set Host Channel Classification returns status: 0x%02X" % status)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0)
82
def setLEEventMask(transport, idx, events, trace):
    """
    Issue LE Set Event Mask with 'events' and trace the returned status.

    Returns True only when a Command Complete event follows and the status is zero.
    """
    status = le_set_event_mask(transport, idx, events, 100)
    trace.trace(6, "LE Set Event Mask returns status: 0x%02X" % status)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0)
88
def setPrivacyMode(transport, idx, address, mode, trace):
    """
    Issue LE Set Privacy Mode for 'address' (its 'type' and 'address'
    attributes are passed on) with the given mode, and trace the status.

    Returns True only when a Command Complete event follows and the status is zero.
    """
    addressType = address.type
    addressValue = address.address
    status = le_set_privacy_mode(transport, idx, addressType, addressValue, mode, 100)
    trace.trace(6, "LE Set Privacy Mode returns status: 0x%02X" % status)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0)
94
def setDataLength(transport, idx, handle, octets, time, trace):
    """
    Issue LE Set Data Length for 'handle' with the given octets/time values and
    trace the status together with the handle returned by the command.

    Returns True only when a Command Complete event follows and the status is zero.
    """
    status, reportedHandle = le_set_data_length(transport, idx, handle, octets, time, 100)
    trace.trace(6, "LE Set Data Length returns status: 0x%02X handle: 0x%04X" % (status, reportedHandle))

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0)
100
def readBufferSize(transport, idx, trace):
    """
    Issue LE Read Buffer Size and trace the status and the two reported values.

    Returns (success, maxPacketLength, maxPacketNumbers); success requires both
    a Command Complete event and a zero status.
    """
    result = le_read_buffer_size(transport, idx, 100)
    status, maxPacketLength, maxPacketNumbers = result
    trace.trace(6, "LE Read Buffer Size returns status: 0x%02X - Data Packet length %d, Number of Data Packets %d" % (status, maxPacketLength, maxPacketNumbers))

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0), maxPacketLength, maxPacketNumbers
106
def readBufferSizeV2(transport, idx, trace):
    """
    Issue LE Read Buffer Size V2 and trace the status and the four reported values.

    Returns (success, maxPacketLength, maxPacketNumbers, maxIsoPacketLength,
    maxIsoPacketNumbers); success requires both a Command Complete event and a
    zero status.
    """
    result = le_read_buffer_size_v2(transport, idx, 100)
    status, maxPacketLength, maxPacketNumbers, maxIsoPacketLength, maxIsoPacketNumbers = result
    trace.trace(6, "LE Read Buffer Size V2 returns status: 0x%02X - Data Packet length %d, Number of Data Packets %d, ISO Data Packet length %d, Number of ISO Data Packets %d" % (status, maxPacketLength, maxPacketNumbers, maxIsoPacketLength, maxIsoPacketNumbers))

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0), maxPacketLength, maxPacketNumbers, maxIsoPacketLength, maxIsoPacketNumbers
112
def readLocalFeatures(transport, idx, trace):
    """
    Issue LE Read Local Supported Features and trace the returned status.

    Returns (success, features); success requires both a Command Complete
    event and a zero status.
    """
    status, features = le_read_local_supported_features(transport, idx, 100)
    trace.trace(6, "LE Read Local Supported Features returns status: 0x%02X" % status)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0), features
118
def readRemoteFeatures(transport, idx, handle, trace):
    """
    Issue LE Read Remote Features for 'handle' and trace the returned status.

    Returns True only when the next event is a Command Status event and the
    status is zero.
    """
    status = le_read_remote_features(transport, idx, handle, 100)
    trace.trace(6, "LE Read Remote Features returns status: 0x%02X" % status)

    gotCommandStatus = verifyAndShowEvent(transport, idx, Events.BT_HCI_EVT_CMD_STATUS, trace)
    return gotCommandStatus and (status == 0)
124
def readRemoteVersionInformation(transport, idx, handle, trace):
    """
    Issue Read Remote Version Information for 'handle' and trace the returned status.

    Returns True only when the next event is a Command Status event and the
    status is zero.
    """
    status = read_remote_version_information(transport, idx, handle, 100)
    trace.trace(6, "Read Remote Version Information returns status: 0x%02X" % status)

    gotCommandStatus = verifyAndShowEvent(transport, idx, Events.BT_HCI_EVT_CMD_STATUS, trace)
    return gotCommandStatus and (status == 0)
130
def addAddressesToFilterAcceptList(transport, idx, addresses, trace):
    """
    Convert each entry of 'addresses' into a [type, toNumber(address)] pair and
    hand the resulting list to preamble_specific_filter_accept_listed(),
    returning whatever that preamble returns.
    """
    entries = []
    for entry in addresses:
        entries.append([entry.type, toNumber(entry.address)])
    return preamble_specific_filter_accept_listed(transport, idx, entries, trace)
135
136"""
137    Send a DATA package...
138"""
def writeData(transport, idx, handle, pbFlags, txData, trace):
    """
    Write txData on 'handle' with le_data_write() (broadcast flag argument 0)
    and trace the returned status.

    Afterwards has_event() is polled once (argument 200); when an event is
    pending it must be a Number Of Completed Packets event.

    Returns a truthy value only when the write status is zero, an event was
    pending and that event was of the expected kind.
    """
    status = le_data_write(transport, idx, handle, pbFlags, 0, txData, 100)
    trace.trace(6, "LE Data Write returns status: 0x%02X" % status)
    writeOk = status == 0

    eventPending = has_event(transport, idx, 200)[0]
    if not eventPending:
        return writeOk and eventPending

    completed = verifyAndShowEvent(transport, idx, Events.BT_HCI_EVT_NUM_COMPLETED_PACKETS, trace)
    return (writeOk and eventPending) and completed
152
def encrypt(transport, idx, key, plaintext, trace):
    """
    Issue LE Encrypt with the given key and plaintext (last argument 2000) and
    trace the returned status.

    Returns (success, encrypted); success requires both a Command Complete
    event and a zero status.
    """
    status, encrypted = le_encrypt(transport, idx, key, plaintext, 2000)
    trace.trace(6, "LE Encrypt Command returns status: 0x%02X" % status)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0), encrypted
159
160"""
161    Read a single DATA Package...
162"""
def readData(transport, idx, trace, timeout=200):
    """
    Read a single DATA packet.

    le_data_ready() is asked once with 'timeout'; when it reports nothing the
    result is (False, []). Otherwise one packet is read with le_data_read(),
    its PB/BC flags and payload are traced, and the payload is returned as a list.

    Returns (gotData, rxData) where gotData is True only for a non-empty payload.
    """
    if not le_data_ready(transport, idx, timeout):
        return False, []

    rxPBFlags, rxBCFlags, rxDataPart = le_data_read(transport, idx, 100)[2:]
    trace.trace(6, "LE Data Read returns PB=%d BC=%d - %2d data bytes: %s" %
                   (rxPBFlags, rxBCFlags, len(rxDataPart), formatArray(rxDataPart)))

    payload = list(rxDataPart)
    return (len(payload) > 0), payload
174
175"""
176    Read and concatenate multiple DATA Packages...
177"""
def readDataFragments(transport, idx, trace, timeout=100):
    """
    Read and concatenate DATA packets for as long as le_data_ready() reports data.

    The first readiness check uses 'timeout'; every check after a successful
    read uses 99. Each packet's PB/BC flags and payload are traced.

    Returns (gotData, rxData) where rxData is the concatenated payload list and
    gotData is True only when it is non-empty.
    """
    collected = []
    waitFor = timeout

    while le_data_ready(transport, idx, waitFor):
        rxPBFlags, rxBCFlags, rxDataPart = le_data_read(transport, idx, 100)[2:]
        trace.trace(6, "LE Data Read returns PB=%d BC=%d - %2d data bytes: %s" %
                       (rxPBFlags, rxBCFlags, len(rxDataPart), formatArray(rxDataPart)))
        collected.extend(rxDataPart)
        waitFor = 99

    return (len(collected) > 0), collected
192
def hasConnectionUpdateCompleteEvent(transport, idx, trace):
    """
    Check for a pending LE Connection Update Complete meta event.

    Returns (success, status). status is -1 when no event was pending or the
    fetched event was of another kind; otherwise it is the decoded status and
    success tells whether it is zero.
    """
    pending = has_event(transport, idx, 100)[0]
    if not pending:
        return pending, -1

    matched, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_CONN_UPDATE_COMPLETE, trace)
    if not matched:
        return matched, -1

    status, _handle, _interval, _latency, _timeout = event.decode()
    return status == 0, status
202
def hasChannelSelectionAlgorithmEvent(transport, idx, trace):
    """
    Check for a pending LE Channel Selection Algorithm meta event.

    Returns (success, handle, chSelAlgorithm); handle and chSelAlgorithm are -1
    unless an event was pending and it was of the expected kind.
    """
    pending = has_event(transport, idx, 100)[0]
    if not pending:
        return pending, -1, -1

    matched, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_CHAN_SEL_ALGO, trace)
    if not matched:
        return matched, -1, -1

    handle, chSelAlgorithm = event.decode()
    return matched, handle, chSelAlgorithm
211
def hasDataLengthChangedEvent(transport, idx, trace):
    """
    Check for a pending LE Data Length Change meta event.

    Returns (success, handle, maxTxOctets, maxTxTime, maxRxOctets, maxRxTime);
    the five values are -1 unless an event was pending and it was of the
    expected kind.
    """
    pending = has_event(transport, idx, 200)[0]
    if not pending:
        return pending, -1, -1, -1, -1, -1

    matched, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_DATA_LEN_CHANGE, trace)
    if not matched:
        return matched, -1, -1, -1, -1, -1

    handle, maxTxOctets, maxTxTime, maxRxOctets, maxRxTime = event.decode()
    return matched, handle, maxTxOctets, maxTxTime, maxRxOctets, maxRxTime
220
def hasReadRemoteFeaturesCompleteEvent(transport, idx, trace, to=100):
    """
    Check for a pending LE Read Remote Features Complete meta event.

    'to' is used for the has_event() check only. Returns
    (success, handle, features) where features is always passed through
    toArray(features, 8); handle is -1 and features is built from 0 unless the
    expected event was received. success additionally requires a zero status.
    """
    pending = has_event(transport, idx, to)[0]
    if not pending:
        return pending, -1, toArray(0x0, 8)

    matched, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_REMOTE_FEAT_COMPLETE, trace)
    if not matched:
        return matched, -1, toArray(0x0, 8)

    status, handle, features = event.decode()
    return status == 0, handle, toArray(features, 8)
230
def hasReadRemoteVersionInformationCompleteEvent(transport, idx, trace):
    """
    Check for a pending Read Remote Version Information Complete event.

    Returns (success, handle, version, manufacturer, subVersion); the four
    values are -1 unless the expected event was received. success additionally
    requires a zero status.
    """
    pending = has_event(transport, idx, 100)[0]
    if not pending:
        return pending, -1, -1, -1, -1

    matched, event = verifyAndFetchEvent(transport, idx, Events.BT_HCI_EVT_REMOTE_VERSION_INFO, trace)
    if not matched:
        return matched, -1, -1, -1, -1

    status, handle, version, manufacturer, subVersion = event.decode()
    return status == 0, handle, version, manufacturer, subVersion
240
def hasLeCisRequestMetaEvent(transport, idx, trace, to):
    """
    Check for a pending LE CIS Request meta event ('to' is used for the
    has_event() check only).

    Returns (success, aclConnectionHandle, cisConnectionHandle, cigId, cisId);
    the four values are -1 unless the expected event was received.
    """
    pending = has_event(transport, idx, to)[0]
    if not pending:
        return pending, -1, -1, -1, -1

    matched, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_CIS_REQUEST, trace)
    if not matched:
        return matched, -1, -1, -1, -1

    aclConnectionHandle, cisConnectionHandle, cigId, cisId = event.decode()
    return matched, aclConnectionHandle, cisConnectionHandle, cigId, cisId
249
250
def hasLeLtkRequestMetaEvent(transport, idx, trace, to):
    """
    Check for a pending LE Long Term Key Request meta event ('to' is used for
    the has_event() check only).

    Returns (success, handle, rand, ediv); the three values are -1 unless the
    expected event was received.
    """
    pending = has_event(transport, idx, to)[0]
    if not pending:
        return pending, -1, -1, -1

    matched, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_LTK_REQUEST, trace)
    if not matched:
        return matched, -1, -1, -1

    handle, rand, ediv = event.decode()
    return matched, handle, rand, ediv
259
260
def hasEncryptionChangeEvent(transport, idx, trace, to):
    """
    Check for a pending Encryption Change event (V1 or V2).

    When has_event() reports an event it is fetched and traced; a V1 event
    yields status/handle/enabled, a V2 event additionally yields key_size. Any
    other event leaves all values at -1.

    Returns (success, handle, enabled, key_size); success requires a decoded
    status of zero.
    """
    status, handle, enabled, key_size = -1, -1, -1, -1

    pending = has_event(transport, idx, to)[0]
    if pending:
        event = get_event(transport, idx, to)
        trace.trace(7, str(event))
        eventCode = event.event
        if eventCode == Events.BT_HCI_EVT_ENCRYPT_CHANGE_V1:
            status, handle, enabled = event.decode()
        elif eventCode == Events.BT_HCI_EVT_ENCRYPT_CHANGE_V2:
            status, handle, enabled, key_size = event.decode()

    return (status == 0 and pending), handle, enabled, key_size
274
275
def calcMaxPacketTime(packetLength):
    """
    Return the time for a packet carrying packetLength payload bytes: the
    payload plus preamble, access address, header, MIC and CRC bytes, times
    8 us per byte.
    """
    PREAMBLE, ACCESS_ADDRESS, HEADER, MIC, CRC = 1, 4, 2, 4, 3
    US_PER_BYTE = 8
    totalBytes = PREAMBLE + ACCESS_ADDRESS + HEADER + packetLength + MIC + CRC
    return totalBytes * US_PER_BYTE
279
def calcMaxIsoSdu(Framing, BN, Max_PDU, ISO_Interval, SDU_Interval, Max_SDU_Supported):
    """
    Return the maximum ISO SDU size for the given framing mode.

    Framing 0 (unframed) delegates to calcUnframedMaxIsoSdu(); Framing 1
    (framed) returns Max_SDU_Supported unchanged.

    Raises ValueError for any other Framing value.
    """
    if Framing not in (0, 1):
        raise ValueError("Framing must be 0 or 1")

    if Framing == 1:
        return Max_SDU_Supported

    return calcUnframedMaxIsoSdu(BN, Max_PDU, ISO_Interval, SDU_Interval, Max_SDU_Supported)
287
def calcUnframedMaxIsoSdu(BN, Max_PDU, ISO_Interval, SDU_Interval, Max_SDU_Supported):
    """
    Return the largest SDU size usable with unframed PDUs, clamped to
    Max_SDU_Supported.

    ISO_Interval and SDU_Interval must be expressed in the same unit since only
    their ratio is used. A zero ISO_Interval raises ZeroDivisionError.
    """
    # BLUETOOTH CORE SPECIFICATION Version 5.2 | Vol 6, Part G, 2.1 UNFRAMED PDU:
    #
    # BN = ceil(Max_SDU / Max_PDU) * (ISO_Interval / SDU_Interval).
    # (BN / (ISO_Interval / SDU_Interval)) = ceil(Max_SDU / Max_PDU)
    # (BN / (ISO_Interval / SDU_Interval)) - 1 < Max_SDU / Max_PDU <= (BN / (ISO_Interval / SDU_Interval))
    # ((BN / (ISO_Interval / SDU_Interval)) - 1) * Max_PDU < Max_SDU <= (BN / (ISO_Interval / SDU_Interval)) * Max_PDU
    #
    # Max_SDU = Max_PDU * BN * (SDU_Interval / ISO_Interval)

    # Multiply first and floor-divide last: forming the float ratio
    # SDU_Interval / ISO_Interval up front and truncating afterwards can lose
    # one octet to rounding (e.g. 49 * (1 / 49) == 0.9999999999999999 -> 0).
    # For the non-negative quantities used here floor division equals truncation.
    Max_SDU = int(Max_PDU * BN * SDU_Interval // ISO_Interval)

    # Clamp
    return min(Max_SDU, Max_SDU_Supported)
302
def matchingReportType(advertiseType):
    """
    Map an Advertising type onto the AdvertisingReport type expected for it.

    Connectable undirected -> ADV_IND, both directed variants -> ADV_DIRECT_IND,
    scannable undirected -> ADV_SCAN_IND, non-connectable undirected ->
    ADV_NONCONN_IND. Any other value falls back to ADV_IND.
    """
    if advertiseType == Advertising.CONNECTABLE_UNDIRECTED:
        return AdvertisingReport.ADV_IND

    if advertiseType == Advertising.CONNECTABLE_HDC_DIRECTED or advertiseType == Advertising.CONNECTABLE_LDC_DIRECTED:
        return AdvertisingReport.ADV_DIRECT_IND

    if advertiseType == Advertising.SCANNABLE_UNDIRECTED:
        return AdvertisingReport.ADV_SCAN_IND

    if advertiseType == Advertising.NON_CONNECTABLE_UNDIRECTED:
        return AdvertisingReport.ADV_NONCONN_IND

    # Unknown advertising type: same report type as connectable undirected
    return AdvertisingReport.ADV_IND
316
def publicIdentityAddress(idx):
    """
    Return the fixed public identity Address for device idx:
    0x123456789ABC for idx 0, 0x456789ABCDEF for any other idx.
    """
    if idx == 0:
        value = 0x123456789ABC
    else:
        value = 0x456789ABCDEF
    return Address(SimpleAddressType.PUBLIC, value)
320
def randomIdentityAddress(idx):
    """
    Return the random identity Address for device idx, built from the module
    global upperRandomAddress for idx 0 and lowerRandomAddress otherwise.
    """
    global lowerRandomAddress, upperRandomAddress

    if idx == 0:
        value = upperRandomAddress
    else:
        value = lowerRandomAddress
    return Address(SimpleAddressType.RANDOM, value)
325
def resolvablePublicAddress(idx):
    """
    Return an Address of type RESOLVABLE_OR_PUBLIC for device idx:
    0x123456789ABC for idx 0, 0x456789ABCDEF for any other idx.
    """
    if idx == 0:
        value = 0x123456789ABC
    else:
        value = 0x456789ABCDEF
    return Address(ExtendedAddressType.RESOLVABLE_OR_PUBLIC, value)
329
def setStaticRandomAddress(transport, idx, trace):
    """
    Make the random address of device idx a static one (both top bits of the
    48-bit value set).

    Nothing happens when the stored address already has both bits set.
    Otherwise the bits are set, the new address is programmed through
    preamble_set_random_address() and stored back into the module global
    (upperRandomAddress for idx 0, lowerRandomAddress otherwise).
    """
    global lowerRandomAddress, upperRandomAddress

    TOP_BITS = 0xC00000000000

    current = upperRandomAddress if idx == 0 else lowerRandomAddress
    if toNumber(current) & TOP_BITS == TOP_BITS:
        return

    updated = toArray(toNumber(current) | TOP_BITS, 6)
    preamble_set_random_address(transport, idx, toNumber(updated), trace)
    if idx == 0:
        upperRandomAddress = updated
    else:
        lowerRandomAddress = updated
341
def setNonResolvableRandomAddress(transport, idx, trace):
    """
    Make the random address of device idx a non-resolvable one (both top bits
    of the 48-bit value cleared).

    Nothing happens when the stored address already has both bits cleared.
    Otherwise the bits are cleared, the new address is programmed through
    preamble_set_random_address() and stored back into the module global
    (upperRandomAddress for idx 0, lowerRandomAddress otherwise).
    """
    global lowerRandomAddress, upperRandomAddress

    TOP_BITS = 0xC00000000000

    current = upperRandomAddress if idx == 0 else lowerRandomAddress
    if toNumber(current) & TOP_BITS == 0x000000000000:
        return

    updated = toArray(toNumber(current) & 0x3FFFFFFFFFFF, 6)
    preamble_set_random_address(transport, idx, toNumber(updated), trace)
    if idx == 0:
        upperRandomAddress = updated
    else:
        lowerRandomAddress = updated
353
def setPassiveScanning(transport, scannerId, trace, advertiseType, advertiseReports=100,
                       advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE, advertiseChannels=AdvertiseChannel.ALL_CHANNELS,
                       scanFilter=ScanningFilterPolicy.FILTER_NONE):
    """
    Build an Advertiser / passive Scanner pair that both use public addresses.

    The advertiser runs on the other device (scannerId ^ 1) and gets the
    scanner's public identity address as its peer address. The scanner expects
    the report type matching advertiseType.

    Returns (advertiser, scanner).
    """
    peerId = scannerId ^ 1
    ownAdvertiserAddress = Address(ExtendedAddressType.PUBLIC)
    scannerIdentity = publicIdentityAddress(scannerId)

    advertiser = Advertiser(transport, peerId, trace, advertiseChannels, advertiseType,
                            ownAdvertiserAddress, scannerIdentity, advertiseFilter)

    ownScannerAddress = Address(ExtendedAddressType.PUBLIC)
    reportType = matchingReportType(advertiseType)

    scanner = Scanner(transport, scannerId, trace, ScanType.PASSIVE, reportType,
                      ownScannerAddress, scanFilter, advertiseReports)

    return advertiser, scanner
369
def setActiveScanning(transport, scannerId, trace, advertiseType, advertiseReports=1, advertiseResponses=1,
                      advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE, advertiseChannels=AdvertiseChannel.ALL_CHANNELS,
                      scanFilter=ScanningFilterPolicy.FILTER_NONE):
    """
    Build an Advertiser / active Scanner pair that both use public addresses.

    The advertiser runs on the other device (scannerId ^ 1) and gets the
    scanner's public identity address as its peer address. The scanner expects
    the report type matching advertiseType and is given both advertiseReports
    and advertiseResponses.

    Returns (advertiser, scanner).
    """
    peerId = scannerId ^ 1
    ownAdvertiserAddress = Address(ExtendedAddressType.PUBLIC)
    scannerIdentity = publicIdentityAddress(scannerId)

    advertiser = Advertiser(transport, peerId, trace, advertiseChannels, advertiseType,
                            ownAdvertiserAddress, scannerIdentity, advertiseFilter)

    ownScannerAddress = Address(ExtendedAddressType.PUBLIC)
    reportType = matchingReportType(advertiseType)

    scanner = Scanner(transport, scannerId, trace, ScanType.ACTIVE, reportType,
                      ownScannerAddress, scanFilter, advertiseReports, advertiseResponses)

    return advertiser, scanner
385
def setPrivatePassiveScanning(transport, scannerId, trace, advertiseType, advertiseReports=100,
                              advertiserAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC, scannerAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC,
                              advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE, advertiseChannels=AdvertiseChannel.ALL_CHANNELS,
                              scanFilter=ScanningFilterPolicy.FILTER_NONE):
    """
    Build an Advertiser / passive Scanner pair with selectable address types.

    The advertiser runs on the other device (scannerId ^ 1). Its peer address
    is the scanner's random identity address when advertiserAddressType is
    RANDOM or RESOLVABLE_OR_RANDOM, otherwise the scanner's public identity
    address.

    Returns (advertiser, scanner).
    """
    peerId = scannerId ^ 1
    ownAdvertiserAddress = Address(advertiserAddressType)

    usesRandom = (advertiserAddressType == ExtendedAddressType.RANDOM) or (advertiserAddressType == ExtendedAddressType.RESOLVABLE_OR_RANDOM)
    identityOf = randomIdentityAddress if usesRandom else publicIdentityAddress
    scannerIdentity = identityOf(scannerId)

    advertiser = Advertiser(transport, peerId, trace, advertiseChannels, advertiseType,
                            ownAdvertiserAddress, scannerIdentity, advertiseFilter)

    ownScannerAddress = Address(scannerAddressType)
    reportType = matchingReportType(advertiseType)

    scanner = Scanner(transport, scannerId, trace, ScanType.PASSIVE, reportType,
                      ownScannerAddress, scanFilter, advertiseReports)

    return advertiser, scanner
405
def setPrivateActiveScanning(transport, scannerId, trace, advertiseType, advertiseReports=1, advertiseResponses=1,
                             advertiserAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC, scannerAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC,
                             advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE, advertiseChannels=AdvertiseChannel.ALL_CHANNELS,
                             scanFilter=ScanningFilterPolicy.FILTER_NONE):
    """
    Build an Advertiser / active Scanner pair with selectable address types.

    The advertiser runs on the other device (scannerId ^ 1). Its peer address
    is the scanner's random identity address when advertiserAddressType is
    RANDOM or RESOLVABLE_OR_RANDOM, otherwise the scanner's public identity
    address.

    Returns (advertiser, scanner).
    """
    peerId = scannerId ^ 1
    ownAdvertiserAddress = Address(advertiserAddressType)

    usesRandom = (advertiserAddressType == ExtendedAddressType.RANDOM) or (advertiserAddressType == ExtendedAddressType.RESOLVABLE_OR_RANDOM)
    identityOf = randomIdentityAddress if usesRandom else publicIdentityAddress
    scannerIdentity = identityOf(scannerId)

    advertiser = Advertiser(transport, peerId, trace, advertiseChannels, advertiseType,
                            ownAdvertiserAddress, scannerIdentity, advertiseFilter)

    ownScannerAddress = Address(scannerAddressType)
    reportType = matchingReportType(advertiseType)

    scanner = Scanner(transport, scannerId, trace, ScanType.ACTIVE, reportType,
                      ownScannerAddress, scanFilter, advertiseReports, advertiseResponses)

    return advertiser, scanner
425
def setPublicInitiator(transport, initiatorId, trace, advertiseType, advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE,
                       advertiseChannels=AdvertiseChannel.ALL_CHANNELS, initiatorFilterPolicy=InitiatorFilterPolicy.FILTER_NONE):
    """
    Build an Advertiser / Initiator pair that both use public addresses.

    The advertiser runs on the other device (initiatorId ^ 1) with the
    initiator's public identity address as peer; the initiator targets the
    advertiser's public identity address.

    Returns (advertiser, initiator).
    """
    peerId = initiatorId ^ 1
    ownAdvertiserAddress = Address(ExtendedAddressType.PUBLIC)
    initiatorIdentity = publicIdentityAddress(initiatorId)

    advertiser = Advertiser(transport, peerId, trace, advertiseChannels, advertiseType,
                            ownAdvertiserAddress, initiatorIdentity, advertiseFilter)

    ownInitiatorAddress = Address(ExtendedAddressType.PUBLIC)
    advertiserIdentity = publicIdentityAddress(peerId)

    initiator = Initiator(transport, initiatorId, peerId, trace,
                          ownInitiatorAddress, advertiserIdentity, initiatorFilterPolicy)

    return advertiser, initiator
441
def setPrivateInitiator(transport, initiatorId, trace, advertiseType, advertiserAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC,
                        initiatorAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC, advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE,
                        advertiseChannels=AdvertiseChannel.ALL_CHANNELS, initiatorFilterPolicy=InitiatorFilterPolicy.FILTER_NONE):
    """
    Build an Advertiser / Initiator pair with selectable address types.

    The advertiser runs on the other device (initiatorId ^ 1). Its peer address
    is the initiator's random identity address when advertiserAddressType is
    RANDOM or RESOLVABLE_OR_RANDOM, otherwise the public one. Likewise the
    initiator targets the advertiser's random identity address when
    initiatorAddressType is RANDOM or RESOLVABLE_OR_RANDOM, otherwise the
    public one.

    Returns (advertiser, initiator).
    """
    peerId = initiatorId ^ 1
    ownAdvertiserAddress = Address(advertiserAddressType)

    advertiserUsesRandom = (advertiserAddressType == ExtendedAddressType.RANDOM) or (advertiserAddressType == ExtendedAddressType.RESOLVABLE_OR_RANDOM)
    identityOf = randomIdentityAddress if advertiserUsesRandom else publicIdentityAddress
    initiatorIdentity = identityOf(initiatorId)

    advertiser = Advertiser(transport, peerId, trace, advertiseChannels, advertiseType,
                            ownAdvertiserAddress, initiatorIdentity, advertiseFilter)

    ownInitiatorAddress = Address(initiatorAddressType)

    initiatorUsesRandom = (initiatorAddressType == ExtendedAddressType.RANDOM) or (initiatorAddressType == ExtendedAddressType.RESOLVABLE_OR_RANDOM)
    identityOf = randomIdentityAddress if initiatorUsesRandom else publicIdentityAddress
    advertiserIdentity = identityOf(peerId)

    initiator = Initiator(transport, initiatorId, peerId, trace,
                          ownInitiatorAddress, advertiserIdentity, initiatorFilterPolicy)

    return advertiser, initiator
464
465
def le_iso_data_write_fragments(transport, idx, trace, conn_handle, data, iso_buffer_len):
    """
    Write 'data' as one or more ISO data fragments of at most iso_buffer_len
    bytes each, using le_iso_data_write() with the time stamp flag cleared.

    The PB flag of each fragment is built from two bits: bit 0 is set on every
    fragment after the first, bit 1 is set on the fragment that carries the
    end of the data (so a single fragment gets 2, and a sequence gets
    0, 1, ..., 3).

    Writing stops at the first non-zero status.

    Returns (success, fragments) where fragments is the number of writes
    attempted. Empty data yields (True, 0). A non-positive iso_buffer_len with
    non-empty data yields (False, 0) without writing anything.
    """
    total = len(data)

    # A fragment size of zero (or less) can never make progress; the loop
    # below would spin forever, so report failure instead.
    if total > 0 and iso_buffer_len <= 0:
        return False, 0

    TsFlag = 0
    offset = 0
    fragments = 0
    success = True

    while success and offset < total:
        remaining = total - offset
        fragment_len = min(iso_buffer_len, remaining)

        PbFlag = 0
        if fragments > 0:
            PbFlag |= 1     # not the first fragment
        if remaining <= iso_buffer_len:
            PbFlag |= 2     # this fragment carries the end of the data

        status = le_iso_data_write(transport, idx, conn_handle, PbFlag, TsFlag, data[offset:offset + fragment_len], 0)
        success = (status == 0) and success

        offset += fragment_len
        fragments += 1

    return success, fragments
490
491
def le_iso_data_write_complete(transport, idx, trace, number_of_packets_written, to):
    """
    Fetch one le_iso_data_write_rsp() result per written packet.

    All number_of_packets_written responses are fetched even after a failure.
    Returns True only when every response was zero (True as well when no
    packets were written).
    """
    # A list (not a generator) so that every response is consumed
    responses_ok = [le_iso_data_write_rsp(transport, idx, to) == 0
                    for _ in range(number_of_packets_written)]
    return all(responses_ok)
498
499
def le_iso_data_write_nbytes(transport, idx, trace, conn_handle, nbytes, pkt_seq_num, iso_buffer_len):
    """
    Send an ISO SDU of nbytes bytes, each byte holding pkt_seq_num.

    The SDU is prefixed with little-endian 16-bit pkt_seq_num and nbytes fields
    and written through le_iso_data_write_fragments().

    Returns (success, iso_data_sdu) where iso_data_sdu is the SDU as a tuple.
    """
    iso_data_sdu = (pkt_seq_num,) * nbytes
    layout = '<HH{}B'.format(nbytes)
    payload = struct.pack(layout, pkt_seq_num, nbytes, *iso_data_sdu)

    written = le_iso_data_write_fragments(transport, idx, trace, conn_handle, payload, iso_buffer_len)
    success, _fragments = written
    return success, iso_data_sdu
506
507
def fetch_number_of_completed_packets(transport, idx, trace, number_of_packets_written, to):
    """
    Accumulate Number Of Completed Packets events per connection handle.

    Events are fetched until the accumulated packet total reaches
    number_of_packets_written, or until an event of another kind is fetched
    (which ends the collection early).

    Returns (number of distinct handles, the handles, the per-handle totals);
    the last two are the key and value views of the accumulating dict.
    """
    totals = {}

    while sum(totals.values()) < number_of_packets_written:
        event = get_event(transport, idx, to)
        if event.event != Events.BT_HCI_EVT_NUM_COMPLETED_PACKETS:
            break

        num_handles, conn_handles, num_packets = event.decode()
        for pos in range(num_handles):
            conn_handle = conn_handles[pos]
            totals[conn_handle] = totals.get(conn_handle, 0) + num_packets[pos]

    return len(totals), totals.keys(), totals.values()
524
525
class PbFlags(IntEnum):
    """
    Packet boundary flag values of an HCI ISO data fragment, as compared
    against the pbflags value in iso_receive_sdu().
    """
    # First fragment of an SDU that continues in later fragments
    FIRST = 0
    # Fragment that is neither the first nor the last of its SDU
    CONTINUATION = 1
    # A single fragment holding the whole SDU
    COMPLETE = 2
    # Last fragment of a multi-fragment SDU
    LAST = 3
531
532
def iso_receive_sdu(transport, idx, trace, sdu_interval):
    """
    Receive one ISO SDU, reassembling it from HCI ISO data fragments.

    Fragments are read until one flagged LAST or COMPLETE arrives or a check
    fails. The SDU header (sequence number, length, status flag) is read from
    FIRST and COMPLETE fragments only. A fragment is rejected when a second
    header shows up after a non-zero length was announced, when the status
    flag is not 0, when the data outgrows the announced length before the
    end, or when the final length differs from the announced one.

    Returns (success, handle, iso_sdu) with iso_sdu as a tuple of byte values.
    When no data becomes ready within sdu_interval * 2 the result is the falsy
    readiness value, -1 and an empty tuple.
    """
    iso_sdu = []
    iso_sdu_len = 0
    # Stays None until an SDU header has been seen. Without this a stream that
    # starts with a CONTINUATION or LAST fragment hit an unbound local below;
    # now it simply fails the status check.
    rx_packet_status_flag = None
    success = True
    while success:
        success = le_iso_data_ready(transport, idx, sdu_interval * 2) and success
        if not success:
            return success, -1, tuple([])

        time, handle, pbflags, tsflag, rx_iso_data_load = le_iso_data_read(transport, idx, 100)
        rx_iso_data_load = bytearray(rx_iso_data_load)

        # Unpack ISO_Data_Load
        rx_offset = 0
        # a. Get Time_Stamp if present
        if tsflag:
            fmt = '<I'
            (time_stamp,) = struct.unpack_from(fmt, rx_iso_data_load)
            rx_offset += struct.calcsize(fmt)

        # FIXME: Temporary workaround for Packetcraft not being aligned with Core version Sydney.
        #  Packet_Sequence_Number, Packet_Status_Flag and ISO_SDU_Length shall be sent for all fragments
        if pbflags == PbFlags.FIRST or pbflags == PbFlags.COMPLETE:
            # b. Get Packet_Sequence_Number, ISO_SDU_Length and Packet_Status_Flag
            fmt = '<HH'
            rx_packet_sequence_number, rx_iso_sdu_length = struct.unpack_from(fmt, rx_iso_data_load, rx_offset)
            rx_offset += struct.calcsize(fmt)
            rx_packet_status_flag = rx_iso_sdu_length >> 14
            rx_iso_sdu_length &= 0xfff  # 12 bits valid

            # A header is only expected while no SDU length has been recorded yet
            success = (iso_sdu_len == 0) and success
            iso_sdu_len = rx_iso_sdu_length

        rx_iso_sdu = rx_iso_data_load[rx_offset:]
        iso_sdu.extend(rx_iso_sdu)

        # Valid data. The complete ISO_SDU was received correctly.
        success = (rx_packet_status_flag == 0x00) and success

        if pbflags == PbFlags.LAST or pbflags == PbFlags.COMPLETE:
            success = (len(iso_sdu) == iso_sdu_len) and success
            break

        # More fragments must follow, so the SDU cannot be complete yet
        success = (len(iso_sdu) < iso_sdu_len) and success

    return success, handle, tuple(iso_sdu)
579
580
def iso_send_payload_pdu(transport, transmitter, receiver, trace, conn_handle, max_data_size, sdu_interval, pkt_seq_num,
                         tx_iso_sdu=None):
    """
    Send one ISO SDU from 'transmitter' and check that 'receiver' gets it.

    When tx_iso_sdu is empty or None, an SDU of max_data_size bytes is
    generated as (pkt_seq_num + x) % 255. The SDU is prefixed with
    little-endian 16-bit sequence number and length fields, fragmented to the
    transmitter's ISO buffer length and written. The write responses and the
    Number Of Completed Packets events are then checked, the SDU is read back
    on the receiver, and neither device may have further ISO data pending.

    Returns True only when every step succeeded and the received SDU equals
    the transmitted one.
    """
    # Create a ISO_SDU of sdu_size length
    if not tx_iso_sdu:
        tx_iso_sdu = tuple([(pkt_seq_num + x) % 255 for x in range(max_data_size)])

    # Pack the ISO_Data_Load (no Time_Stamp) of an HCI ISO Data packet
    # <Packet_Sequence_Number, ISO_SDU_Length, ISO_SDU>
    fmt = '<HH{ISO_SDU_Length}B'.format(ISO_SDU_Length=len(tx_iso_sdu))
    tx_iso_data_load = struct.pack(fmt, pkt_seq_num, len(tx_iso_sdu), *tx_iso_sdu)

    # Transmitter: TX SDU
    success, _, _, iso_buffer_len, _ = readBufferSizeV2(transport, transmitter, trace)
    s, fragments = le_iso_data_write_fragments(transport, transmitter, trace, conn_handle, tx_iso_data_load, iso_buffer_len)
    # Fold the write result into 'success'; it used to be assigned to a
    # misspelled name, so a failed fragment write went unnoticed.
    success = s and success
    success = le_iso_data_write_complete(transport, transmitter, trace, fragments, 100) and success
    success = verifyNumCompleteEvents(transport, transmitter, conn_handle, fragments, trace, sdu_interval * 2) and success

    s, _, rx_iso_sdu = iso_receive_sdu(transport, receiver, trace, sdu_interval)
    success = s and success

    # Transmitter: No RX
    success = not le_iso_data_ready(transport, transmitter, 100) and success

    # Receiver: No more RX data
    success = not le_iso_data_ready(transport, receiver, 100) and success

    # TX and RX match
    return (tx_iso_sdu == rx_iso_sdu) and success
610
611
def iso_send_payload_pdu_parallel(transport, idx_1, idx_2, trace, conn_handle_1, conn_handle_2, max_data_size,
                                  sdu_interval, pkt_seq_num):
    """
    Send the same generated ISO SDU from two devices and check what each receives.

    The SDU holds max_data_size bytes, (pkt_seq_num + x) % 255, prefixed with
    little-endian 16-bit sequence number and length fields. Each stage is run
    for idx_1 first and idx_2 second: read buffer sizes, write the fragments,
    collect the write responses, verify the Number Of Completed Packets
    events, and finally receive an SDU and compare it with the one sent.

    Returns a truthy value only when every stage succeeded on both devices.
    """
    # Create a ISO_SDU of sdu_size length
    expected_sdu = tuple((pkt_seq_num + pos) % 255 for pos in range(max_data_size))

    # Pack the ISO_Data_Load (no Time_Stamp) of an HCI ISO Data packet
    # <Packet_Sequence_Number, ISO_SDU_Length, ISO_SDU>
    layout = '<HH{ISO_SDU_Length}B'.format(ISO_SDU_Length=len(expected_sdu))
    data_load = struct.pack(layout, pkt_seq_num, len(expected_sdu), *expected_sdu)

    links = ((idx_1, conn_handle_1), (idx_2, conn_handle_2))
    success = True

    # Feed TX buffers
    buffer_lengths = []
    for device, _ in links:
        step, _, _, iso_buffer_len, _ = readBufferSizeV2(transport, device, trace)
        success = step and success
        buffer_lengths.append(iso_buffer_len)

    fragment_counts = []
    for (device, conn_handle), iso_buffer_len in zip(links, buffer_lengths):
        step, fragments = le_iso_data_write_fragments(transport, device, trace, conn_handle, data_load, iso_buffer_len)
        success = step and success
        fragment_counts.append(fragments)

    # Wait for data to be sent; fetch EDTT command response and Number of Completed packets event
    for (device, _), fragments in zip(links, fragment_counts):
        success = le_iso_data_write_complete(transport, device, trace, fragments, 100) and success

    for (device, conn_handle), fragments in zip(links, fragment_counts):
        success = verifyNumCompleteEvents(transport, device, conn_handle, fragments, trace) and success

    # Check the data received
    for device, _ in links:
        step, _, received_sdu = iso_receive_sdu(transport, device, trace, sdu_interval)
        success = step and expected_sdu == received_sdu and success

    return success
645
646
def set_isochronous_channels_host_support(transport, device, trace, value):
    """
    Issue LE Set Host Feature for FeatureSupport.ISOCHRONOUS_CHANNELS with the
    given value.

    Returns True only when a Command Complete event follows and the status is zero.
    """
    feature = FeatureSupport.ISOCHRONOUS_CHANNELS
    status = le_set_host_feature(transport, device, feature, value, 100)

    completed = getCommandCompleteEvent(transport, device, trace)
    return completed and (status == 0x00)
650
651
def establish_acl_connection(transport, central, peripheral, trace, interval=None, supervision_timeout=None):
    """
    Set up a public-address advertiser/initiator pair via setPublicInitiator()
    (connectable undirected advertising) and connect them.

    A truthy 'interval' is stored as both intervalMin and intervalMax of the
    initiator; a truthy 'supervision_timeout' is stored as its
    supervisionTimeout. If the connection attempt fails, advertising is
    disabled again. The 'peripheral' argument is not used here.

    Returns (success, advertiser, initiator).
    """
    advertiser, initiator = setPublicInitiator(transport, central, trace, Advertising.CONNECTABLE_UNDIRECTED)

    if interval:
        initiator.intervalMin = interval
        initiator.intervalMax = interval

    if supervision_timeout:
        initiator.supervisionTimeout = supervision_timeout

    advertising = advertiser.enable()
    connected = initiator.connect()
    outcome = advertising and connected

    if connected:
        return outcome, advertiser, initiator

    # Connection failed: stop advertising again
    stopped = advertiser.disable()
    return stopped and outcome, advertiser, initiator
668
669
def establish_cis_connection(transport, central, peripheral, trace, params, acl_conn_handle, setup_iso_data_path=True,
                             use_test_cmd=True):
    """
    Configure a CIG on the central, create its CISes and optionally set up the ISO data paths.

    :param transport: transport used to reach the devices
    :param central: index of the device that sets the CIG parameters and creates the CISes
    :param peripheral: index of the device that accepts the CIS requests
    :param trace: trace object used for logging
    :param params: object providing get_cig_parameters_test(), get_cig_parameters(), Max_SDU_C_To_P and
                   Max_SDU_P_To_C (e.g. a SetCIGParameters instance)
    :param acl_conn_handle: handle of the ACL connection (on the central) the CISes are associated with
    :param setup_iso_data_path: when False, return right after the CISes are established
    :param use_test_cmd: use le_set_cig_parameters_test() when True, le_set_cig_parameters() otherwise
    :return: tuple (success, central_cis_handles, peripheral_cis_handles)
    """
    def max_sdu_for_cis(max_sdu, n):
        # The Max_SDU fields are normally per-CIS lists; a plain number is accepted as well.
        # Comparing the list itself with 0 would always be "not equal".
        return max_sdu[n] if isinstance(max_sdu, (list, tuple)) else max_sdu

    def setup_data_path(device, cis_handle, direction):
        # Arguments after the direction, presumably in HCI parameter order: Data_Path_ID=0 (HCI), Codec_ID=0,
        # Controller_Delay=0, Codec_Configuration_Length=0, Codec_Configuration=NULL
        status, _ = le_setup_iso_data_path(transport, device, cis_handle, direction, 0, [0, 0, 0, 0, 0], 0, 0, [], 100)
        # Always consume the Command Complete event, even if the status already signals a failure
        return getCommandCompleteEvent(transport, device, trace) and (status == 0x00)

    success = True

    # LT: Set CIG Parameters (for Test)
    if use_test_cmd:
        status, cigId, cisCount, central_cis_handles = \
            le_set_cig_parameters_test(transport, central, 0, *params.get_cig_parameters_test(), 100)
    else:
        status, cigId, cisCount, central_cis_handles = \
            le_set_cig_parameters(transport, central, 0, *params.get_cig_parameters(), 100)

    success = getCommandCompleteEvent(transport, central, trace) and (status == 0x00) and success
    central_acl_handles = [acl_conn_handle] * cisCount
    peripheral_cis_handles = [-1] * cisCount

    # LT: Create CIS
    status = le_create_cis(transport, central, cisCount, central_cis_handles, central_acl_handles, 100)
    success = verifyAndShowEvent(transport, central, Events.BT_HCI_EVT_CMD_STATUS, trace) and (status == 0) and success

    for n in range(cisCount):
        # UT: Wait for HCI_EVT_LE_CIS_REQUEST
        s, event = verifyAndFetchMetaEvent(transport, peripheral, MetaEvents.BT_HCI_EVT_LE_CIS_REQUEST, trace)
        success = s and success
        # Only the CIS handle reported to the peripheral is needed from the request
        _, peripheral_cis_handles[n], _, _ = event.decode()

        # UT: Accept CIS Request
        status = le_accept_cis_request(transport, peripheral, peripheral_cis_handles[n], 100)
        success = verifyAndShowEvent(transport, peripheral, Events.BT_HCI_EVT_CMD_STATUS, trace) and (status == 0) and success

        # LT: Wait for HCI_EVT_LE_CIS_ESTABLISHED
        s, event = verifyAndFetchMetaEvent(transport, central, MetaEvents.BT_HCI_EVT_LE_CIS_ESTABLISHED, trace, 2000)
        success = s and (event.decode()[0] == 0x00) and success

        # UT: Wait for HCI_EVT_LE_CIS_ESTABLISHED
        s, event = verifyAndFetchMetaEvent(transport, peripheral, MetaEvents.BT_HCI_EVT_LE_CIS_ESTABLISHED, trace)
        success = s and (event.decode()[0] == 0x00) and success

    if not setup_iso_data_path:
        return success, central_cis_handles, peripheral_cis_handles

    for n in range(cisCount):
        if max_sdu_for_cis(params.Max_SDU_C_To_P, n) != 0:
            # LT: Setup Data Path - Data_Path_Direction=0 (Input)
            success = setup_data_path(central, central_cis_handles[n], 0) and success

            # UT: Setup Data Path - Data_Path_Direction=1 (Output)
            success = setup_data_path(peripheral, peripheral_cis_handles[n], 1) and success

        if max_sdu_for_cis(params.Max_SDU_P_To_C, n) != 0:
            # LT: Setup Data Path - Data_Path_Direction=1 (Output)
            success = setup_data_path(central, central_cis_handles[n], 1) and success

            # UT: Setup Data Path - Data_Path_Direction=0 (Input)
            success = setup_data_path(peripheral, peripheral_cis_handles[n], 0) and success

    return success, central_cis_handles, peripheral_cis_handles
731
732
def calc_supervision_timeout(iso_interval_ms):
    """
    Derive a supervision timeout for an LE link that is going to carry a CIG with the given ISO_Interval.

    :param iso_interval_ms: ISO_Interval in milliseconds
    :return: supervision timeout for the LE link, in units of 10 ms
    """
    # TS 4.10.1.2 Timing Requirements
    # "The connSupervisionTimeout for an ACL with associated CISes shall be greater than twice that of the
    #  ISO_Intervals of the associated CISes."
    # A fixed 250 ms margin is added on top of the doubled interval.
    margin_ms = 250
    timeout_ms = int(2 * iso_interval_ms + margin_ms)
    assert timeout_ms < 32000

    return int(timeout_ms / 10)
746
747
def enable_encryption(transport, central, peripheral, trace, conn_handle_c, keys):
    """
    Run the encryption start sequence on an established connection and check every step.

    :param transport: transport used to reach the devices
    :param central: index of the device that starts encryption
    :param peripheral: index of the device that answers the Long Term Key request
    :param trace: trace object used for logging
    :param conn_handle_c: connection handle on the central
    :param keys: indexable holding (rand, ediv, ltk) at positions 0, 1 and 2
    :return: truthy only if every command and event of the sequence matched the expectations
    """
    rand, ediv, ltk = keys[0], keys[1], keys[2]

    # Central: start encryption and expect a Command Status event
    start_status = le_start_encryption(transport, central, conn_handle_c, rand, ediv, ltk, 100)
    success = verifyAndShowEvent(transport, central, Events.BT_HCI_EVT_CMD_STATUS, trace, 1000) and start_status == 0x00

    # Peripheral: the LTK request must carry the same rand/ediv the central used
    got_request, conn_handle_p, req_rand, req_ediv = hasLeLtkRequestMetaEvent(transport, peripheral, trace, 1000)
    success = got_request and req_rand == rand and req_ediv == ediv and success

    # Peripheral: reply with the LTK
    reply_status, reply_handle = le_long_term_key_request_reply(transport, peripheral, conn_handle_p, ltk, 1000)
    success = getCommandCompleteEvent(transport, peripheral, trace) and reply_status == 0x00 and \
        reply_handle == conn_handle_p and success

    # Both sides must report an Encryption Change event for their own handle; peripheral is checked first
    for device, expected_handle in ((peripheral, conn_handle_p), (central, conn_handle_c)):
        changed, handle, enabled, _ = hasEncryptionChangeEvent(transport, device, trace, 1000)
        success = changed and handle == expected_handle and enabled == 0x01 and success

    return success
770
771
def state_connected_isochronous_stream(transport, peripheral, central, trace, params,
                                       setup_iso_data_path=True, enc_keys=None, use_test_cmd=True,
                                       adjust_conn_interval=False):
    """
    Bring two devices into the "Connected Isochronous Stream" state.

    Enables Isochronous Channels host support on both devices, establishes the ACL connection, runs the
    remote feature exchange, optionally enables encryption and finally establishes the CIS(es).

    :param transport: transport used to reach the devices
    :param peripheral: index of the peripheral device
    :param central: index of the central device
    :param trace: trace object used for logging
    :param params: CIG parameters; ISO_Interval and CIS_Count are read here, the rest is used by
                   establish_cis_connection()
    :param setup_iso_data_path: forwarded to establish_cis_connection()
    :param enc_keys: if truthy, (rand, ediv, ltk) used to enable encryption before the CIS is created
    :param use_test_cmd: forwarded to establish_cis_connection()
    :param adjust_conn_interval: when True, the ACL connection interval is set to params.ISO_Interval
    :return: tuple (success, initiator, peripheral_cis_handles, central_cis_handles); when no initiator is
             available the initiator is None and both handle lists hold 0xFFFF for every CIS
    """
    # The Isochronous Channels (Host Support) FeatureSet bit is set.
    success = set_isochronous_channels_host_support(transport, peripheral, trace, 1)
    success = set_isochronous_channels_host_support(transport, central, trace, 1) and success

    # Adjust connection interval to avoid CIG and ACL events collision
    conn_interval = params.ISO_Interval if adjust_conn_interval else None

    ### ACL Connection Established. IUT (upperTester) is Peripheral. ###
    # ISO_Interval is multiplied by 1.25 to get the milliseconds calc_supervision_timeout() expects
    s, _, initiator = establish_acl_connection(transport, central, peripheral, trace, conn_interval,
                                               calc_supervision_timeout(params.ISO_Interval * 1.25))
    success = s and success
    if not initiator:
        # Return the same 4-tuple shape as the normal path so callers can always unpack four values
        invalid_handles = [0xFFFF] * params.CIS_Count
        return success, None, invalid_handles, list(invalid_handles)

    # Trigger feature exchange procedures
    success = readRemoteFeatures(transport, central, initiator.handles[0], trace) and success
    hasFeatures, _, _ = hasReadRemoteFeaturesCompleteEvent(transport, central, trace, 200)
    success = hasFeatures and success

    if enc_keys:
        success = enable_encryption(transport, central, peripheral, trace, initiator.handles[0], enc_keys) and success

    s, central_cis_handles, peripheral_cis_handles = \
        establish_cis_connection(transport, central, peripheral, trace, params, initiator.handles[0],
                                 setup_iso_data_path, use_test_cmd)

    return s and success, initiator, peripheral_cis_handles, central_cis_handles
805
# Helper function for set_complete_ext_adv_data()/set_complete_ext_scan_response_data()
def _set_complete_ad_sr_data(transport, idx, handle, fragmentPref, advData, set_data_function):
    """
    Send advData through set_data_function in fragments of at most 251 elements.

    A payload that fits into one fragment is sent as COMPLETE_FRAGMENT; otherwise the sequence is
    FIRST_FRAGMENT, zero or more INTERMEDIATE_FRAGMENT and finally LAST_FRAGMENT. With empty advData
    set_data_function is never called.

    :param transport: transport passed on to set_data_function
    :param idx: device index passed on to set_data_function
    :param handle: advertising handle passed on to set_data_function
    :param fragmentPref: fragment preference passed on to set_data_function
    :param advData: sliceable data to send
    :param set_data_function: callable(transport, idx, handle, operation, fragmentPref, data, timeout)
                              returning a status, where 0 means success
    :return: False as soon as one fragment is answered with a non-zero status, True otherwise
    """
    fragment_limit = 251
    pending = advData[:]
    is_first = True

    while len(pending) > 0:
        # Does everything that is left fit into this fragment?
        fits_in_one = len(pending) <= fragment_limit
        if is_first:
            operation = FragmentOperation.COMPLETE_FRAGMENT if fits_in_one else FragmentOperation.FIRST_FRAGMENT
        else:
            operation = FragmentOperation.LAST_FRAGMENT if fits_in_one else FragmentOperation.INTERMEDIATE_FRAGMENT

        chunk, pending = pending[:fragment_limit], pending[fragment_limit:]
        if set_data_function(transport, idx, handle, operation, fragmentPref, chunk, 100) != 0:
            return False
        is_first = False

    return True
829
def set_complete_ext_adv_data(transport, idx, handle, fragmentPref, advData):
    """
    Sets extended advertising data, handling fragmentation as needed. The number of fragments used is kept
    as low as possible.

    :return: True if succeeded, False otherwise
    """
    send_fragment = le_set_extended_advertising_data
    return _set_complete_ad_sr_data(transport, idx, handle, fragmentPref, advData, send_fragment)
836
def set_complete_ext_scan_response_data(transport, idx, handle, fragmentPref, advData):
    """
    Sets extended advertising scan response data, handling fragmentation as needed. The number of fragments
    used is kept as low as possible.

    :return: True if succeeded, False otherwise
    """
    send_fragment = le_set_extended_scan_response_data
    return _set_complete_ad_sr_data(transport, idx, handle, fragmentPref, advData, send_fragment)
843
def wait_for_ADV_IND_end(transport, packets, timeout):
    """
    Wait for the end of an ADV_IND packet - advances the simulation time in small steps until the latest
    ADV_IND packet ended less than T_IFS ago (so a response can still be sent).

    :param transport: object providing get_last_t() and wait_until_t()
    :param packets: packet store providing findLast()
    :param timeout: maximum time to wait, in milliseconds
    :return: the ADV_IND packet, or None if none was found in time
    """
    poll_step_us = 100  # Note: Has to be less than T_IFS
    remaining_us = timeout * 1000  # Convert to microseconds

    while remaining_us > 0:
        candidate = packets.findLast(packet_filter='ADV_IND')
        if candidate:
            # The simulation time must still be within 150 us of the packet end to allow a response
            now = transport.get_last_t()
            if now < candidate.ts + get_packet_air_time(candidate) + 150:
                return candidate
        # No usable packet yet - wait a little and try again
        transport.wait_until_t(transport.get_last_t() + poll_step_us)
        remaining_us -= poll_step_us

    return None
867
def wait_for_AUX_ADV_IND_end(transport, packets):
    """
    Wait for the end of an AUX_ADV_IND packet - advances the simulation time to just after the next
    AUX_ADV_IND packet (just after meaning within T_IFS, so a response can be sent).

    The function loops until it succeeds; there is no timeout.

    :param transport: object providing wait(), wait_until_t() and get_last_t()
    :param packets: packet store providing findLast()
    :return: the AUX_ADV_IND packet
    """
    # Get an ADV_EXT_IND with an aux ptr pointing to an AUX_ADV_IND packet that hasn't been sent yet
    auxAdvIndPacket = None
    auxAdvIndEndTs = 0
    while auxAdvIndPacket is None:
        while True:
            lastPacket = packets.findLast(packet_filter=('ADV_EXT_IND', 'AUX_ADV_IND'))
            if not auxAdvIndPacket:
                # An earlier AUX_ADV_IND is needed as a template for the expected air time
                auxAdvIndPacket = packets.findLast(packet_filter='AUX_ADV_IND')
            # lastPacket is None while no matching packet has been captured; keep polling in that case
            if lastPacket is not None and lastPacket.type == PacketType.ADV_EXT_IND and auxAdvIndPacket:
                auxPtr = lastPacket.payload['AuxPtr']
                # Calculate end of offset window (offset units are 30 us when offsetUnits is 0, else 300 us)
                offsetEnd = (auxPtr.auxOffset + 1) * (30 if auxPtr.offsetUnits == 0 else 300)
                # Expected air time of the AUX_ADV_IND packet (assuming no changes from the last one)
                airTime = get_packet_air_time(auxAdvIndPacket)
                # Calculate expected (last possible) end time of the coming AUX_ADV_IND packet
                auxAdvIndEndTs = int(lastPacket.ts + offsetEnd + airTime)
                break
            # Don't have the needed packets yet or the last packet was not an ADV_EXT_IND; Wait a little and try again
            transport.wait(1)
        # Wait until the calculated end time (but make sure to always wait at least 1 us to avoid deadlocks)
        transport.wait_until_t(max(auxAdvIndEndTs + 1, transport.get_last_t() + 1))

        # Verify that the simulation time is within T_IFS of the end of the AUX_ADV_IND packet
        auxAdvIndPacket = None
        simulationTime = transport.get_last_t()
        lastPacket = packets.findLast()
        if lastPacket is not None and lastPacket.type == PacketType.AUX_ADV_IND and \
                simulationTime < lastPacket.ts + get_packet_air_time(lastPacket) + 150:
            # Success, we can continue
            auxAdvIndPacket = lastPacket

    return auxAdvIndPacket
905
def get_packet_air_time(packet):
    """
    Get air time of given packet in microseconds

    :param packet: object with a phy attribute and a length (len(packet) is used as the payload size)
    :return: air time in microseconds, rounded up to a whole number
    """
    is_2m = packet.phy == '2M'
    # Any PHY other than '2M' is treated as 1 Mbit/s with a 1 octet preamble
    preamble_octets = 2 if is_2m else 1
    bits_per_us = 2 if is_2m else 1

    # Note: Packet air length is: pre-amble + AA + header + payload + CRC
    total_octets = preamble_octets + 4 + 2 + len(packet) + 3
    return math.ceil(total_octets * 8 / bits_per_us)
912
class SetCIGParameters:
    """
    Parameter set for the HCI_LE_Set_CIG_Parameters and HCI_LE_Set_CIG_Parameters_Test commands.

    LL.TS.p17
    4.10.1.3     Default Values for Set CIG Parameters Commands
    When using either the HCI_LE_Set_CIG_Parameters or HCI_LE_Set_CIG_Parameters_Test commands,
    the following table defines common default parameters for this section. The test case may specify
    different values.

    Every row of `data` becomes an instance attribute under its name and, if present, its alias. Values
    can be overridden through keyword arguments using either the name or the alias; if both are given the
    name wins. "Per CIS" fields are always stored as lists with CIS_Count entries.
    """
    # Known parameter fields
    data = [
        # Name,                             Alias,         Per CIS, Default Value
        ('SDU_Interval_C_To_P',             'sdu_int_m2s', False,   20000),  # 20 ms
        ('SDU_Interval_P_To_C',             'sdu_int_s2m', False,   20000),  # 20 ms
        ('ISO_Interval',                    'iso_int',     False,   int(20 // 1.25)), # 20 ms
        ('CIS_Count',                       'cis_cnt',     False,   1),  # NOTE: Needs to be located before the first "Per CIS" field
        ('Worst_Case_SCA',                  None,          False,   0),
        ('Packing',                         'packing',     False,   0),  # Sequential
        ('Framing',                         'framing',     False,   0),  # Unframed
        ('NSE',                             'nse',         True,    3),  # Note: Set to 3 or the Max Supported CIS NSE defined in IXIT, whichever is less.
        ('Max_SDU_C_To_P',                  'mx_sdu_m2s',  True,    None),  # NOTE: Calculated in __init__
        ('Max_SDU_P_To_C',                  'mx_sdu_s2m',  True,    None),  # NOTE: Calculated in __init__
        ('Max_PDU_C_To_P',                  'mx_pdu_m2s',  True,    251),
        ('Max_PDU_P_To_C',                  'mx_pdu_s2m',  True,    251),
        ('PHY_C_To_P',                      'phy_m2s',     True,    1),  # LE 1M PHY
        ('PHY_P_To_C',                      'phy_s2m',     True,    1),  # LE 1M PHY
        ('FT_C_To_P',                       'ft_m2s',      False,   1),
        ('FT_P_To_C',                       'ft_s2m',      False,   1),
        ('BN_C_To_P',                       'bn_m2s',      True,    1),
        ('BN_P_To_C',                       'bn_s2m',      True,    1),
        ('Max_Transport_Latency_C_To_P',    None,          False,   40000),  # 40 ms
        ('Max_Transport_Latency_P_To_C',    None,          False,   40000),  # 40 ms
        ('RTN_C_To_P',                      None,          True,    3),
        ('RTN_P_To_C',                      None,          True,    3),
        ('Max_SDU_Supported',               None,          False,   247),  # Maximum ISOAL SDU length
    ]

    def __init__(self, **kwargs):
        """
        Create the parameter set from the defaults in `data`, overridden by the given keyword arguments.

        :param kwargs: field values keyed by field name or alias; a per CIS field takes either a single
                       value (used for every CIS) or a list with CIS_Count entries
        :raises ValueError: on an unknown keyword or a per CIS list of the wrong length
        """
        # Collect the accepted keywords: every field name plus every alias that is defined
        known = set()
        for field, alias, _, _ in self.data:
            known.add(field)
            if alias:
                known.add(alias)

        # Check for unknown fields
        for key in kwargs:
            if key not in known:
                raise ValueError('Unknown field {}'.format(key))

        # Dynamically set the attributes of the class instance
        for field, alias, per_cis, default in self.data:
            # Get supplied value (by name or alias) or default
            supplied, value = self._lookup(kwargs, field, alias)
            if not supplied:
                value = default

            # Per CIS field? (relies on CIS_Count having been set by an earlier row)
            if per_cis:
                value = self.per_cis_value(value, field)

            self._assign(field, alias, value)

        # Calculate the Max_SDU_C_To_P and Max_SDU_P_To_C unless given
        aliases = {field: alias for field, alias, _, _ in self.data}
        iso_interval_us = self.ISO_Interval * 1.25 * 1000
        directions = (
            ('Max_SDU_C_To_P', self.BN_C_To_P, self.Max_PDU_C_To_P, self.SDU_Interval_C_To_P),
            ('Max_SDU_P_To_C', self.BN_P_To_C, self.Max_PDU_P_To_C, self.SDU_Interval_P_To_C),
        )
        for field, bn, max_pdu, sdu_interval in directions:
            alias = aliases[field]
            if self._lookup(kwargs, field, alias)[0]:
                # Explicitly supplied; already stored by the loop above
                continue
            value = [calcMaxIsoSdu(self.Framing, bn[n], max_pdu[n], iso_interval_us, sdu_interval,
                                   self.Max_SDU_Supported)
                     for n in range(self.CIS_Count)]
            # Store under name and alias so both views stay in sync
            self._assign(field, alias, value)

    @staticmethod
    def _lookup(kwargs, field, alias):
        """Return (supplied, value) for a field, preferring its name over its alias."""
        if field in kwargs:
            return True, kwargs[field]
        if alias and alias in kwargs:
            return True, kwargs[alias]
        return False, None

    def _assign(self, field, alias, value):
        """Store value under the field name and, if the field has one, under its alias."""
        setattr(self, field, value)
        if alias:
            setattr(self, alias, value)

    def per_cis_value(self, value, field=None):
        """
        Expand a value to a per CIS list.

        :param value: a list (must have CIS_Count entries) or a single value that is repeated CIS_Count times
        :param field: optional field name, only used in the error message
        :return: list with CIS_Count entries
        :raises ValueError: if a list of the wrong length is given
        """
        if isinstance(value, list):
            # List value given, so must match CIS_Count
            if len(value) != self.CIS_Count:
                raise ValueError('Field {} has wrong length {}, expected {}'.format(field, len(value), self.CIS_Count))
            return value
        # Single value given, so multiply by CIS_Count
        return [value] * self.CIS_Count

    def get_cig_parameters_test(self):
        """Return the field values as a tuple, in the order this module passes them to le_set_cig_parameters_test()."""
        return (self.SDU_Interval_C_To_P, self.SDU_Interval_P_To_C, self.FT_C_To_P, self.FT_P_To_C, self.ISO_Interval,
                self.Worst_Case_SCA, self.Packing, self.Framing, self.CIS_Count, list(range(self.CIS_Count)), self.NSE,
                self.Max_SDU_C_To_P, self.Max_SDU_P_To_C, self.Max_PDU_C_To_P, self.Max_PDU_P_To_C, self.PHY_C_To_P,
                self.PHY_P_To_C, self.BN_C_To_P, self.BN_P_To_C)

    def get_cig_parameters(self):
        """Return the field values as a tuple, in the order this module passes them to le_set_cig_parameters()."""
        return (self.SDU_Interval_C_To_P, self.SDU_Interval_P_To_C, self.Worst_Case_SCA, self.Packing, self.Framing,
                self.Max_Transport_Latency_C_To_P, self.Max_Transport_Latency_P_To_C, self.CIS_Count,
                list(range(self.CIS_Count)), self.Max_SDU_C_To_P, self.Max_SDU_P_To_C, self.PHY_C_To_P, self.PHY_P_To_C,
                self.RTN_C_To_P, self.RTN_P_To_C)