1 /*
2 * Copyright (c) 2015 Intel Corporation
3 * Copyright (c) 2023 Arm Limited (or its affiliates). All rights reserved.
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 */
7
8 #include <zephyr/logging/log.h>
9 LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL);
10
11 #include <zephyr/kernel.h>
12
13 #include <zephyr/linker/sections.h>
14 #include <zephyr/toolchain.h>
15
16 #include <zephyr/net/socket.h>
17 #include <zephyr/net/zperf.h>
18
19 #include "zperf_internal.h"
20 #include "zperf_session.h"
21
22 /* To get net_sprint_ipv{4|6}_addr() */
23 #define NET_LOG_ENABLED 1
24 #include "net_private.h"
25
/*
 * Receiver thread priority: cooperative when
 * CONFIG_NET_TC_THREAD_COOPERATIVE is defined, preemptible otherwise.
 */
#if defined(CONFIG_NET_TC_THREAD_COOPERATIVE)
#define TCP_RECEIVER_THREAD_PRIORITY K_PRIO_COOP(8)
#else
#define TCP_RECEIVER_THREAD_PRIORITY K_PRIO_PREEMPT(8)
#endif

/* Size of the stack defined below for the receiver thread. */
#define TCP_RECEIVER_STACK_SIZE 2048

/*
 * Layout of the poll table used by tcp_server_session(): the first two
 * entries are the IPv4 and IPv6 listening sockets, the remaining
 * CONFIG_NET_ZPERF_MAX_SESSIONS entries hold accepted connections.
 */
#define SOCK_ID_IPV4_LISTEN 0
#define SOCK_ID_IPV6_LISTEN 1
#define SOCK_ID_MAX (CONFIG_NET_ZPERF_MAX_SESSIONS + 2)

/* Size of the buffer handed to zsock_recv(). */
#define TCP_RECEIVER_BUF_SIZE 1500
/*
 * Timeout handed to zsock_poll(). The stop flag is re-checked every time
 * poll returns, so this also bounds how long a stop request can go unnoticed.
 */
#define POLL_TIMEOUT_MS 100

static K_THREAD_STACK_DEFINE(tcp_receiver_stack_area, TCP_RECEIVER_STACK_SIZE);
static struct k_thread tcp_receiver_thread_data;

/* Callback registered by zperf_tcp_download(); cleared by zperf_tcp_download_stop(). */
static zperf_callback tcp_session_cb;
/* Opaque pointer passed back to tcp_session_cb on every invocation. */
static void *tcp_user_data;
/* Set by zperf_tcp_download(); cleared by the receiver thread when a session returns. */
static bool tcp_server_running;
/* Set by zperf_tcp_download_stop(); checked by the poll loop in tcp_server_session(). */
static bool tcp_server_stop;
/* Port and address requested by the caller of zperf_tcp_download(). */
static uint16_t tcp_server_port;
static struct sockaddr tcp_server_addr;
/* Given by zperf_tcp_download() to wake the receiver thread. */
static K_SEM_DEFINE(tcp_server_run, 0, 1);
51
tcp_received(const struct sockaddr * addr,size_t datalen)52 static void tcp_received(const struct sockaddr *addr, size_t datalen)
53 {
54 struct session *session;
55 int64_t time;
56
57 time = k_uptime_ticks();
58
59 session = get_session(addr, SESSION_TCP);
60 if (!session) {
61 NET_ERR("Cannot get a session!");
62 return;
63 }
64
65 switch (session->state) {
66 case STATE_COMPLETED:
67 case STATE_NULL:
68 zperf_reset_session_stats(session);
69 session->start_time = k_uptime_ticks();
70 session->state = STATE_ONGOING;
71
72 if (tcp_session_cb != NULL) {
73 tcp_session_cb(ZPERF_SESSION_STARTED, NULL,
74 tcp_user_data);
75 }
76
77 __fallthrough;
78 case STATE_ONGOING:
79 session->counter++;
80 session->length += datalen;
81
82 if (datalen == 0) { /* EOF */
83 struct zperf_results results = { 0 };
84
85 session->state = STATE_COMPLETED;
86
87 results.total_len = session->length;
88 results.time_in_us = k_ticks_to_us_ceil32(
89 time - session->start_time);
90
91 if (tcp_session_cb != NULL) {
92 tcp_session_cb(ZPERF_SESSION_FINISHED, &results,
93 tcp_user_data);
94 }
95 }
96 break;
97 default:
98 NET_ERR("Unsupported case");
99 }
100 }
101
tcp_bind_listen_connection(struct zsock_pollfd * pollfd,struct sockaddr * address)102 static int tcp_bind_listen_connection(struct zsock_pollfd *pollfd,
103 struct sockaddr *address)
104 {
105 uint16_t port;
106 int ret;
107
108 if (address->sa_family == AF_INET) {
109 port = ntohs(net_sin(address)->sin_port);
110 } else {
111 port = ntohs(net_sin6(address)->sin6_port);
112 }
113
114 ret = zsock_bind(pollfd->fd, address, sizeof(*address));
115 if (ret < 0) {
116 NET_ERR("Cannot bind IPv%d TCP port %d (%d)",
117 (address->sa_family == AF_INET ? 4 : 6), port, errno);
118 goto out;
119 }
120
121 ret = zsock_listen(pollfd->fd, 1);
122 if (ret < 0) {
123 NET_ERR("Cannot listen IPv%d TCP (%d)",
124 (address->sa_family == AF_INET ? 4 : 6), errno);
125 goto out;
126 }
127
128 pollfd->events = ZSOCK_POLLIN;
129
130 out:
131 return ret;
132 }
133
tcp_session_error_report(void)134 static void tcp_session_error_report(void)
135 {
136 if (tcp_session_cb != NULL) {
137 tcp_session_cb(ZPERF_SESSION_ERROR, NULL, tcp_user_data);
138 }
139 }
140
/*
 * Run one TCP download (server) session.
 *
 * Creates the IPv4 and/or IPv6 listening sockets (depending on
 * CONFIG_NET_IPV4 / CONFIG_NET_IPV6), binds them to the configured address
 * and port, then polls the listening sockets and all accepted connections
 * until tcp_server_stop is set or an error occurs. Received data is
 * accounted for through tcp_received(). Every socket that is still open is
 * closed before returning; on the error path the session callback is
 * notified first.
 */
static void tcp_server_session(void)
{
	/* Receive buffer; its contents are never inspected, only the length. */
	static uint8_t buf[TCP_RECEIVER_BUF_SIZE];
	/* Poll table, indexed by SOCK_ID_*; sock_addr[] uses the same index. */
	static struct zsock_pollfd fds[SOCK_ID_MAX];
	static struct sockaddr sock_addr[SOCK_ID_MAX];
	int ret;

	/* A negative fd marks a free slot (see the accept path below). */
	for (int i = 0; i < ARRAY_SIZE(fds); i++) {
		fds[i].fd = -1;
	}

	if (IS_ENABLED(CONFIG_NET_IPV4)) {
		struct sockaddr_in *in4_addr = zperf_get_sin();
		const struct in_addr *addr = NULL;

		fds[SOCK_ID_IPV4_LISTEN].fd = zsock_socket(AF_INET, SOCK_STREAM,
							   IPPROTO_TCP);
		if (fds[SOCK_ID_IPV4_LISTEN].fd < 0) {
			NET_ERR("Cannot create IPv4 network socket.");
			goto error;
		}

		addr = &net_sin(&tcp_server_addr)->sin_addr;

		/*
		 * Address selection, in order of preference:
		 *  1. the address given to zperf_tcp_download(), if specified;
		 *  2. MY_IP4ADDR, if it is a non-empty string;
		 *  3. the address from zperf_get_default_if_in4_addr().
		 */
		if (!net_ipv4_is_addr_unspecified(addr)) {
			memcpy(&in4_addr->sin_addr, addr,
			       sizeof(struct in_addr));
		} else if (strlen(MY_IP4ADDR ? MY_IP4ADDR : "")) {
			/* Use the address from MY_IP4ADDR */
			ret = zperf_get_ipv4_addr(MY_IP4ADDR,
						  &in4_addr->sin_addr);
			if (ret < 0) {
				NET_WARN("Unable to set IPv4");
				/* Fall back to option 3 (jumps into the else body). */
				goto use_existing_ipv4;
			}
		} else {
use_existing_ipv4:
			/* Use the existing default address */
			addr = zperf_get_default_if_in4_addr();
			if (!addr) {
				NET_ERR("Unable to get IPv4 by default");
				goto error;
			}
			memcpy(&in4_addr->sin_addr, addr,
			       sizeof(struct in_addr));
		}

		in4_addr->sin_port = htons(tcp_server_port);

		NET_INFO("Binding to %s",
			 net_sprint_ipv4_addr(&in4_addr->sin_addr));

		memcpy(net_sin(&sock_addr[SOCK_ID_IPV4_LISTEN]), in4_addr,
		       sizeof(struct sockaddr_in));

		ret = tcp_bind_listen_connection(
				&fds[SOCK_ID_IPV4_LISTEN],
				&sock_addr[SOCK_ID_IPV4_LISTEN]);
		if (ret < 0) {
			goto error;
		}
	}

	if (IS_ENABLED(CONFIG_NET_IPV6)) {
		struct sockaddr_in6 *in6_addr = zperf_get_sin6();
		const struct in6_addr *addr = NULL;

		fds[SOCK_ID_IPV6_LISTEN].fd = zsock_socket(AF_INET6, SOCK_STREAM,
							   IPPROTO_TCP);
		if (fds[SOCK_ID_IPV6_LISTEN].fd < 0) {
			NET_ERR("Cannot create IPv6 network socket.");
			goto error;
		}

		addr = &net_sin6(&tcp_server_addr)->sin6_addr;

		/* Same three-step address selection as for IPv4 above. */
		if (!net_ipv6_is_addr_unspecified(addr)) {
			memcpy(&in6_addr->sin6_addr, addr,
			       sizeof(struct in6_addr));
		} else if (strlen(MY_IP6ADDR ? MY_IP6ADDR : "")) {
			/* Use the address from MY_IP6ADDR */
			ret = zperf_get_ipv6_addr(MY_IP6ADDR,
						  MY_PREFIX_LEN_STR,
						  &in6_addr->sin6_addr);
			if (ret < 0) {
				NET_WARN("Unable to set IPv6");
				/* Fall back to the default address (jumps into the else body). */
				goto use_existing_ipv6;
			}
		} else {
use_existing_ipv6:
			/* Use the existing default address */
			addr = zperf_get_default_if_in6_addr();
			if (!addr) {
				NET_ERR("Unable to get IPv6 by default");
				goto error;
			}
			memcpy(&in6_addr->sin6_addr, addr,
			       sizeof(struct in6_addr));
		}

		in6_addr->sin6_port = htons(tcp_server_port);

		NET_INFO("Binding to %s",
			 net_sprint_ipv6_addr(&in6_addr->sin6_addr));

		memcpy(net_sin6(&sock_addr[SOCK_ID_IPV6_LISTEN]), in6_addr,
		       sizeof(struct sockaddr_in6));

		ret = tcp_bind_listen_connection(
				&fds[SOCK_ID_IPV6_LISTEN],
				&sock_addr[SOCK_ID_IPV6_LISTEN]);
		if (ret < 0) {
			goto error;
		}
	}

	NET_INFO("Listening on port %d", tcp_server_port);

	while (true) {
		ret = zsock_poll(fds, ARRAY_SIZE(fds), POLL_TIMEOUT_MS);
		if (ret < 0) {
			NET_ERR("TCP receiver poll error (%d)", errno);
			goto error;
		}

		/* A stop request is a normal exit: skip the error report. */
		if (tcp_server_stop) {
			goto cleanup;
		}

		/* Poll timed out with nothing ready. */
		if (ret == 0) {
			continue;
		}

		for (int i = 0; i < ARRAY_SIZE(fds); i++) {
			/* An error on any socket ends the whole session. */
			if ((fds[i].revents & ZSOCK_POLLERR) ||
			    (fds[i].revents & ZSOCK_POLLNVAL)) {
				NET_ERR("TCP receiver IPv%d socket error",
					(sock_addr[i].sa_family == AF_INET
					 ? 4 : 6));
				goto error;
			}

			if (!(fds[i].revents & ZSOCK_POLLIN)) {
				continue;
			}

			if ((i >= SOCK_ID_IPV4_LISTEN) && (i <= SOCK_ID_IPV6_LISTEN)) {
				/* Listening socket: accept the pending connection. */
				int j = SOCK_ID_IPV6_LISTEN + 1;
				struct sockaddr addr_incoming_conn;
				socklen_t addrlen = sizeof(struct sockaddr);
				int sock = zsock_accept(fds[i].fd,
							&addr_incoming_conn,
							&addrlen);

				if (sock < 0) {
					NET_ERR("TCP receiver IPv%d accept error",
						(sock_addr[i].sa_family == AF_INET
						 ? 4 : 6));
					goto error;
				}

				/* Find the first free connection slot. */
				for (; j < SOCK_ID_MAX; j++) {
					if (fds[j].fd < 0) {
						break;
					}
				}

				if (j == SOCK_ID_MAX) {
					/* Too many connections. */
					NET_ERR("Dropping TCP connection, reached maximum limit.");
					zsock_close(sock);
				} else {
					/*
					 * Remember the peer address; it is what
					 * tcp_received() later uses to look up
					 * the session.
					 */
					fds[j].fd = sock;
					fds[j].events = ZSOCK_POLLIN;
					memcpy(&sock_addr[j],
					       &addr_incoming_conn,
					       addrlen);
				}
			} else if ((i > SOCK_ID_IPV6_LISTEN) && (i < SOCK_ID_MAX)) {
				/* Accepted connection: pull the pending data. */
				ret = zsock_recv(fds[i].fd, buf, sizeof(buf), 0);
				if (ret < 0) {
					NET_ERR("recv failed on IPv%d socket (%d)",
						(sock_addr[i].sa_family == AF_INET
						 ? 4 : 6),
						errno);
					tcp_session_error_report();
					/*
					 * Treat the failure as EOF: this closes
					 * the zperf session and the socket below.
					 */
					ret = 0;
				}

				tcp_received(&sock_addr[i], ret);

				/* EOF (or converted error): release the slot. */
				if (ret == 0) {
					zsock_close(fds[i].fd);
					fds[i].fd = -1;
					memset(&sock_addr[i], 0,
					       sizeof(struct sockaddr));
				}
			} else {
				goto error;
			}
		}
	}

error:
	tcp_session_error_report();

	/* The error path falls through into the common cleanup. */
cleanup:
	for (int i = 0; i < ARRAY_SIZE(fds); i++) {
		if (fds[i].fd >= 0) {
			zsock_close(fds[i].fd);
			memset(&sock_addr[i], 0, sizeof(struct sockaddr));
		}
	}
}
356
tcp_receiver_thread(void * ptr1,void * ptr2,void * ptr3)357 void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
358 {
359 ARG_UNUSED(ptr1);
360 ARG_UNUSED(ptr2);
361 ARG_UNUSED(ptr3);
362
363 while (true) {
364 k_sem_take(&tcp_server_run, K_FOREVER);
365
366 tcp_server_session();
367
368 tcp_server_running = false;
369 }
370 }
371
zperf_tcp_receiver_init(void)372 void zperf_tcp_receiver_init(void)
373 {
374 k_thread_create(&tcp_receiver_thread_data,
375 tcp_receiver_stack_area,
376 K_THREAD_STACK_SIZEOF(tcp_receiver_stack_area),
377 tcp_receiver_thread,
378 NULL, NULL, NULL,
379 TCP_RECEIVER_THREAD_PRIORITY,
380 IS_ENABLED(CONFIG_USERSPACE) ? K_USER |
381 K_INHERIT_PERMS : 0,
382 K_NO_WAIT);
383 }
384
/**
 * @brief Start a TCP download (server) session.
 *
 * Stores the callback, user data, port and address for the receiver
 * thread, marks the server as running and wakes the receiver thread.
 *
 * @param param     Download parameters (port and address to bind to).
 * @param callback  Function notified about session events.
 * @param user_data Opaque pointer passed back to @p callback.
 *
 * @retval 0         The session was requested.
 * @retval -EINVAL   @p param or @p callback is NULL.
 * @retval -EALREADY A session is already running.
 */
int zperf_tcp_download(const struct zperf_download_params *param,
		       zperf_callback callback, void *user_data)
{
	if (param == NULL || callback == NULL) {
		return -EINVAL;
	}

	if (tcp_server_running) {
		return -EALREADY;
	}

	tcp_session_cb = callback;
	tcp_user_data = user_data;
	tcp_server_port = param->port;
	tcp_server_running = true;
	tcp_server_stop = false;
	/* Size the copy by the destination so it can never be overrun. */
	memcpy(&tcp_server_addr, &param->addr, sizeof(tcp_server_addr));

	/* Wake the receiver thread; it reads the state stored above. */
	k_sem_give(&tcp_server_run);

	return 0;
}
407
zperf_tcp_download_stop(void)408 int zperf_tcp_download_stop(void)
409 {
410 if (!tcp_server_running) {
411 return -EALREADY;
412 }
413
414 tcp_server_stop = true;
415 tcp_session_cb = NULL;
416
417 return 0;
418 }
419