# -*- coding: utf-8 -*- # Copyright 2021 Oticon A/S # SPDX-License-Identifier: Apache-2.0 import numpy import random import math from components.addata import * from components.address import * from components.advertiser import * from components.basic_commands import * from components.initiator import * from components.pairing import * from components.preambles import * from components.resolvable import * from components.scanner import * from components.test_spec import TestSpec from components.utils import * from collections import namedtuple from tests.test_utils import * global lowerIRK, upperIRK, lowerRandomAddress, upperRandomAddress def peripheral_send_single_sdu_cis(transport, peripheral, central, trace, params): success, initiator, (cis_conn_handle,), _ = \ state_connected_isochronous_stream(transport, peripheral, central, trace, params) if not initiator: return success # Test procedure # 1.The Upper Tester sends an HCI ISO Data Packet to the IUT with data length less than or equal to # the Max Data Length # 2.The IUT sends a single ISO Data PDU to the Lower Tester with the specified LLID # and Payload Data identical to the data in step 1. Framing shall be 0 if LLID is 0b00 # and shall be 1 if LLID is 0b10 # Maximum data size to fit into a single PDU/SDU if params.Framing == 1: max_data_size = params.Max_SDU_P_To_C[0] - 5 else: max_data_size = params.Max_SDU_P_To_C[0] success = iso_send_payload_pdu(transport, peripheral, central, trace, cis_conn_handle, max_data_size, params.SDU_Interval_P_To_C, 1) and success ### TERMINATION ### success = initiator.disconnect(0x13) and success return success def central_receive_single_sdu_cis(transport, central, peripheral, trace, params): return peripheral_send_single_sdu_cis(transport, peripheral, central, trace, params) def peripheral_receive_single_sdu_cis(transport, peripheral, central, trace, params): success, initiator, _, (cis_conn_handle,) = \ state_connected_isochronous_stream(transport, peripheral, central, trace, params) if not initiator: return success # Maximum data size to fit into a single PDU/SDU if params.Framing == 1: max_data_size = params.Max_SDU_C_To_P[0] - 5 else: max_data_size = params.Max_SDU_C_To_P[0] success = iso_send_payload_pdu(transport, central, peripheral, trace, cis_conn_handle, max_data_size, params.SDU_Interval_C_To_P, 1) and success ### TERMINATION ### success = initiator.disconnect(0x13) and success return success def central_send_single_sdu_cis(transport, central, peripheral, trace, params): return peripheral_receive_single_sdu_cis(transport, peripheral, central, trace, params) def peripheral_simultanous_sending_and_receiving_sdus(transport, peripheral, central, trace, params): success, initiator, (peripheral_cis_handle,), (central_cis_handle,) = \ state_connected_isochronous_stream(transport, peripheral, central, trace, params) if not initiator: return success # Test procedure # 1. The Upper Tester sends HCI ISO Data Packets to the IUT. # 2. The IUT sends ISO Data PDUs to the Lower Tester. # 3. At the same time, the Lower Tester sends ISO Data PDUs to the IUT. # 4. The IUT sends Isochronous Data SDUs to the Upper Tester. # Maximum data size to fit into a single PDU/SDU if params.Framing == 1: max_data_size = params.Max_SDU_C_To_P[0] - 5 else: max_data_size = params.Max_SDU_C_To_P[0] success = iso_send_payload_pdu_parallel(transport, central, peripheral, trace, central_cis_handle, peripheral_cis_handle, max_data_size, params.SDU_Interval_C_To_P, 1) and success ### TERMINATION ### success = initiator.disconnect(0x13) and success return success def central_simultanous_sending_and_receiving_sdus(transport, central, peripheral, trace, params): return peripheral_simultanous_sending_and_receiving_sdus(transport, peripheral, central, trace, params) def send_multiple_small_sdu_cis(transport, transmitter, receiver, trace, cis_handle, iso_interval, sdu_interval): # Create a ISO_SDUs of TS defined size tx_sdu_1 = tuple([0] * 20) tx_sdu_2 = tuple([1] * 25) # Pack the ISO_Data_Load (no Time_Stamp) of an HCI ISO Data packet # iso_data_pkt_1 = struct.pack(f' 4: success = success and test_f(transport, 0, 1, trace, device_dumps) elif test_f.__code__.co_argcount > 3: success = success and test_f(transport, 0, 1, trace) else: success = success and test_f(transport, 0, trace) except Exception as e: import traceback traceback.print_exc() trace.trace(3, "Test generated exception: %s" % str(e)) success = False return not success