1# -*- coding: utf-8 -*-
2# Copyright 2019 Oticon A/S
3# SPDX-License-Identifier: Apache-2.0
4
5import math
6from components.utils import *;
7from components.basic_commands import *;
8from components.address import *;
9from components.events import *;
10from components.resolvable import *;
11from components.advertiser import *;
12from components.scanner import *;
13from components.initiator import *;
14from components.addata import *;
15from components.preambles import *;
16from components.dump import PacketType;
17from enum import IntEnum
18
# Random device addresses shared by the helpers in this file: randomIdentityAddress()
# reads them; setStaticRandomAddress() and setNonResolvableRandomAddress() rewrite them.
# NOTE(review): a `global` statement at module scope has no effect, and neither name is
# assigned in this part of the file - presumably they are initialised elsewhere; confirm.
global lowerRandomAddress, upperRandomAddress;
20
def verifyAndShowEvent(transport, idx, expectedEvent, trace, to=100):
    """
        Fetch the next event from device 'idx' (timeout 'to'), trace it and
        report whether its event code equals 'expectedEvent'.
    """
    received = get_event(transport, idx, to)
    trace.trace(7, str(received))
    isExpected = received.event == expectedEvent
    return isExpected
26
def verifyNumCompleteEvents(transport, idx, handle, count, trace, to=100):
    """
        Consume Number Of Completed Packets events from device 'idx' until
        'count' packets have been reported as completed for 'handle'.

        Every fetched event is traced. Verification stops with False as soon as
        an event is of another type, does not report exactly one handle, or
        reports a handle different from 'handle'.

        Returns True when 'count' (or more) packets were acknowledged; also True
        when 'count' is not positive, in which case no event is fetched.
    """
    success = True
    while success and count > 0:
        event = get_event(transport, idx, to)
        trace.trace(7, str(event))

        # Check the event type BEFORE decoding: the fields of any other event
        # are not a (numHandles, handles, packets) triple.
        if event.event != Events.BT_HCI_EVT_NUM_COMPLETED_PACKETS:
            success = False
            break

        numHandles, handles, packets = event.decode()
        success = numHandles == 1 and handles[0] == handle
        if success:
            # Only index packets[] once the single-handle layout is confirmed
            count -= packets[0]

    return success
39
def verifyAndShowMetaEvent(transport, idx, expectedEvent, trace):
    """
        Fetch the next event from device 'idx' (fixed timeout of 100), trace it
        and report whether its sub-event code equals 'expectedEvent'.
    """
    received = get_event(transport, idx, 100)
    trace.trace(7, str(received))
    isExpected = received.subEvent == expectedEvent
    return isExpected
45
def verifyAndFetchEvent(transport, idx, expectedEvent, trace):
    """
        Fetch the next event from device 'idx' (fixed timeout of 100) and trace it.

        Returns a pair: whether the event code equals 'expectedEvent', and the
        fetched event itself.
    """
    received = get_event(transport, idx, 100)
    trace.trace(7, str(received))
    isExpected = received.event == expectedEvent
    return isExpected, received
51
def verifyAndFetchMetaEvent(transport, idx, expectedEvent, trace, to=100):
    """
        Fetch the next event from device 'idx' (timeout 'to') and trace it.

        Returns a pair: whether the sub-event code equals 'expectedEvent', and
        the fetched event itself.
    """
    received = get_event(transport, idx, to)
    trace.trace(7, str(received))
    isExpected = received.subEvent == expectedEvent
    return isExpected, received
57
def getCommandCompleteEvent(transport, idx, trace):
    """
        Fetch and trace the next event from device 'idx' and report whether it
        is a Command Complete event.
    """
    expected = Events.BT_HCI_EVT_CMD_COMPLETE
    return verifyAndShowEvent(transport, idx, expected, trace)
61
def readLocalResolvableAddress(transport, idx, identityAddress, trace):
    """
        Issue LE Read Local Resolvable Address for 'identityAddress' on device 'idx'.

        Returns (success, resolvableAddress); success requires both a Command
        Complete event and a zero status.
    """
    status, localAddress = le_read_local_resolvable_address(
        transport, idx, identityAddress.type, identityAddress.address, 100)
    trace.trace(6, "LE Read Local Resolvable Address returns status: 0x%02X" % status)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0), localAddress
67
def readPeerResolvableAddress(transport, idx, identityAddress, trace):
    """
        Issue LE Read Peer Resolvable Address for 'identityAddress' on device 'idx'.

        Returns (success, resolvableAddress); success requires both a Command
        Complete event and a zero status.
    """
    status, peerAddress = le_read_peer_resolvable_address(
        transport, idx, identityAddress.type, identityAddress.address, 100)
    trace.trace(6, "LE Read Peer Resolvable Address returns status: 0x%02X" % status)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0), peerAddress
73
74"""
    Issue a Channel Map Update
76"""
def channelMapUpdate(transport, idx, channelMap, trace):
    """
        Issue LE Set Host Channel Classification on device 'idx'; 'channelMap'
        is converted with toArray(channelMap, 5) before it is sent.

        Returns True when a Command Complete event arrives and the status is zero.
    """
    classification = toArray(channelMap, 5)
    status = le_set_host_channel_classification(transport, idx, classification, 100)
    trace.trace(6, "LE Set Host Channel Classification returns status: 0x%02X" % status)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0)
82
def setLEEventMask(transport, idx, events, trace):
    """
        Issue LE Set Event Mask with 'events' on device 'idx'.

        Returns True when a Command Complete event arrives and the status is zero.
    """
    status = le_set_event_mask(transport, idx, events, 100)
    trace.trace(6, "LE Set Event Mask returns status: 0x%02X" % status)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0)
88
def setPrivacyMode(transport, idx, address, mode, trace):
    """
        Issue LE Set Privacy Mode for the peer 'address' (its type and address
        fields are passed on) with the given 'mode' on device 'idx'.

        Returns True when a Command Complete event arrives and the status is zero.
    """
    status = le_set_privacy_mode(transport, idx, address.type, address.address, mode, 100)
    trace.trace(6, "LE Set Privacy Mode returns status: 0x%02X" % status)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0)
94
def setDataLength(transport, idx, handle, octets, time, trace):
    """
        Issue LE Set Data Length ('octets', 'time') for connection 'handle' on
        device 'idx'. The handle echoed by the command is only traced.

        Returns True when a Command Complete event arrives and the status is zero.
    """
    status, echoedHandle = le_set_data_length(transport, idx, handle, octets, time, 100)
    trace.trace(6, "LE Set Data Length returns status: 0x%02X handle: 0x%04X" % (status, echoedHandle))

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0)
100
def readBufferSize(transport, idx, trace):
    """
        Issue LE Read Buffer Size on device 'idx'.

        Returns (success, maxPacketLength, maxPacketNumbers); success requires
        both a Command Complete event and a zero status.
    """
    status, packetLength, packetCount = le_read_buffer_size(transport, idx, 100)
    details = (status, packetLength, packetCount)
    trace.trace(6, "LE Read Buffer Size returns status: 0x%02X - Data Packet length %d, Number of Data Packets %d" % details)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0), packetLength, packetCount
106
def readBufferSizeV2(transport, idx, trace):
    """
        Issue LE Read Buffer Size V2 on device 'idx'.

        Returns (success, maxPacketLength, maxPacketNumbers, maxIsoPacketLength,
        maxIsoPacketNumbers); success requires both a Command Complete event and
        a zero status.
    """
    status, aclLength, aclCount, isoLength, isoCount = le_read_buffer_size_v2(transport, idx, 100)
    details = (status, aclLength, aclCount, isoLength, isoCount)
    trace.trace(6, "LE Read Buffer Size V2 returns status: 0x%02X - Data Packet length %d, Number of Data Packets %d, ISO Data Packet length %d, Number of ISO Data Packets %d" % details)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0), aclLength, aclCount, isoLength, isoCount
112
def readLocalFeatures(transport, idx, trace):
    """
        Issue LE Read Local Supported Features on device 'idx'.

        Returns (success, features); success requires both a Command Complete
        event and a zero status.
    """
    status, localFeatures = le_read_local_supported_features(transport, idx, 100)
    trace.trace(6, "LE Read Local Supported Features returns status: 0x%02X" % status)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0), localFeatures
118
def readRemoteFeatures(transport, idx, handle, trace):
    """
        Issue LE Read Remote Features for connection 'handle' on device 'idx'.

        Returns True when the next event is a Command Status event and the
        status returned by the command is zero.
    """
    status = le_read_remote_features(transport, idx, handle, 100)
    trace.trace(6, "LE Read Remote Features returns status: 0x%02X" % status)

    acknowledged = verifyAndShowEvent(transport, idx, Events.BT_HCI_EVT_CMD_STATUS, trace)
    return acknowledged and (status == 0)
124
def readRemoteVersionInformation(transport, idx, handle, trace):
    """
        Issue Read Remote Version Information for connection 'handle' on device 'idx'.

        Returns True when the next event is a Command Status event and the
        status returned by the command is zero.
    """
    status = read_remote_version_information(transport, idx, handle, 100)
    trace.trace(6, "Read Remote Version Information returns status: 0x%02X" % status)

    acknowledged = verifyAndShowEvent(transport, idx, Events.BT_HCI_EVT_CMD_STATUS, trace)
    return acknowledged and (status == 0)
130
def addAddressesToFilterAcceptList(transport, idx, addresses, trace):
    """
        Convert each entry of 'addresses' into a [type, numeric address] pair
        and hand the list to preamble_specific_filter_accept_listed(), whose
        result is returned.
    """
    entries = []
    for entry in addresses:
        entries.append([entry.type, toNumber(entry.address)])
    return preamble_specific_filter_accept_listed(transport, idx, entries, trace)
135
136"""
    Send a DATA packet...
138"""
def writeData(transport, idx, handle, pbFlags, txData, trace):
    """
        Write 'txData' on connection 'handle' of device 'idx' using 'pbFlags'
        (the broadcast flag argument is fixed to 0).

        After the write an event must be pending within 200 and it must be a
        Number Of Completed Packets event. The result is truthy only when the
        write status is zero and both conditions hold.
    """
    status = le_data_write(transport, idx, handle, pbFlags, 0, txData, 100)
    trace.trace(6, "LE Data Write returns status: 0x%02X" % status)
    written = status == 0

    eventPending = has_event(transport, idx, 200)[0]
    if not eventPending:
        # Nothing to consume; the result is falsy either way
        return written and eventPending

    completed = verifyAndShowEvent(transport, idx, Events.BT_HCI_EVT_NUM_COMPLETED_PACKETS, trace)
    return written and eventPending and completed
152
def encrypt(transport, idx, key, plaintext, trace):
    """
        Issue LE Encrypt with 'key' and 'plaintext' on device 'idx' (timeout 2000).

        Returns (success, encrypted); success requires both a Command Complete
        event and a zero status.
    """
    status, ciphertext = le_encrypt(transport, idx, key, plaintext, 2000)
    trace.trace(6, "LE Encrypt Command returns status: 0x%02X" % status)

    completed = getCommandCompleteEvent(transport, idx, trace)
    return completed and (status == 0), ciphertext
159
160"""
    Read a single DATA packet...
162"""
def readData(transport, idx, trace, timeout=200):
    """
        Read one data packet from device 'idx' if data becomes ready within 'timeout'.

        Returns (success, rxData): rxData is the payload as a list (empty when
        nothing was ready) and success is True only for a non-empty payload.
    """
    if not le_data_ready(transport, idx, timeout):
        return False, []

    pbFlags, bcFlags, payload = le_data_read(transport, idx, 100)[2:]
    trace.trace(6, "LE Data Read returns PB=%d BC=%d - %2d data bytes: %s" %
                   (pbFlags, bcFlags, len(payload), formatArray(payload)))

    rxData = list(payload)
    return (len(rxData) > 0), rxData
174
175"""
    Read and concatenate multiple DATA packets...
177"""
def readDataFragments(transport, idx, trace, timeout=100):
    """
        Keep reading data packets from device 'idx' for as long as data is
        ready, concatenating their payloads.

        'timeout' applies to the wait for the first packet; every later wait
        uses 99. Returns (success, rxData), success being True when at least
        one payload byte was collected.
    """
    collected = []

    while le_data_ready(transport, idx, timeout):
        pbFlags, bcFlags, payload = le_data_read(transport, idx, 100)[2:]
        trace.trace(6, "LE Data Read returns PB=%d BC=%d - %2d data bytes: %s" %
                       (pbFlags, bcFlags, len(payload), formatArray(payload)))
        collected.extend(payload)
        timeout = 99

    return bool(collected), collected
192
def hasConnectionUpdateCompleteEvent(transport, idx, trace):
    """
        Check device 'idx' for a pending LE Connection Update Complete event.

        Returns (success, status). status is -1 when no event was pending
        within 100 or the fetched event was a different one; otherwise it is
        the decoded status and success tells whether it is zero.
    """
    pending = has_event(transport, idx, 100)[0]
    if not pending:
        return pending, -1

    matched, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_CONN_UPDATE_COMPLETE, trace)
    if not matched:
        return matched, -1

    status, _handle, _interval, _latency, _timeout = event.decode()
    return status == 0, status
202
def hasChannelSelectionAlgorithmEvent(transport, idx, trace):
    """
        Check device 'idx' for a pending LE Channel Selection Algorithm event.

        Returns (success, handle, chSelAlgorithm); handle and chSelAlgorithm
        are -1 when no event was pending within 100 or the fetched event was a
        different one.
    """
    pending = has_event(transport, idx, 100)[0]
    if not pending:
        return pending, -1, -1

    matched, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_CHAN_SEL_ALGO, trace)
    if not matched:
        return matched, -1, -1

    handle, chSelAlgorithm = event.decode()
    return matched, handle, chSelAlgorithm
211
def hasDataLengthChangedEvent(transport, idx, trace):
    """
        Check device 'idx' for a pending LE Data Length Change event.

        Returns (success, handle, maxTxOctets, maxTxTime, maxRxOctets,
        maxRxTime); all values after success are -1 when no event was pending
        within 200 or the fetched event was a different one.
    """
    unknown = (-1, -1, -1, -1, -1)

    pending = has_event(transport, idx, 200)[0]
    if not pending:
        return (pending,) + unknown

    matched, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_DATA_LEN_CHANGE, trace)
    if not matched:
        return (matched,) + unknown

    handle, maxTxOctets, maxTxTime, maxRxOctets, maxRxTime = event.decode()
    return matched, handle, maxTxOctets, maxTxTime, maxRxOctets, maxRxTime
220
def hasReadRemoteFeaturesCompleteEvent(transport, idx, trace):
    """
        Check device 'idx' for a pending LE Read Remote Features Complete event.

        Returns (success, handle, features), features always being passed
        through toArray(features, 8). When no event was pending within 100 or
        the fetched event was a different one, handle is -1 and the conversion
        is applied to an empty list. success additionally requires a zero
        decoded status.
    """
    handle, features = -1, []

    success = has_event(transport, idx, 100)[0]
    if success:
        success, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_REMOTE_FEAT_COMPLETE, trace)
    if success:
        status, handle, features = event.decode()
        success = status == 0

    return success, handle, toArray(features, 8)
230
def hasReadRemoteVersionInformationCompleteEvent(transport, idx, trace):
    """
        Check device 'idx' for a pending Read Remote Version Information
        Complete event.

        Returns (success, handle, version, manufacturer, subVersion); the
        values after success are -1 when no event was pending within 100 or the
        fetched event was a different one. success additionally requires a zero
        decoded status.
    """
    pending = has_event(transport, idx, 100)[0]
    if not pending:
        return pending, -1, -1, -1, -1

    matched, event = verifyAndFetchEvent(transport, idx, Events.BT_HCI_EVT_REMOTE_VERSION_INFO, trace)
    if not matched:
        return matched, -1, -1, -1, -1

    status, handle, version, manufacturer, subVersion = event.decode()
    return status == 0, handle, version, manufacturer, subVersion
240
def hasLeCisRequestMetaEvent(transport, idx, trace, to):
    """
        Check device 'idx' for an LE CIS Request event, waiting up to 'to' for
        an event to become pending.

        Returns (success, aclConnectionHandle, cisConnectionHandle, cigId,
        cisId); the values after success are -1 when no event was pending or
        the fetched event was a different one.
    """
    pending = has_event(transport, idx, to)[0]
    if not pending:
        return pending, -1, -1, -1, -1

    matched, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_CIS_REQUEST, trace)
    if not matched:
        return matched, -1, -1, -1, -1

    aclConnectionHandle, cisConnectionHandle, cigId, cisId = event.decode()
    return matched, aclConnectionHandle, cisConnectionHandle, cigId, cisId
249
250
def hasLeLtkRequestMetaEvent(transport, idx, trace, to):
    """
        Check device 'idx' for an LE Long Term Key Request event, waiting up to
        'to' for an event to become pending.

        Returns (success, handle, rand, ediv); the values after success are -1
        when no event was pending or the fetched event was a different one.
    """
    pending = has_event(transport, idx, to)[0]
    if not pending:
        return pending, -1, -1, -1

    matched, event = verifyAndFetchMetaEvent(transport, idx, MetaEvents.BT_HCI_EVT_LE_LTK_REQUEST, trace)
    if not matched:
        return matched, -1, -1, -1

    handle, rand, ediv = event.decode()
    return matched, handle, rand, ediv
259
260
def hasEncryptionChangeEvent(transport, idx, trace, to):
    """
        Check device 'idx' for an Encryption Change event (V1 or V2), using
        'to' both for the pending check and for fetching the event.

        Returns (success, handle, enabled, key_size). key_size is only filled
        in by the V2 event and stays -1 otherwise. success is falsy when no
        event was pending, the fetched event is neither V1 nor V2, or the
        decoded status is non-zero.
    """
    status = handle = enabled = key_size = -1

    pending = has_event(transport, idx, to)[0]
    if pending:
        event = get_event(transport, idx, to)
        trace.trace(7, str(event))
        if event.event == Events.BT_HCI_EVT_ENCRYPT_CHANGE_V1:
            status, handle, enabled = event.decode()
        elif event.event == Events.BT_HCI_EVT_ENCRYPT_CHANGE_V2:
            status, handle, enabled, key_size = event.decode()

    return (status == 0 and pending), handle, enabled, key_size
274
275
def calcMaxPacketTime(packetLength):
    """
        Return the air time of a packet carrying 'packetLength' payload bytes,
        at 8 us per byte, counting preamble (1), access address (4),
        header (2), MIC (4) and CRC (3) on top of the payload.
    """
    PREAMBLE, ACCESS_ADDRESS, HEADER, MIC, CRC = 1, 4, 2, 4, 3
    US_PER_BYTE = 8

    totalBytes = PREAMBLE + ACCESS_ADDRESS + HEADER + packetLength + MIC + CRC
    return totalBytes * US_PER_BYTE
279
def calcMaxIsoSdu(Framing, BN, Max_PDU, ISO_Interval, SDU_Interval, Max_SDU_Supported):
    """
        Return the largest ISO SDU size for the given framing mode.

        Framing 1 (framed) yields Max_SDU_Supported unchanged; Framing 0
        (unframed) delegates to calcUnframedMaxIsoSdu(). Any other value
        raises ValueError.
    """
    if Framing not in (0, 1):
        raise ValueError("Framing must be 0 or 1")

    if Framing == 1:
        return Max_SDU_Supported

    return calcUnframedMaxIsoSdu(BN, Max_PDU, ISO_Interval, SDU_Interval, Max_SDU_Supported)
287
def calcUnframedMaxIsoSdu(BN, Max_PDU, ISO_Interval, SDU_Interval, Max_SDU_Supported):
    """
        Return the largest SDU that fits into unframed PDUs, clamped to
        Max_SDU_Supported.

        The unclamped value is Max_PDU * BN * SDU_Interval / ISO_Interval,
        truncated to an integer. ISO_Interval and SDU_Interval must be given
        in the same unit; ISO_Interval == 0 raises ZeroDivisionError.
    """
    # BLUETOOTH CORE SPECIFICATION Version 5.2 | Vol 6, Part G, 2.1 UNFRAMED PDU:
    #
    # BN = ceil(Max_SDU / Max_PDU) * (ISO_Interval / SDU_Interval).
    # (BN / (ISO_Interval / SDU_Interval)) = ceil(Max_SDU / Max_PDU)
    # (BN / (ISO_Interval / SDU_Interval)) - 1 < Max_SDU / Max_PDU <= (BN / (ISO_Interval / SDU_Interval))
    # ((BN / (ISO_Interval / SDU_Interval)) - 1) * Max_PDU < Max_SDU <= (BN / (ISO_Interval / SDU_Interval)) * Max_PDU
    #
    # Max_SDU = Max_PDU * BN * (SDU_Interval / ISO_Interval)

    # Multiply first and divide last: the product of integer inputs is exact, so
    # a single division cannot land just below a whole number. Pre-computing the
    # ratio can (e.g. 49 * (1 / 49) == 0.9999999999999999, which truncates to 0).
    Max_SDU = int(Max_PDU * BN * SDU_Interval / ISO_Interval)

    # Clamp
    return min(Max_SDU, Max_SDU_Supported)
302
def matchingReportType(advertiseType):
    """
        Map an advertising type onto the advertising report type a scanner
        should see for it. Types not listed here map to ADV_IND.
    """
    if advertiseType == Advertising.CONNECTABLE_UNDIRECTED:
        return AdvertisingReport.ADV_IND

    if advertiseType in (Advertising.CONNECTABLE_HDC_DIRECTED, Advertising.CONNECTABLE_LDC_DIRECTED):
        return AdvertisingReport.ADV_DIRECT_IND

    if advertiseType == Advertising.SCANNABLE_UNDIRECTED:
        return AdvertisingReport.ADV_SCAN_IND

    if advertiseType == Advertising.NON_CONNECTABLE_UNDIRECTED:
        return AdvertisingReport.ADV_NONCONN_IND

    return AdvertisingReport.ADV_IND
316
def publicIdentityAddress(idx):
    """
        Return the fixed public identity address used for device 'idx':
        0x123456789ABC for device 0, 0x456789ABCDEF for any other device.
    """
    if idx == 0:
        identity = 0x123456789ABC
    else:
        identity = 0x456789ABCDEF
    return Address(SimpleAddressType.PUBLIC, identity)
320
def randomIdentityAddress(idx):
    """
        Return the random identity address of device 'idx', taken from the
        module-level upperRandomAddress (device 0) or lowerRandomAddress
        (any other device).
    """
    if idx == 0:
        identity = upperRandomAddress
    else:
        identity = lowerRandomAddress
    return Address(SimpleAddressType.RANDOM, identity)
325
def resolvablePublicAddress(idx):
    """
        Return an address of type RESOLVABLE_OR_PUBLIC carrying the fixed
        public address of device 'idx': 0x123456789ABC for device 0,
        0x456789ABCDEF for any other device.
    """
    if idx == 0:
        identity = 0x123456789ABC
    else:
        identity = 0x456789ABCDEF
    return Address(ExtendedAddressType.RESOLVABLE_OR_PUBLIC, identity)
329
def setStaticRandomAddress(transport, idx, trace):
    """
        Make sure the random address kept for device 'idx' has both of its two
        most significant bits set.

        When it does not, the bits are set, the new address is programmed via
        preamble_set_random_address() and stored back in the module-level
        variable. An address that already has both bits set is left untouched
        and nothing is sent to the device.
    """
    global lowerRandomAddress, upperRandomAddress

    TOP_TWO_BITS = 0xC00000000000

    current = toNumber(upperRandomAddress if idx == 0 else lowerRandomAddress)
    if (current & TOP_TWO_BITS) == TOP_TWO_BITS:
        return

    address = toArray(current | TOP_TWO_BITS, 6)
    preamble_set_random_address(transport, idx, toNumber(address), trace)
    if idx == 0:
        upperRandomAddress = address
    else:
        lowerRandomAddress = address
341
def setNonResolvableRandomAddress(transport, idx, trace):
    """
        Make sure the random address kept for device 'idx' has both of its two
        most significant bits cleared.

        When it does not, the bits are cleared, the new address is programmed
        via preamble_set_random_address() and stored back in the module-level
        variable. An address that already has both bits cleared is left
        untouched and nothing is sent to the device.
    """
    global lowerRandomAddress, upperRandomAddress

    TOP_TWO_BITS = 0xC00000000000
    LOWER_BITS = 0x3FFFFFFFFFFF

    current = toNumber(upperRandomAddress if idx == 0 else lowerRandomAddress)
    if (current & TOP_TWO_BITS) == 0x000000000000:
        return

    address = toArray(current & LOWER_BITS, 6)
    preamble_set_random_address(transport, idx, toNumber(address), trace)
    if idx == 0:
        upperRandomAddress = address
    else:
        lowerRandomAddress = address
353
def setPassiveScanning(transport, scannerId, trace, advertiseType, advertiseReports=100,
                       advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE, advertiseChannels=AdvertiseChannel.ALL_CHANNELS,
                       scanFilter=ScanningFilterPolicy.FILTER_NONE):
    """
        Create an Advertiser / passive Scanner pair that uses public addresses.

        The advertiser is placed on the other device (scannerId ^ 1) and gets
        the scanner's public identity address as its peer address. The scanner
        expects the report type matching 'advertiseType'.

        Returns (advertiser, scanner).
    """
    advertiserId = scannerId ^ 1

    advertiser = Advertiser(transport, advertiserId, trace, advertiseChannels, advertiseType,
                            Address(ExtendedAddressType.PUBLIC), publicIdentityAddress(scannerId), advertiseFilter)

    scanner = Scanner(transport, scannerId, trace, ScanType.PASSIVE, matchingReportType(advertiseType),
                      Address(ExtendedAddressType.PUBLIC), scanFilter, advertiseReports)

    return advertiser, scanner
369
def setActiveScanning(transport, scannerId, trace, advertiseType, advertiseReports=1, advertiseResponses=1,
                      advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE, advertiseChannels=AdvertiseChannel.ALL_CHANNELS,
                      scanFilter=ScanningFilterPolicy.FILTER_NONE):
    """
        Create an Advertiser / active Scanner pair that uses public addresses.

        The advertiser is placed on the other device (scannerId ^ 1) and gets
        the scanner's public identity address as its peer address. The scanner
        expects the report type matching 'advertiseType'.

        Returns (advertiser, scanner).
    """
    advertiserId = scannerId ^ 1

    advertiser = Advertiser(transport, advertiserId, trace, advertiseChannels, advertiseType,
                            Address(ExtendedAddressType.PUBLIC), publicIdentityAddress(scannerId), advertiseFilter)

    scanner = Scanner(transport, scannerId, trace, ScanType.ACTIVE, matchingReportType(advertiseType),
                      Address(ExtendedAddressType.PUBLIC), scanFilter, advertiseReports, advertiseResponses)

    return advertiser, scanner
385
def setPrivatePassiveScanning(transport, scannerId, trace, advertiseType, advertiseReports=100,
                              advertiserAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC, scannerAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC,
                              advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE, advertiseChannels=AdvertiseChannel.ALL_CHANNELS,
                              scanFilter=ScanningFilterPolicy.FILTER_NONE):
    """
        Create an Advertiser / passive Scanner pair with selectable own-address types.

        The advertiser is placed on the other device (scannerId ^ 1). Its peer
        address is the scanner's random identity address when
        'advertiserAddressType' is RANDOM or RESOLVABLE_OR_RANDOM, and the
        scanner's public identity address otherwise.

        Returns (advertiser, scanner).
    """
    advertiserId = scannerId ^ 1
    advertiserAddress = Address(advertiserAddressType)

    randomTypes = (ExtendedAddressType.RANDOM, ExtendedAddressType.RESOLVABLE_OR_RANDOM)
    identityOf = randomIdentityAddress if advertiserAddressType in randomTypes else publicIdentityAddress
    peerAddress = identityOf(scannerId)

    advertiser = Advertiser(transport, advertiserId, trace, advertiseChannels, advertiseType,
                            advertiserAddress, peerAddress, advertiseFilter)

    scanner = Scanner(transport, scannerId, trace, ScanType.PASSIVE, matchingReportType(advertiseType),
                      Address(scannerAddressType), scanFilter, advertiseReports)

    return advertiser, scanner
405
def setPrivateActiveScanning(transport, scannerId, trace, advertiseType, advertiseReports=1, advertiseResponses=1,
                             advertiserAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC, scannerAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC,
                             advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE, advertiseChannels=AdvertiseChannel.ALL_CHANNELS,
                             scanFilter=ScanningFilterPolicy.FILTER_NONE):
    """
        Create an Advertiser / active Scanner pair with selectable own-address types.

        The advertiser is placed on the other device (scannerId ^ 1). Its peer
        address is the scanner's random identity address when
        'advertiserAddressType' is RANDOM or RESOLVABLE_OR_RANDOM, and the
        scanner's public identity address otherwise.

        Returns (advertiser, scanner).
    """
    advertiserId = scannerId ^ 1
    advertiserAddress = Address(advertiserAddressType)

    randomTypes = (ExtendedAddressType.RANDOM, ExtendedAddressType.RESOLVABLE_OR_RANDOM)
    identityOf = randomIdentityAddress if advertiserAddressType in randomTypes else publicIdentityAddress
    peerAddress = identityOf(scannerId)

    advertiser = Advertiser(transport, advertiserId, trace, advertiseChannels, advertiseType,
                            advertiserAddress, peerAddress, advertiseFilter)

    scanner = Scanner(transport, scannerId, trace, ScanType.ACTIVE, matchingReportType(advertiseType),
                      Address(scannerAddressType), scanFilter, advertiseReports, advertiseResponses)

    return advertiser, scanner
425
def setPublicInitiator(transport, initiatorId, trace, advertiseType, advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE,
                       advertiseChannels=AdvertiseChannel.ALL_CHANNELS, initiatorFilterPolicy=InitiatorFilterPolicy.FILTER_NONE):
    """
        Create an Advertiser / Initiator pair that uses public addresses.

        The advertiser is placed on the other device (initiatorId ^ 1) with the
        initiator's public identity address as peer; the initiator targets the
        advertiser's public identity address.

        Returns (advertiser, initiator).
    """
    advertiserId = initiatorId ^ 1

    advertiser = Advertiser(transport, advertiserId, trace, advertiseChannels, advertiseType,
                            Address(ExtendedAddressType.PUBLIC), publicIdentityAddress(initiatorId), advertiseFilter)

    initiator = Initiator(transport, initiatorId, advertiserId, trace,
                          Address(ExtendedAddressType.PUBLIC), publicIdentityAddress(advertiserId), initiatorFilterPolicy)

    return advertiser, initiator
441
def setPrivateInitiator(transport, initiatorId, trace, advertiseType, advertiserAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC,
                        initiatorAddressType=ExtendedAddressType.RESOLVABLE_OR_PUBLIC, advertiseFilter=AdvertisingFilterPolicy.FILTER_NONE,
                        advertiseChannels=AdvertiseChannel.ALL_CHANNELS, initiatorFilterPolicy=InitiatorFilterPolicy.FILTER_NONE):
    """
        Create an Advertiser / Initiator pair with selectable own-address types.

        The advertiser is placed on the other device (initiatorId ^ 1). For
        each side the peer identity is the random identity address when that
        side's own address type is RANDOM or RESOLVABLE_OR_RANDOM, and the
        public identity address otherwise.

        Returns (advertiser, initiator).
    """
    randomTypes = (ExtendedAddressType.RANDOM, ExtendedAddressType.RESOLVABLE_OR_RANDOM)

    advertiserId = initiatorId ^ 1
    advertiserAddress = Address(advertiserAddressType)
    peerIdentityOf = randomIdentityAddress if advertiserAddressType in randomTypes else publicIdentityAddress
    peerAddress = peerIdentityOf(initiatorId)

    advertiser = Advertiser(transport, advertiserId, trace, advertiseChannels, advertiseType,
                            advertiserAddress, peerAddress, advertiseFilter)

    initiatorAddress = Address(initiatorAddressType)
    responderIdentityOf = randomIdentityAddress if initiatorAddressType in randomTypes else publicIdentityAddress
    responderAddress = responderIdentityOf(advertiserId)

    initiator = Initiator(transport, initiatorId, advertiserId, trace,
                          initiatorAddress, responderAddress, initiatorFilterPolicy)

    return advertiser, initiator
464
465
def le_iso_data_write_fragments(transport, idx, trace, conn_handle, data, iso_buffer_len):
    """
        Write 'data' to device 'idx' on 'conn_handle' as a sequence of ISO data
        fragments of at most 'iso_buffer_len' bytes each.

        The PB flag of each fragment is built from two bits: bit 0 is set on
        every fragment after the first, bit 1 is set on the fragment that holds
        the end of the data (so a single fragment carrying everything gets 2).
        The TS flag is always 0. Writing stops at the first non-zero status.

        Returns (success, fragments), fragments being the number of write
        commands issued. Empty data yields (True, 0). A non-positive
        'iso_buffer_len' with non-empty data yields (False, 0) without writing
        anything, since no fragment size could ever make progress.
    """
    TsFlag = 0
    total = len(data)

    # Guard against an endless loop: with a buffer size <= 0 the offset would
    # never advance past the end of the data.
    if total > 0 and iso_buffer_len <= 0:
        return False, 0

    success = True
    offset = 0
    fragments = 0

    while success and offset < total:
        remaining = total - offset
        fragment_len = min(iso_buffer_len, remaining)

        PbFlag = 0
        if fragments > 0:
            PbFlag |= 1     # not the first fragment
        if remaining <= iso_buffer_len:
            PbFlag |= 2     # this fragment holds the end of the data

        status = le_iso_data_write(transport, idx, conn_handle, PbFlag, TsFlag, data[offset:offset + fragment_len], 0)
        success = (status == 0) and success

        offset += fragment_len
        fragments += 1

    return success, fragments
490
491
def le_iso_data_write_complete(transport, idx, trace, number_of_packets_written, to):
    """
        Collect one ISO data write response from device 'idx' for each of the
        'number_of_packets_written' packets (timeout 'to' per response).

        All responses are collected even after a failure. Returns True when
        every response is 0, which includes the case of zero packets.
    """
    # A list (not a generator) so that every response is fetched regardless of earlier results
    responses = [le_iso_data_write_rsp(transport, idx, to) for _ in range(number_of_packets_written)]
    return all(response == 0 for response in responses)
498
499
def le_iso_data_write_nbytes(transport, idx, trace, conn_handle, nbytes, pkt_seq_num, iso_buffer_len):
    """
        Send an ISO SDU made of 'nbytes' bytes that all hold 'pkt_seq_num'.

        The SDU is prefixed with the little-endian 16-bit packet sequence
        number and length, then written in fragments of at most
        'iso_buffer_len' bytes.

        Returns (success, iso_data_sdu), the SDU being a tuple.
    """
    iso_data_sdu = (pkt_seq_num,) * nbytes
    header_and_sdu = struct.pack(f'<HH{nbytes}B', pkt_seq_num, nbytes, *iso_data_sdu)

    result = le_iso_data_write_fragments(transport, idx, trace, conn_handle, header_and_sdu, iso_buffer_len)
    success, _fragments = result

    return success, iso_data_sdu
506
507
def fetch_number_of_completed_packets(transport, idx, trace, number_of_packets_written, to):
    """
        Accumulate Number Of Completed Packets events from device 'idx' until
        the completed packets of all handles add up to at least
        'number_of_packets_written', or until an event of another type is
        fetched (that event is consumed).

        Returns (number of handles, handles view, packet counts view); the two
        views are the keys and values of the same dictionary.
    """
    completed = {}
    total = 0

    while total < number_of_packets_written:
        event = get_event(transport, idx, to)
        if event.event != Events.BT_HCI_EVT_NUM_COMPLETED_PACKETS:
            break

        num_handles, conn_handles, num_packets = event.decode()
        for i in range(num_handles):
            conn_handle, packets = conn_handles[i], num_packets[i]
            completed[conn_handle] = completed.get(conn_handle, 0) + packets
            total += packets

    return len(completed), completed.keys(), completed.values()
524
525
class PbFlags(IntEnum):
    """
        Packet boundary flag values of ISO data fragments, as tested by
        iso_receive_sdu(): FIRST and COMPLETE fragments carry the SDU header,
        LAST and COMPLETE fragments end an SDU.
    """
    FIRST = 0b00
    CONTINUATION = 0b01
    COMPLETE = 0b10
    LAST = 0b11
531
532
def iso_receive_sdu(transport, idx, trace, sdu_interval):
    """
        Receive one complete ISO SDU from device 'idx', reassembling it from
        its fragments.

        Each fragment is awaited for up to 2 * sdu_interval. FIRST and COMPLETE
        fragments carry the packet sequence number and the SDU length (whose
        top two bits are the packet status flag); LAST and COMPLETE fragments
        end the SDU.

        Returns (success, handle, iso_sdu):
          - (falsy, -1, ()) when no data became ready in time;
          - otherwise the handle of the last fragment read and the collected
            bytes as a tuple. success is False when the packet status flag is
            not 0, the collected length disagrees with the announced SDU
            length, a second header arrives after a non-empty SDU length was
            announced, or a CONTINUATION/LAST fragment arrives before any header.
    """
    iso_sdu = []
    iso_sdu_len = 0
    # Stays None until a FIRST/COMPLETE fragment delivers the header. A
    # CONTINUATION/LAST fragment arriving first then fails the status check
    # below instead of raising UnboundLocalError.
    rx_packet_status_flag = None
    success = True
    while success:
        success = le_iso_data_ready(transport, idx, sdu_interval * 2) and success
        if not success:
            return success, -1, tuple([])

        time, handle, pbflags, tsflag, rx_iso_data_load = le_iso_data_read(transport, idx, 100)
        rx_iso_data_load = bytearray(rx_iso_data_load)

        # Unpack ISO_Data_Load
        rx_offset = 0
        # a. Get Time_Stamp if present (its value is not used, only skipped)
        if tsflag:
            fmt = '<I'
            (time_stamp,) = struct.unpack_from(fmt, rx_iso_data_load)
            rx_offset += struct.calcsize(fmt)

        # FIXME: Temporary workaround for Packetcraft not being along with Core version Sydney.
        #  Packet_Sequence_Number, Packet_Status_Flag and ISO_SDU_Length shall be sent for all fragments
        if pbflags == PbFlags.FIRST or pbflags == PbFlags.COMPLETE:
            # b. Get Packet_Sequence_Number, ISO_SDU_Length and Packet_Status_Flag
            fmt = '<HH'
            rx_packet_sequence_number, rx_iso_sdu_length = struct.unpack_from(fmt, rx_iso_data_load, rx_offset)
            rx_offset += struct.calcsize(fmt)
            rx_packet_status_flag = rx_iso_sdu_length >> 14
            rx_iso_sdu_length &= 0xfff  # 12 bits valid

            # A header is only expected while no SDU length has been announced yet
            success = (iso_sdu_len == 0) and success
            iso_sdu_len = rx_iso_sdu_length

        rx_iso_sdu = rx_iso_data_load[rx_offset:]
        iso_sdu.extend(rx_iso_sdu)

        # Valid data. The complete ISO_SDU was received correctly.
        # (None, i.e. no header seen so far, compares unequal and fails here.)
        success = (rx_packet_status_flag == 0x00) and success

        if pbflags == PbFlags.LAST or pbflags == PbFlags.COMPLETE:
            success = (len(iso_sdu) == iso_sdu_len) and success
            break

        # More fragments must follow, so the SDU cannot be complete yet
        success = (len(iso_sdu) < iso_sdu_len) and success

    return success, handle, tuple(iso_sdu)
579
580
def iso_send_payload_pdu(transport, transmitter, receiver, trace, conn_handle, max_data_size, sdu_interval, pkt_seq_num,
                         tx_iso_sdu=None):
    """
        Send one ISO SDU from device 'transmitter' on 'conn_handle' and verify
        that device 'receiver' receives exactly the same SDU.

        When 'tx_iso_sdu' is empty or None, an SDU of 'max_data_size' bytes is
        generated as (pkt_seq_num + x) % 255 for x = 0 .. max_data_size - 1.

        Returns True only when the buffer size query, the fragment writes,
        their write responses, the Number Of Completed Packets events and the
        reception all succeed, neither device has further ISO data pending
        afterwards, and the received SDU equals the transmitted one.
    """
    # Create a ISO_SDU of sdu_size length
    if not tx_iso_sdu:
        tx_iso_sdu = tuple([(pkt_seq_num + x) % 255 for x in range(max_data_size)])

    # Pack the ISO_Data_Load (no Time_Stamp) of an HCI ISO Data packet
    # <Packet_Sequence_Number, ISO_SDU_Length, ISO_SDU>
    fmt = '<HH{ISO_SDU_Length}B'.format(ISO_SDU_Length=len(tx_iso_sdu))
    tx_iso_data_load = struct.pack(fmt, pkt_seq_num, len(tx_iso_sdu), *tx_iso_sdu)

    # Transmitter: TX SDU
    success, _, _, iso_buffer_len, _ = readBufferSizeV2(transport, transmitter, trace)
    s, fragments = le_iso_data_write_fragments(transport, transmitter, trace, conn_handle, tx_iso_data_load, iso_buffer_len)
    # Fold the write result into the overall verdict (it used to be assigned
    # to a misspelled, unused name and was therefore lost)
    success = s and success
    success = le_iso_data_write_complete(transport, transmitter, trace, fragments, 100) and success
    success = verifyNumCompleteEvents(transport, transmitter, conn_handle, fragments, trace, sdu_interval * 2) and success

    s, _, rx_iso_sdu = iso_receive_sdu(transport, receiver, trace, sdu_interval)
    success = s and success

    # Transmitter: No RX
    success = not le_iso_data_ready(transport, transmitter, 100) and success

    # Receiver: No more RX data
    success = not le_iso_data_ready(transport, receiver, 100) and success

    # TX and RX match. The received SDU is a tuple; normalise the transmitted
    # one so that a caller-supplied list or bytes object compares by content.
    return (tuple(tx_iso_sdu) == rx_iso_sdu) and success
610
611
def iso_send_payload_pdu_parallel(transport, idx_1, idx_2, trace, conn_handle_1, conn_handle_2, max_data_size,
                                  sdu_interval, pkt_seq_num):
    """
        Send the same generated ISO SDU from both devices ('idx_1' on
        'conn_handle_1', 'idx_2' on 'conn_handle_2') and verify that each
        device then receives that SDU.

        The SDU holds (pkt_seq_num + x) % 255 for x = 0 .. max_data_size - 1.
        Each stage is run for device 1 and then device 2 before the next stage
        starts: query buffer sizes, write fragments, collect write responses,
        verify Number Of Completed Packets events, receive and compare.

        Returns a truthy value only when every stage succeeded on both devices.
    """
    links = ((idx_1, conn_handle_1), (idx_2, conn_handle_2))

    # Create a ISO_SDU of sdu_size length
    tx_iso_sdu = tuple([(pkt_seq_num + x) % 255 for x in range(max_data_size)])

    # Pack the ISO_Data_Load (no Time_Stamp) of an HCI ISO Data packet
    # <Packet_Sequence_Number, ISO_SDU_Length, ISO_SDU>
    fmt = '<HH{ISO_SDU_Length}B'.format(ISO_SDU_Length=len(tx_iso_sdu))
    tx_iso_data_load = struct.pack(fmt, pkt_seq_num, len(tx_iso_sdu), *tx_iso_sdu)

    success = True

    # Feed TX buffers
    iso_buffer_lens = []
    for idx, _ in links:
        s, _, _, iso_buffer_len, _ = readBufferSizeV2(transport, idx, trace)
        success = s and success
        iso_buffer_lens.append(iso_buffer_len)

    fragment_counts = []
    for (idx, conn_handle), iso_buffer_len in zip(links, iso_buffer_lens):
        s, fragments = le_iso_data_write_fragments(transport, idx, trace, conn_handle, tx_iso_data_load, iso_buffer_len)
        success = s and success
        fragment_counts.append(fragments)

    # Wait for data to be sent; fetch EDTT command response and Number of Completed packets event
    for (idx, _), fragments in zip(links, fragment_counts):
        success = le_iso_data_write_complete(transport, idx, trace, fragments, 100) and success

    for (idx, conn_handle), fragments in zip(links, fragment_counts):
        success = verifyNumCompleteEvents(transport, idx, conn_handle, fragments, trace) and success

    # Check the data received
    for idx, _ in links:
        s, _, rx_iso_sdu = iso_receive_sdu(transport, idx, trace, sdu_interval)
        success = s and tx_iso_sdu == rx_iso_sdu and success

    return success
645
646
def set_isochronous_channels_host_support(transport, device, trace, value):
    """
        Issue LE Set Host Feature for the ISOCHRONOUS_CHANNELS feature with
        'value' on 'device'.

        Returns True when a Command Complete event arrives and the status is zero.
    """
    feature = FeatureSupport.ISOCHRONOUS_CHANNELS
    status = le_set_host_feature(transport, device, feature, value, 100)

    completed = getCommandCompleteEvent(transport, device, trace)
    return completed and (status == 0x00)
650
651
def establish_acl_connection(transport, central, peripheral, trace, interval=None, supervision_timeout=None):
    """
        Start connectable undirected advertising and connect to it from 'central'.

        The advertiser / initiator pair comes from setPublicInitiator();
        'peripheral' is not used here. A truthy 'interval' is applied as both
        minimum and maximum connection interval, a truthy 'supervision_timeout'
        as the initiator's supervision timeout. The connect attempt is made
        even when enabling advertising failed; if the connection fails,
        advertising is disabled again.

        Returns (success, advertiser, initiator).
    """
    advertiser, initiator = setPublicInitiator(transport, central, trace, Advertising.CONNECTABLE_UNDIRECTED)

    if interval:
        initiator.intervalMin = interval
        initiator.intervalMax = interval
    if supervision_timeout:
        initiator.supervisionTimeout = supervision_timeout

    advertising = advertiser.enable()
    connected = initiator.connect()
    success = advertising and connected

    if not connected:
        success = advertiser.disable() and success

    return success, advertiser, initiator
668
669
def establish_cis_connection(transport, central, peripheral, trace, params, acl_conn_handle, setup_iso_data_path=True,
                             use_test_cmd=True):
    """
    Create a CIG on the central, establish its CISes towards the peripheral and optionally set up the ISO data paths

    :param transport: transport object handed on to the HCI helpers
    :param central: index of the central device (LT)
    :param peripheral: index of the peripheral device (UT)
    :param trace: trace object used for logging the fetched events
    :param params: CIG parameters; must provide get_cig_parameters_test()/get_cig_parameters() and the
                   Max_SDU_C_To_P/Max_SDU_P_To_C attributes (per CIS lists or plain numbers)
    :param acl_conn_handle: handle of the ACL connection (on the central) used for every CIS
    :param setup_iso_data_path: when False, return as soon as the CISes are established
    :param use_test_cmd: use LE Set CIG Parameters Test (True) or LE Set CIG Parameters (False)
    :return: tuple (success, central_cis_handles, peripheral_cis_handles)
    """
    def direction_carries_data(max_sdu, n):
        # The Max_SDU fields are per CIS lists, so the entry of CIS n has to be examined; comparing the whole
        # list with 0 is always True. A plain number is still accepted and applies to every CIS.
        if isinstance(max_sdu, (list, tuple)):
            return max_sdu[n] != 0
        return max_sdu != 0

    def setup_data_path(device, cis_handle, direction):
        # Setup Data Path - Data_Path_ID=0 Codec_ID=0 Controller_Delay=0 Codec_Configuration_Length=0
        # Codec_Configuration=NULL; the Command Complete event is always fetched
        status, _ = le_setup_iso_data_path(transport, device, cis_handle, direction, 0, [0, 0, 0, 0, 0], 0, 0, [], 100)
        return getCommandCompleteEvent(transport, device, trace) and (status == 0x00)

    success = True

    # LT: Set CIG Parameters (for Test)
    if use_test_cmd:
        status, _, cisCount, central_cis_handles = \
            le_set_cig_parameters_test(transport, central, 0, *params.get_cig_parameters_test(), 100)
    else:
        status, _, cisCount, central_cis_handles = \
            le_set_cig_parameters(transport, central, 0, *params.get_cig_parameters(), 100)

    success = getCommandCompleteEvent(transport, central, trace) and (status == 0x00) and success
    central_acl_handles = [acl_conn_handle] * cisCount
    peripheral_cis_handles = [-1] * cisCount

    # LT: Create CIS
    status = le_create_cis(transport, central, cisCount, central_cis_handles, central_acl_handles, 100)
    success = verifyAndShowEvent(transport, central, Events.BT_HCI_EVT_CMD_STATUS, trace) and (status == 0) and success

    for n in range(cisCount):
        # UT: Wait for HCI_EVT_LE_CIS_REQUEST
        s, event = verifyAndFetchMetaEvent(transport, peripheral, MetaEvents.BT_HCI_EVT_LE_CIS_REQUEST, trace)
        success = s and success
        _, peripheral_cis_handles[n], _, _ = event.decode()

        # UT: Accept CIS Request
        status = le_accept_cis_request(transport, peripheral, peripheral_cis_handles[n], 100)
        success = verifyAndShowEvent(transport, peripheral, Events.BT_HCI_EVT_CMD_STATUS, trace) and (status == 0) and success

        # LT: Wait for HCI_EVT_LE_CIS_ESTABLISHED
        s, event = verifyAndFetchMetaEvent(transport, central, MetaEvents.BT_HCI_EVT_LE_CIS_ESTABLISHED, trace, 2000)
        success = s and (event.decode()[0] == 0x00) and success

        # UT: Wait for HCI_EVT_LE_CIS_ESTABLISHED
        s, event = verifyAndFetchMetaEvent(transport, peripheral, MetaEvents.BT_HCI_EVT_LE_CIS_ESTABLISHED, trace)
        success = s and (event.decode()[0] == 0x00) and success

    if not setup_iso_data_path:
        return success, central_cis_handles, peripheral_cis_handles

    for n in range(cisCount):
        if direction_carries_data(params.Max_SDU_C_To_P, n):
            # LT: Data_Path_Direction=0 (Input)
            success = setup_data_path(central, central_cis_handles[n], 0) and success
            # UT: Data_Path_Direction=1 (Output)
            success = setup_data_path(peripheral, peripheral_cis_handles[n], 1) and success

        if direction_carries_data(params.Max_SDU_P_To_C, n):
            # LT: Data_Path_Direction=1 (Output)
            success = setup_data_path(central, central_cis_handles[n], 1) and success
            # UT: Data_Path_Direction=0 (Input)
            success = setup_data_path(peripheral, peripheral_cis_handles[n], 0) and success

    return success, central_cis_handles, peripheral_cis_handles
731
732
def calc_supervision_timeout(iso_interval_ms):
    """
    Derive an LE Link Supervision timeout that is usable for a CIG with the given ISO_Interval
    :param iso_interval_ms: ISO_Interval in milliseconds
    :return: Supervision timeout for the LE Link, i.e. the millisecond value divided by 10 (truncated)
    """
    # TS 4.10.1.2 Timing Requirements
    # "The connSupervisionTimeout for an ACL with associated CISes shall be greater than twice that of the
    #  ISO_Intervals of the associated CISes."
    # Twice the ISO_Interval plus a fixed margin of 250 ms, truncated to whole milliseconds
    timeout_ms = int(2 * iso_interval_ms + 250)
    assert timeout_ms < 32000

    timeout_units = timeout_ms / 10
    return int(timeout_units)
746
747
def enable_encryption(transport, central, peripheral, trace, conn_handle_c, keys):
    """
    Start encryption from the central and answer the resulting LTK request on the peripheral

    :param transport: transport object handed on to the HCI helpers
    :param central: index of the central device
    :param peripheral: index of the peripheral device
    :param trace: trace object used for logging the fetched events
    :param conn_handle_c: connection handle on the central
    :param keys: indexable holding (rand, ediv, ltk) at positions 0, 1 and 2
    :return: True if every step succeeded and both devices reported encryption enabled (0x01)
    """
    rand, ediv, ltk = keys[0], keys[1], keys[2]

    # Central: LE Start Encryption, answered with a Command Status event
    status = le_start_encryption(transport, central, conn_handle_c, rand, ediv, ltk, 100)
    got_status_event = verifyAndShowEvent(transport, central, Events.BT_HCI_EVT_CMD_STATUS, trace, 1000)
    success = got_status_event and status == 0x00

    # Peripheral: the LTK request has to carry the same rand and ediv as used by the central
    found, conn_handle_p, req_rand, req_ediv = hasLeLtkRequestMetaEvent(transport, peripheral, trace, 1000)
    success = found and req_rand == rand and req_ediv == ediv and success

    # Peripheral: reply with the long term key
    status, handle = le_long_term_key_request_reply(transport, peripheral, conn_handle_p, ltk, 1000)
    got_command_complete = getCommandCompleteEvent(transport, peripheral, trace)
    success = got_command_complete and status == 0x00 and handle == conn_handle_p and success

    # Both sides have to report an Encryption Change for their own connection handle
    found, handle, enabled, _ = hasEncryptionChangeEvent(transport, peripheral, trace, 1000)
    success = found and handle == conn_handle_p and enabled == 0x01 and success

    found, handle, enabled, _ = hasEncryptionChangeEvent(transport, central, trace, 1000)
    success = found and handle == conn_handle_c and enabled == 0x01 and success

    return success
770
771
def state_connected_isochronous_stream(transport, peripheral, central, trace, params,
                                       setup_iso_data_path=True, enc_keys=None, use_test_cmd=True,
                                       adjust_conn_interval=False):
    """
    Bring two devices into the state "Connected Isochronous Stream established"

    Enables Isochronous Channels (Host Support) on both devices, establishes the ACL connection, optionally
    enables encryption and finally establishes the CIS(es) described by params.

    :param transport: transport object handed on to the helpers
    :param peripheral: index of the peripheral device (IUT, upperTester)
    :param central: index of the central device
    :param trace: trace object
    :param params: CIG parameters; ISO_Interval and CIS_Count are read here, the object is also handed on to
                   establish_cis_connection()
    :param setup_iso_data_path: handed on to establish_cis_connection()
    :param enc_keys: if truthy, keys handed on to enable_encryption() before the CISes are created
    :param use_test_cmd: handed on to establish_cis_connection()
    :param adjust_conn_interval: use params.ISO_Interval as ACL connection interval
    :return: tuple (success, initiator, peripheral_cis_handles, central_cis_handles); without an initiator the
             initiator is None and both handle lists hold 0xFFFF for every CIS
    """
    # The Isochronous Channels (Host Support) FeatureSet bit is set.
    success = set_isochronous_channels_host_support(transport, peripheral, trace, 1)
    success = set_isochronous_channels_host_support(transport, central, trace, 1) and success

    # Adjust connection interval to avoid CIG and ACL events collision
    if adjust_conn_interval:
        conn_interval = params.ISO_Interval
    else:
        conn_interval = None

    ### ACL Connection Established. IUT (upperTester) is Peripheral. ###
    # ISO_Interval is multiplied by 1.25 to get the milliseconds calc_supervision_timeout() expects
    s, _, initiator = establish_acl_connection(transport, central, peripheral, trace, conn_interval,
                                               calc_supervision_timeout(params.ISO_Interval * 1.25))
    success = s and success
    if not initiator:
        # Same shape as the regular return value, so callers can always unpack four values
        invalid_handles = [0xFFFF] * params.CIS_Count
        return success, None, invalid_handles, list(invalid_handles)

    if enc_keys:
        success = enable_encryption(transport, central, peripheral, trace, initiator.handles[0], enc_keys) and success

    s, central_cis_handles, peripheral_cis_handles = \
        establish_cis_connection(transport, central, peripheral, trace, params, initiator.handles[0],
                                 setup_iso_data_path, use_test_cmd)

    return s and success, initiator, peripheral_cis_handles, central_cis_handles
800
801# Helper function for set_complete_ext_adv_data()/set_complete_ext_scan_response_data()
def _set_complete_ad_sr_data(transport, idx, handle, fragmentPref, advData, set_data_function):
    """
    Hand advData to set_data_function in fragments of at most 251 entries

    Data that fits into one fragment is sent as COMPLETE_FRAGMENT; otherwise the sequence is FIRST_FRAGMENT,
    any number of INTERMEDIATE_FRAGMENTs and LAST_FRAGMENT. Nothing is sent for empty data.

    :param transport: transport object handed on to set_data_function
    :param idx: device index handed on to set_data_function
    :param handle: advertising handle handed on to set_data_function
    :param fragmentPref: fragment preference handed on to set_data_function
    :param advData: sliceable data to send
    :param set_data_function: callable(transport, idx, handle, operation, fragmentPref, data, timeout) returning
                              a status
    :return: False as soon as a call returns a status other than 0, True otherwise
    """
    fragment_limit = 251
    pending = advData[:]
    sent_any = False

    while len(pending) > 0:
        fits = len(pending) <= fragment_limit
        if sent_any:
            op = FragmentOperation.LAST_FRAGMENT if fits else FragmentOperation.INTERMEDIATE_FRAGMENT
        else:
            op = FragmentOperation.COMPLETE_FRAGMENT if fits else FragmentOperation.FIRST_FRAGMENT

        chunk_length = min(len(pending), fragment_limit)
        if set_data_function(transport, idx, handle, op, fragmentPref, pending[:chunk_length], 100) != 0:
            return False

        pending = pending[chunk_length:]
        sent_any = True

    return True
824
825"""
826   Sets extended advertising data handling fragmentation as needed. The number of fragments used is kept as low as possible.
827   Returns true if succeeded, false otherwise
828"""
def set_complete_ext_adv_data(transport, idx, handle, fragmentPref, advData):
    """
    Set extended advertising data, split into fragments by _set_complete_ad_sr_data()

    :return: the result of _set_complete_ad_sr_data() using le_set_extended_advertising_data
    """
    succeeded = _set_complete_ad_sr_data(transport, idx, handle, fragmentPref, advData,
                                         set_data_function=le_set_extended_advertising_data)
    return succeeded
831
832"""
833   Sets extended advertising scan response data handling fragmentation as needed. The number of fragments used is kept as low as possible.
834   Returns true if succeeded, false otherwise
835"""
def set_complete_ext_scan_response_data(transport, idx, handle, fragmentPref, advData):
    """
    Set extended scan response data, split into fragments by _set_complete_ad_sr_data()

    :return: the result of _set_complete_ad_sr_data() using le_set_extended_scan_response_data
    """
    succeeded = _set_complete_ad_sr_data(transport, idx, handle, fragmentPref, advData,
                                         set_data_function=le_set_extended_scan_response_data)
    return succeeded
838
839"""
840    Wait for the backend of an ADV_IND packet - will advance the time to be just after the next ADV_IND packet
841    (just after in this case meaning within T_IFS so a response can be sent)
842
843    Returns the ADV_IND packet or None if it fails to find one before the timeout
844"""
def wait_for_ADV_IND_end(transport, packets, timeout):
    """
    Advance the simulation in steps of 100 us until the time is less than 150 us after the end of the
    latest ADV_IND packet, or until the timeout has been used up

    :param transport: transport object providing get_last_t() and wait_until_t()
    :param packets: packet collection providing findLast()
    :param timeout: timeout in milliseconds
    :return: the ADV_IND packet, or None if none was caught within the timeout
    """
    poll_step_us = 100 # Note: Has to be less than T_IFS
    remaining_us = timeout*1000 # Convert to microseconds

    while remaining_us > 0:
        candidate = packets.findLast(packet_filter=('ADV_IND'))
        if candidate:
            # Only accept the packet while a response can still be transmitted after its end
            now = transport.get_last_t()
            window_end = candidate.ts + get_packet_air_time(candidate) + 150
            if now < window_end:
                return candidate
        # No usable packet yet - step the simulation time and try again
        transport.wait_until_t(transport.get_last_t() + poll_step_us)
        remaining_us -= poll_step_us

    return None
862
863"""
864    Wait for the backend of an AUX_ADV_IND packet - will advance the time to be just after the next AUX_ADV_IND packet
865    (just after in this case meaning within T_IFS so a response can be sent)
866
867    Returns the AUX_ADV_IND packet
868"""
def wait_for_AUX_ADV_IND_end(transport, packets):
    """
    Advance the simulation until the time is less than 150 us after the end of an AUX_ADV_IND packet

    The end of the coming AUX_ADV_IND is predicted from the AuxPtr of the latest ADV_EXT_IND and the air time
    of the previously seen AUX_ADV_IND. If the prediction does not hold, the procedure is repeated.
    NOTE: there is no timeout; the function keeps waiting until it succeeds.

    :param transport: transport object providing get_last_t(), wait() and wait_until_t()
    :param packets: packet collection providing findLast()
    :return: the AUX_ADV_IND packet
    """
    # Get an ADV_EXT_IND with an aux ptr pointing to an AUX_ADV_IND packet that hasn't been sent yet
    auxAdvIndPacket = None
    auxAdvIndEndTs = 0
    while auxAdvIndPacket is None:
        while True:
            lastPacket = packets.findLast(packet_filter=('ADV_EXT_IND', 'AUX_ADV_IND'))
            if not auxAdvIndPacket:
                auxAdvIndPacket = packets.findLast(packet_filter='AUX_ADV_IND')
            # findLast() may come back empty-handed while nothing has been captured yet; keep waiting then
            if lastPacket is not None and lastPacket.type == PacketType.ADV_EXT_IND and auxAdvIndPacket:
                auxPtr = lastPacket.payload['AuxPtr']
                # Calculate end of offset window
                offsetEnd = (auxPtr.auxOffset + 1) * (30 if auxPtr.offsetUnits == 0 else 300)
                # Expected air time of the AUX_ADV_IND packet (assuming no changes from the last one)
                airTime = get_packet_air_time(auxAdvIndPacket)
                # Calculate expected (last possible) end time of the coming AUX_ADV_IND packet
                auxAdvIndEndTs = int(lastPacket.ts + offsetEnd + airTime)
                break
            # Don't have the needed packets yet or the last packet was not an ADV_EXT_IND; Wait a little and try again
            transport.wait(1)
        # Wait until the calculated end time (but make sure to always wait at least 1 us to avoid deadlocks)
        transport.wait_until_t(max(auxAdvIndEndTs + 1, transport.get_last_t() + 1))

        # Verify that the simulation time is within T_IFS of the end of the AUX_ADV_IND packet
        auxAdvIndPacket = None
        simulationTime = transport.get_last_t()
        lastPacket = packets.findLast()
        if (lastPacket is not None and lastPacket.type == PacketType.AUX_ADV_IND and
                simulationTime < lastPacket.ts + get_packet_air_time(lastPacket) + 150):
            # Success, we can continue
            auxAdvIndPacket = lastPacket

    return auxAdvIndPacket
900
901"""
902    Get air time of given packet in microseconds
903"""
def get_packet_air_time(packet):
    """
    Compute the air time of a packet in microseconds, rounded up

    :param packet: object with a phy attribute that supports len()
    :return: air time in whole microseconds; the '2M' PHY uses a 2 byte pre-amble and halves the time per
             bit, any other PHY value is treated like 1M
    """
    is_2m = packet.phy == '2M'
    preamble_bytes = 2 if is_2m else 1
    bits_per_us = 2 if is_2m else 1

    # Note: Packet air length is: pre-amble + AA + header + payload + CRC
    air_bytes = preamble_bytes + 4 + 2 + len(packet) + 3
    return math.ceil(air_bytes * 8 / bits_per_us)
907
908"""
909LL.TS.p17
9104.10.1.3     Default Values for Set CIG Parameters Commands
911When using either the HCI_LE_Set_CIG_Parameters or HCI_LE_Set_CIG_Parameters_Test commands,
912the following table defines common default parameters for this section. The test case may specify
913different values.
914"""
class SetCIGParameters:
    """
    Parameter set for the HCI_LE_Set_CIG_Parameters and HCI_LE_Set_CIG_Parameters_Test commands

    Every entry of `data` becomes an instance attribute (plus an alias attribute if an alias is listed).
    Values are taken from the keyword arguments given by field name, otherwise from the default. "Per CIS"
    fields are always stored as lists holding CIS_Count entries.
    """
    # Known parameter fields
    data = [
        # Name,                             Alias,         Per CIS, Default Value
        ('SDU_Interval_C_To_P',             'sdu_int_m2s', False,   20000),  # 20 ms
        ('SDU_Interval_P_To_C',             'sdu_int_s2m', False,   20000),  # 20 ms
        ('ISO_Interval',                    'iso_int',     False,   int(20 // 1.25)), # 20 ms
        ('CIS_Count',                       'cis_cnt',     False,   1),  # NOTE: Needs to be located before the first "Per CIS" field
        ('Worst_Case_SCA',                  None,          False,   0),
        ('Packing',                         'packing',     False,   0),  # Sequential
        ('Framing',                         'framing',     False,   0),  # Unframed
        ('NSE',                             'nse',         True,    3),  # Note: Set to 3 or the Max Supported CIS NSE defined in IXIT, whichever is less.
        ('Max_SDU_C_To_P',                  'mx_sdu_m2s',  True,    None),  # NOTE: Calculated in __init__
        ('Max_SDU_P_To_C',                  'mx_sdu_s2m',  True,    None),  # NOTE: Calculated in __init__
        ('Max_PDU_C_To_P',                  'mx_pdu_m2s',  True,    251),
        ('Max_PDU_P_To_C',                  'mx_pdu_s2m',  True,    251),
        ('PHY_C_To_P',                      'phy_m2s',     True,    1),  # LE 1M PHY
        ('PHY_P_To_C',                      'phy_s2m',     True,    1),  # LE 1M PHY
        ('FT_C_To_P',                       'ft_m2s',      False,   1),
        ('FT_P_To_C',                       'ft_s2m',      False,   1),
        ('BN_C_To_P',                       'bn_m2s',      True,    1),
        ('BN_P_To_C',                       'bn_s2m',      True,    1),
        ('Max_Transport_Latency_C_To_P',    None,          False,   40000),  # 40 ms
        ('Max_Transport_Latency_P_To_C',    None,          False,   40000),  # 40 ms
        ('RTN_C_To_P',                      None,          True,    3),
        ('RTN_P_To_C',                      None,          True,    3),
        ('Max_SDU_Supported',               None,          False,   247),  # Maximum ISOAL SDU length
    ]

    def __init__(self, **kwargs):
        """
        :param kwargs: field values by field name; "Per CIS" fields take a single value (used for every CIS) or
                       a list holding CIS_Count values
        :raises ValueError: for an unknown keyword or a "Per CIS" list of the wrong length
        """
        # Make a list of the known fields
        # NOTE(review): alias names pass this check, but only values given by field name are read below
        fields = [t[0] for t in self.data] + [t[1] for t in self.data]

        # Check for unknown fields
        for key in kwargs:
            if key not in fields:
                raise ValueError('Unknown field {}'.format(key))

        # Dynamically set the attributes of the class instance
        for (field, alias, per_cis, default) in self.data:
            # Get supplied value or default
            value = kwargs.get(field, default)

            # Per CIS field?
            if per_cis:
                value = self.per_cis_value(value, field)

            # Create field attribute
            setattr(self, field, value)

            # Create alias attribute
            if alias:
                setattr(self, alias, value)

        # Calculate the Max_SDU_C_To_P and Max_SDU_P_To_C unless given
        Max_SDU_C_To_P = [None] * self.CIS_Count
        Max_SDU_P_To_C = [None] * self.CIS_Count

        for n in range(self.CIS_Count):
            # Calculate default values; ISO_Interval is converted from 1.25 ms units to microseconds
            Max_SDU_C_To_P[n] = calcMaxIsoSdu(self.Framing, self.BN_C_To_P[n], self.Max_PDU_C_To_P[n],
                                              self.ISO_Interval * 1.25 * 1000, self.SDU_Interval_C_To_P,
                                              self.Max_SDU_Supported)
            Max_SDU_P_To_C[n] = calcMaxIsoSdu(self.Framing, self.BN_P_To_C[n], self.Max_PDU_P_To_C[n],
                                              self.ISO_Interval * 1.25 * 1000, self.SDU_Interval_P_To_C,
                                              self.Max_SDU_Supported)

        # Override
        self.Max_SDU_C_To_P = self.per_cis_value(kwargs.get('Max_SDU_C_To_P', Max_SDU_C_To_P), 'Max_SDU_C_To_P')
        self.Max_SDU_P_To_C = self.per_cis_value(kwargs.get('Max_SDU_P_To_C', Max_SDU_P_To_C), 'Max_SDU_P_To_C')

        # The aliases were created in the loop above, before the values were calculated; keep them in sync
        self.mx_sdu_m2s = self.Max_SDU_C_To_P
        self.mx_sdu_s2m = self.Max_SDU_P_To_C

    def per_cis_value(self, value, field=None):
        """
        Turn a value into a list holding one entry per CIS

        :param value: a list (has to hold CIS_Count entries) or a single value that is used for every CIS
        :param field: name of the field the value belongs to; only used in the error message
        :return: list holding CIS_Count entries
        :raises ValueError: if a list of the wrong length is given
        """
        if isinstance(value, list):
            # List value given, so must match CIS_Count
            if len(value) != self.CIS_Count:
                raise ValueError('Field {} has wrong length {}, expected {}'.format(
                    field if field is not None else '<unnamed>', len(value), self.CIS_Count))
        else:
            # Single value given, so multiply by CIS_Count
            value = [value] * self.CIS_Count
        return value

    def get_cig_parameters_test(self):
        """
        :return: tuple of the values this file unpacks into le_set_cig_parameters_test() (CIS IDs are 0..CIS_Count-1)
        """
        return (self.SDU_Interval_C_To_P, self.SDU_Interval_P_To_C, self.FT_C_To_P, self.FT_P_To_C, self.ISO_Interval,
                self.Worst_Case_SCA, self.Packing, self.Framing, self.CIS_Count, list(range(self.CIS_Count)), self.NSE,
                self.Max_SDU_C_To_P, self.Max_SDU_P_To_C, self.Max_PDU_C_To_P, self.Max_PDU_P_To_C, self.PHY_C_To_P,
                self.PHY_P_To_C, self.BN_C_To_P, self.BN_P_To_C)

    def get_cig_parameters(self):
        """
        :return: tuple of the values this file unpacks into le_set_cig_parameters() (CIS IDs are 0..CIS_Count-1)
        """
        return (self.SDU_Interval_C_To_P, self.SDU_Interval_P_To_C, self.Worst_Case_SCA, self.Packing, self.Framing,
                self.Max_Transport_Latency_C_To_P, self.Max_Transport_Latency_P_To_C, self.CIS_Count,
                list(range(self.CIS_Count)), self.Max_SDU_C_To_P, self.Max_SDU_P_To_C, self.PHY_C_To_P, self.PHY_P_To_C,
                self.RTN_C_To_P, self.RTN_P_To_C)
1007