1 /*
2  * Copyright (c) 2024 Nordic Semiconductor ASA
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <autoconf.h>
8 #include <errno.h>
9 #include <stdbool.h>
10 #include <stddef.h>
11 #include <stdint.h>
12 
13 #include <zephyr/bluetooth/audio/cap.h>
14 #include <zephyr/bluetooth/audio/bap.h>
15 #include <zephyr/bluetooth/bluetooth.h>
16 #include <zephyr/bluetooth/hci_types.h>
17 #include <zephyr/bluetooth/iso.h>
18 #include <zephyr/kernel.h>
19 #include <zephyr/kernel/thread_stack.h>
20 #include <zephyr/logging/log.h>
21 #include <zephyr/logging/log_core.h>
22 #include <zephyr/net_buf.h>
23 #include <zephyr/sys/atomic.h>
24 #include <zephyr/sys/atomic_types.h>
25 #include <zephyr/sys/byteorder.h>
26 #include <zephyr/sys/util.h>
27 #include <zephyr/sys/util_macro.h>
28 #include <zephyr/types.h>
29 
30 #include "bap_stream_tx.h"
31 #include "common.h"
32 
/**
 * Maximum number of SDUs the TX thread keeps enqueued per stream.
 *
 * This is an equal share of the ISO TX buffers across all ISO channels
 * (CONFIG_BT_ISO_TX_BUF_COUNT / CONFIG_BT_ISO_MAX_CHAN), but never fewer than 2.
 */
#define ENQUEUE_CNT MAX(2, (CONFIG_BT_ISO_TX_BUF_COUNT / CONFIG_BT_ISO_MAX_CHAN))

/* Log module used by the LOG_INF() calls in this file */
LOG_MODULE_REGISTER(bap_stream_tx, LOG_LEVEL_INF);
37 
/** Per-stream bookkeeping for the TX thread */
struct tx_stream {
	/** The registered stream, or NULL when this slot is free */
	struct bt_bap_stream *bap_stream;
	/** Sequence number passed to the next send; incremented after each successful send */
	uint16_t seq_num;
	/** Count of successful sends not yet matched by a call to bap_stream_tx_sent_cb() */
	atomic_t enqueued;
};

/** One slot per ISO channel; slots are claimed on register and released on unregister */
static struct tx_stream tx_streams[CONFIG_BT_ISO_MAX_CHAN];
45 
stream_is_streaming(const struct bt_bap_stream * bap_stream)46 static bool stream_is_streaming(const struct bt_bap_stream *bap_stream)
47 {
48 	struct bt_bap_ep_info ep_info;
49 	int err;
50 
51 	if (bap_stream == NULL) {
52 		return false;
53 	}
54 
55 	/* No-op if stream is not configured */
56 	if (bap_stream->ep == NULL) {
57 		return false;
58 	}
59 
60 	err = bt_bap_ep_get_info(bap_stream->ep, &ep_info);
61 	if (err != 0) {
62 		return false;
63 	}
64 
65 	if (ep_info.iso_chan == NULL || ep_info.iso_chan->state != BT_ISO_STATE_CONNECTED) {
66 		return false;
67 	}
68 
69 	return ep_info.state == BT_BAP_EP_STATE_STREAMING;
70 }
71 
tx_thread_func(void * arg1,void * arg2,void * arg3)72 static void tx_thread_func(void *arg1, void *arg2, void *arg3)
73 {
74 	NET_BUF_POOL_FIXED_DEFINE(tx_pool, CONFIG_BT_ISO_TX_BUF_COUNT,
75 				  BT_ISO_SDU_BUF_SIZE(CONFIG_BT_ISO_TX_MTU),
76 				  CONFIG_BT_CONN_TX_USER_DATA_SIZE, NULL);
77 
78 	/* This loop will attempt to send on all streams in the streaming state in a round robin
79 	 * fashion.
80 	 * The TX is controlled by the number of buffers configured, and increasing
81 	 * CONFIG_BT_ISO_TX_BUF_COUNT will allow for more streams in parallel, or to submit more
82 	 * buffers per stream.
83 	 * Once a buffer has been freed by the stack, it triggers the next TX.
84 	 */
85 	while (true) {
86 		int err = -ENOEXEC;
87 
88 		for (size_t i = 0U; i < ARRAY_SIZE(tx_streams); i++) {
89 			struct bt_bap_stream *bap_stream = tx_streams[i].bap_stream;
90 
91 			if (stream_is_streaming(bap_stream) &&
92 			    atomic_get(&tx_streams[i].enqueued) < ENQUEUE_CNT) {
93 				struct net_buf *buf;
94 
95 				buf = net_buf_alloc(&tx_pool, K_FOREVER);
96 				net_buf_reserve(buf, BT_ISO_CHAN_SEND_RESERVE);
97 
98 				net_buf_add_mem(buf, mock_iso_data, bap_stream->qos->sdu);
99 
100 				err = bt_bap_stream_send(bap_stream, buf, tx_streams[i].seq_num);
101 				if (err == 0) {
102 					tx_streams[i].seq_num++;
103 					atomic_inc(&tx_streams[i].enqueued);
104 				} else {
105 					if (!stream_is_streaming(bap_stream)) {
106 						/* Can happen if we disconnected while waiting for a
107 						 * buffer - Ignore
108 						 */
109 					} else {
110 						FAIL("Unable to send: %d", err);
111 					}
112 
113 					net_buf_unref(buf);
114 				}
115 			} /* No-op if stream is not streaming */
116 		}
117 
118 		if (err != 0) {
119 			/* In case of any errors, retry with a delay */
120 			k_sleep(K_MSEC(10));
121 		}
122 	}
123 }
124 
bap_stream_tx_register(struct bt_bap_stream * bap_stream)125 int bap_stream_tx_register(struct bt_bap_stream *bap_stream)
126 {
127 	if (bap_stream == NULL) {
128 		return -EINVAL;
129 	}
130 
131 	if (!bap_stream_tx_can_send(bap_stream)) {
132 		return -EINVAL;
133 	}
134 
135 	for (size_t i = 0U; i < ARRAY_SIZE(tx_streams); i++) {
136 		if (tx_streams[i].bap_stream == NULL) {
137 			tx_streams[i].bap_stream = bap_stream;
138 			tx_streams[i].seq_num = 0U;
139 
140 			LOG_INF("Registered %p for TX", bap_stream);
141 
142 			return 0;
143 		}
144 	}
145 
146 	return -ENOMEM;
147 }
148 
bap_stream_tx_unregister(struct bt_bap_stream * bap_stream)149 int bap_stream_tx_unregister(struct bt_bap_stream *bap_stream)
150 {
151 	if (bap_stream == NULL) {
152 		return -EINVAL;
153 	}
154 
155 	for (size_t i = 0U; i < ARRAY_SIZE(tx_streams); i++) {
156 		if (tx_streams[i].bap_stream == bap_stream) {
157 			tx_streams[i].bap_stream = NULL;
158 			atomic_set(&tx_streams[i].enqueued, 0);
159 
160 			LOG_INF("Unregistered %p for TX", bap_stream);
161 
162 			return 0;
163 		}
164 	}
165 
166 	return -ENODATA;
167 }
168 
bap_stream_tx_init(void)169 void bap_stream_tx_init(void)
170 {
171 	static bool thread_started;
172 
173 	if (!thread_started) {
174 		static K_KERNEL_STACK_DEFINE(tx_thread_stack, 1024U);
175 		const int tx_thread_prio = K_PRIO_PREEMPT(5);
176 		static struct k_thread tx_thread;
177 
178 		k_thread_create(&tx_thread, tx_thread_stack, K_KERNEL_STACK_SIZEOF(tx_thread_stack),
179 				tx_thread_func, NULL, NULL, NULL, tx_thread_prio, 0, K_NO_WAIT);
180 		k_thread_name_set(&tx_thread, "TX thread");
181 		thread_started = true;
182 	}
183 }
184 
bap_stream_tx_can_send(const struct bt_bap_stream * stream)185 bool bap_stream_tx_can_send(const struct bt_bap_stream *stream)
186 {
187 	struct bt_bap_ep_info info;
188 	int err;
189 
190 	if (stream == NULL || stream->ep == NULL) {
191 		return false;
192 	}
193 
194 	err = bt_bap_ep_get_info(stream->ep, &info);
195 	if (err != 0) {
196 		return false;
197 	}
198 
199 	return info.can_send;
200 }
201 
bap_stream_tx_sent_cb(struct bt_bap_stream * stream)202 void bap_stream_tx_sent_cb(struct bt_bap_stream *stream)
203 {
204 	struct audio_test_stream *test_stream = audio_test_stream_from_bap_stream(stream);
205 
206 	if ((test_stream->tx_cnt % 100U) == 0U) {
207 		LOG_INF("Stream %p sent %zu SDUs", stream, test_stream->tx_cnt);
208 	}
209 
210 	test_stream->tx_cnt++;
211 
212 	for (size_t i = 0U; i < ARRAY_SIZE(tx_streams); i++) {
213 		if (tx_streams[i].bap_stream == stream) {
214 			const atomic_val_t old = atomic_dec(&tx_streams[i].enqueued);
215 
216 			if (old == 0) {
217 				FAIL("Old enqueue count was 0");
218 			}
219 		}
220 	}
221 }
222