1 /*
2  * Copyright (c) 2015 Intel Corporation
3  * Copyright (c) 2023 Arm Limited (or its affiliates). All rights reserved.
4  *
5  * SPDX-License-Identifier: Apache-2.0
6  */
7 
8 #include <zephyr/logging/log.h>
9 LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL);
10 
11 #include <zephyr/kernel.h>
12 
13 #include <zephyr/linker/sections.h>
14 #include <zephyr/toolchain.h>
15 
16 #include <zephyr/net/socket.h>
17 #include <zephyr/net/zperf.h>
18 
19 #include "zperf_internal.h"
20 #include "zperf_session.h"
21 
22 /* To get net_sprint_ipv{4|6}_addr() */
23 #define NET_LOG_ENABLED 1
24 #include "net_private.h"
25 
/*
 * Priority of the TCP receiver thread: cooperative when
 * CONFIG_NET_TC_THREAD_COOPERATIVE is enabled, preemptible otherwise.
 */
#if defined(CONFIG_NET_TC_THREAD_COOPERATIVE)
#define TCP_RECEIVER_THREAD_PRIORITY K_PRIO_COOP(8)
#else
#define TCP_RECEIVER_THREAD_PRIORITY K_PRIO_PREEMPT(8)
#endif

/* Size, in bytes, of the stack defined below for the receiver thread. */
#define TCP_RECEIVER_STACK_SIZE 2048

/*
 * Layout of the poll/address tables used by tcp_server_session():
 * slot 0 and slot 1 hold the IPv4 and IPv6 listening sockets, the remaining
 * CONFIG_NET_ZPERF_MAX_SESSIONS slots hold accepted connections.
 */
#define SOCK_ID_IPV4_LISTEN 0
#define SOCK_ID_IPV6_LISTEN 1
#define SOCK_ID_MAX         (CONFIG_NET_ZPERF_MAX_SESSIONS + 2)

/* Size, in bytes, of the buffer handed to zsock_recv(). */
#define TCP_RECEIVER_BUF_SIZE 1500
/* Timeout passed to zsock_poll(); the stop flag is checked after each poll. */
#define POLL_TIMEOUT_MS 100

/* Stack and control block of the receiver thread. */
static K_THREAD_STACK_DEFINE(tcp_receiver_stack_area, TCP_RECEIVER_STACK_SIZE);
static struct k_thread tcp_receiver_thread_data;

/* Session event callback; set by zperf_tcp_download(), cleared on stop. */
static zperf_callback tcp_session_cb;
/* Opaque pointer passed back as the last argument of tcp_session_cb. */
static void *tcp_user_data;
/* True from zperf_tcp_download() until the server session has returned. */
static bool tcp_server_running;
/* Set by zperf_tcp_download_stop() to make the poll loop exit. */
static bool tcp_server_stop;
/* Port (host byte order) and address requested for the listening sockets. */
static uint16_t tcp_server_port;
static struct sockaddr tcp_server_addr;
/* Given by zperf_tcp_download() to wake the receiver thread. */
static K_SEM_DEFINE(tcp_server_run, 0, 1);
51 
tcp_received(const struct sockaddr * addr,size_t datalen)52 static void tcp_received(const struct sockaddr *addr, size_t datalen)
53 {
54 	struct session *session;
55 	int64_t time;
56 
57 	time = k_uptime_ticks();
58 
59 	session = get_session(addr, SESSION_TCP);
60 	if (!session) {
61 		NET_ERR("Cannot get a session!");
62 		return;
63 	}
64 
65 	switch (session->state) {
66 	case STATE_COMPLETED:
67 	case STATE_NULL:
68 		zperf_reset_session_stats(session);
69 		session->start_time = k_uptime_ticks();
70 		session->state = STATE_ONGOING;
71 
72 		if (tcp_session_cb != NULL) {
73 			tcp_session_cb(ZPERF_SESSION_STARTED, NULL,
74 				       tcp_user_data);
75 		}
76 
77 		__fallthrough;
78 	case STATE_ONGOING:
79 		session->counter++;
80 		session->length += datalen;
81 
82 		if (datalen == 0) { /* EOF */
83 			struct zperf_results results = { 0 };
84 
85 			session->state = STATE_COMPLETED;
86 
87 			results.total_len = session->length;
88 			results.time_in_us = k_ticks_to_us_ceil32(
89 						time - session->start_time);
90 
91 			if (tcp_session_cb != NULL) {
92 				tcp_session_cb(ZPERF_SESSION_FINISHED, &results,
93 					       tcp_user_data);
94 			}
95 		}
96 		break;
97 	default:
98 		NET_ERR("Unsupported case");
99 	}
100 }
101 
tcp_bind_listen_connection(struct zsock_pollfd * pollfd,struct sockaddr * address)102 static int tcp_bind_listen_connection(struct zsock_pollfd *pollfd,
103 				      struct sockaddr *address)
104 {
105 	uint16_t port;
106 	int ret;
107 
108 	if (address->sa_family == AF_INET) {
109 		port = ntohs(net_sin(address)->sin_port);
110 	} else {
111 		port = ntohs(net_sin6(address)->sin6_port);
112 	}
113 
114 	ret = zsock_bind(pollfd->fd, address, sizeof(*address));
115 	if (ret < 0) {
116 		NET_ERR("Cannot bind IPv%d TCP port %d (%d)",
117 			(address->sa_family == AF_INET ? 4 : 6), port, errno);
118 		goto out;
119 	}
120 
121 	ret = zsock_listen(pollfd->fd, 1);
122 	if (ret < 0) {
123 		NET_ERR("Cannot listen IPv%d TCP (%d)",
124 			(address->sa_family == AF_INET ? 4 : 6), errno);
125 		goto out;
126 	}
127 
128 	pollfd->events = ZSOCK_POLLIN;
129 
130 out:
131 	return ret;
132 }
133 
tcp_session_error_report(void)134 static void tcp_session_error_report(void)
135 {
136 	if (tcp_session_cb != NULL) {
137 		tcp_session_cb(ZPERF_SESSION_ERROR, NULL, tcp_user_data);
138 	}
139 }
140 
/*
 * Run one TCP server session.
 *
 * Creates a listening socket for each enabled IP family, binds it to the
 * requested/configured/default address on tcp_server_port, then polls all
 * sockets until tcp_server_stop is set or an error occurs. New connections
 * are stored in the free slots after the two listening slots; data received
 * on them is accounted through tcp_received().
 *
 * On every exit path all open sockets are closed. On the error path the
 * session callback is additionally notified with ZPERF_SESSION_ERROR.
 */
static void tcp_server_session(void)
{
	/* Receive buffer and socket tables have static storage duration. */
	static uint8_t buf[TCP_RECEIVER_BUF_SIZE];
	static struct zsock_pollfd fds[SOCK_ID_MAX];
	static struct sockaddr sock_addr[SOCK_ID_MAX];
	int ret;

	/* Mark every slot as unused; a negative fd identifies a free slot. */
	for (int i = 0; i < ARRAY_SIZE(fds); i++) {
		fds[i].fd = -1;
	}

	if (IS_ENABLED(CONFIG_NET_IPV4)) {
		struct sockaddr_in *in4_addr = zperf_get_sin();
		const struct in_addr *addr = NULL;

		fds[SOCK_ID_IPV4_LISTEN].fd = zsock_socket(AF_INET, SOCK_STREAM,
							   IPPROTO_TCP);
		if (fds[SOCK_ID_IPV4_LISTEN].fd < 0) {
			NET_ERR("Cannot create IPv4 network socket.");
			goto error;
		}

		addr = &net_sin(&tcp_server_addr)->sin_addr;

		/*
		 * Address selection order: the address given to
		 * zperf_tcp_download(), then MY_IP4ADDR if it is a non-empty
		 * string, then the default interface address.
		 */
		if (!net_ipv4_is_addr_unspecified(addr)) {
			memcpy(&in4_addr->sin_addr, addr,
				sizeof(struct in_addr));
		} else if (strlen(MY_IP4ADDR ? MY_IP4ADDR : "")) {
			/* Use the address from MY_IP4ADDR */
			ret = zperf_get_ipv4_addr(MY_IP4ADDR,
						  &in4_addr->sin_addr);
			if (ret < 0) {
				NET_WARN("Unable to set IPv4");
				/* Fall back to the default interface address */
				goto use_existing_ipv4;
			}
		} else {
use_existing_ipv4:
			/* Use the address returned for the default interface */
			addr = zperf_get_default_if_in4_addr();
			if (!addr) {
				NET_ERR("Unable to get IPv4 by default");
				goto error;
			}
			memcpy(&in4_addr->sin_addr, addr,
				sizeof(struct in_addr));
		}

		in4_addr->sin_port = htons(tcp_server_port);

		NET_INFO("Binding to %s",
			 net_sprint_ipv4_addr(&in4_addr->sin_addr));

		/* Keep the bound address in the slot of the listening socket */
		memcpy(net_sin(&sock_addr[SOCK_ID_IPV4_LISTEN]), in4_addr,
		       sizeof(struct sockaddr_in));

		ret = tcp_bind_listen_connection(
				&fds[SOCK_ID_IPV4_LISTEN],
				&sock_addr[SOCK_ID_IPV4_LISTEN]);
		if (ret < 0) {
			goto error;
		}
	}

	if (IS_ENABLED(CONFIG_NET_IPV6)) {
		struct sockaddr_in6 *in6_addr = zperf_get_sin6();
		const struct in6_addr *addr = NULL;

		fds[SOCK_ID_IPV6_LISTEN].fd = zsock_socket(AF_INET6, SOCK_STREAM,
							   IPPROTO_TCP);
		if (fds[SOCK_ID_IPV6_LISTEN].fd < 0) {
			NET_ERR("Cannot create IPv6 network socket.");
			goto error;
		}

		addr = &net_sin6(&tcp_server_addr)->sin6_addr;

		/* Same address selection order as for IPv4 above. */
		if (!net_ipv6_is_addr_unspecified(addr)) {
			memcpy(&in6_addr->sin6_addr, addr,
			       sizeof(struct in6_addr));
		} else if (strlen(MY_IP6ADDR ? MY_IP6ADDR : "")) {
			/* Use the address from MY_IP6ADDR */
			ret = zperf_get_ipv6_addr(MY_IP6ADDR,
						  MY_PREFIX_LEN_STR,
						  &in6_addr->sin6_addr);
			if (ret < 0) {
				NET_WARN("Unable to set IPv6");
				/* Fall back to the default interface address */
				goto use_existing_ipv6;
			}
		} else {
use_existing_ipv6:
			/* Use the address returned for the default interface */
			addr = zperf_get_default_if_in6_addr();
			if (!addr) {
				NET_ERR("Unable to get IPv6 by default");
				goto error;
			}
			memcpy(&in6_addr->sin6_addr, addr,
				sizeof(struct in6_addr));
		}

		in6_addr->sin6_port = htons(tcp_server_port);

		NET_INFO("Binding to %s",
			 net_sprint_ipv6_addr(&in6_addr->sin6_addr));

		/* Keep the bound address in the slot of the listening socket */
		memcpy(net_sin6(&sock_addr[SOCK_ID_IPV6_LISTEN]), in6_addr,
		       sizeof(struct sockaddr_in6));

		ret = tcp_bind_listen_connection(
				&fds[SOCK_ID_IPV6_LISTEN],
				&sock_addr[SOCK_ID_IPV6_LISTEN]);
		if (ret < 0) {
			goto error;
		}
	}

	NET_INFO("Listening on port %d", tcp_server_port);

	while (true) {
		/*
		 * Poll with a finite timeout so the stop request below is
		 * noticed even when no socket has activity.
		 */
		ret = zsock_poll(fds, ARRAY_SIZE(fds), POLL_TIMEOUT_MS);
		if (ret < 0) {
			NET_ERR("TCP receiver poll error (%d)", errno);
			goto error;
		}

		/* A stop request ends the session without an error report. */
		if (tcp_server_stop) {
			goto cleanup;
		}

		/* Timeout: nothing is ready, poll again. */
		if (ret == 0) {
			continue;
		}

		for (int i = 0; i < ARRAY_SIZE(fds); i++) {
			/* An error on any socket aborts the whole session. */
			if ((fds[i].revents & ZSOCK_POLLERR) ||
			    (fds[i].revents & ZSOCK_POLLNVAL)) {
				NET_ERR("TCP receiver IPv%d socket error",
					(sock_addr[i].sa_family == AF_INET
						? 4 : 6));
				goto error;
			}

			if (!(fds[i].revents & ZSOCK_POLLIN)) {
				continue;
			}

			if ((i >= SOCK_ID_IPV4_LISTEN) && (i <= SOCK_ID_IPV6_LISTEN)) {
				/* Listening socket: accept the new connection. */
				int j = SOCK_ID_IPV6_LISTEN + 1;
				struct sockaddr addr_incoming_conn;
				socklen_t addrlen = sizeof(struct sockaddr);
				int sock = zsock_accept(fds[i].fd,
							&addr_incoming_conn,
							&addrlen);

				if (sock < 0) {
					NET_ERR("TCP receiver IPv%d accept error",
						(sock_addr[i].sa_family == AF_INET
							? 4 : 6));
					goto error;
				}

				/* Look for the first free connection slot. */
				for (; j < SOCK_ID_MAX; j++) {
					if (fds[j].fd < 0) {
						break;
					}
				}

				if (j == SOCK_ID_MAX) {
					/* Too many connections. */
					NET_ERR("Dropping TCP connection, reached maximum limit.");
					zsock_close(sock);
				} else {
					/* Track the connection and its peer address. */
					fds[j].fd = sock;
					fds[j].events = ZSOCK_POLLIN;
					memcpy(&sock_addr[j],
					       &addr_incoming_conn,
					       addrlen);
				}
			} else if ((i > SOCK_ID_IPV6_LISTEN) && (i < SOCK_ID_MAX)) {
				/* Connection socket: read and account the data. */
				ret = zsock_recv(fds[i].fd, buf, sizeof(buf), 0);
				if (ret < 0) {
					NET_ERR("recv failed on IPv%d socket (%d)",
						(sock_addr[i].sa_family == AF_INET
							? 4 : 6),
						errno);
					tcp_session_error_report();
					/* This will close the zperf session */
					ret = 0;
				}

				/* A length of 0 is handled as EOF by tcp_received(). */
				tcp_received(&sock_addr[i], ret);

				if (ret == 0) {
					/* Release the slot for a future connection. */
					zsock_close(fds[i].fd);
					fds[i].fd = -1;
					memset(&sock_addr[i], 0,
					sizeof(struct sockaddr));
				}
			} else {
				goto error;
			}
		}
	}

error:
	tcp_session_error_report();

cleanup:
	/* Close every socket that is still open and clear its address. */
	for (int i = 0; i < ARRAY_SIZE(fds); i++) {
		if (fds[i].fd >= 0) {
			zsock_close(fds[i].fd);
			memset(&sock_addr[i], 0, sizeof(struct sockaddr));
		}
	}
}
356 
tcp_receiver_thread(void * ptr1,void * ptr2,void * ptr3)357 void tcp_receiver_thread(void *ptr1, void *ptr2, void *ptr3)
358 {
359 	ARG_UNUSED(ptr1);
360 	ARG_UNUSED(ptr2);
361 	ARG_UNUSED(ptr3);
362 
363 	while (true) {
364 		k_sem_take(&tcp_server_run, K_FOREVER);
365 
366 		tcp_server_session();
367 
368 		tcp_server_running = false;
369 	}
370 }
371 
zperf_tcp_receiver_init(void)372 void zperf_tcp_receiver_init(void)
373 {
374 	k_thread_create(&tcp_receiver_thread_data,
375 			tcp_receiver_stack_area,
376 			K_THREAD_STACK_SIZEOF(tcp_receiver_stack_area),
377 			tcp_receiver_thread,
378 			NULL, NULL, NULL,
379 			TCP_RECEIVER_THREAD_PRIORITY,
380 			IS_ENABLED(CONFIG_USERSPACE) ? K_USER |
381 						       K_INHERIT_PERMS : 0,
382 			K_NO_WAIT);
383 }
384 
/**
 * Start the TCP download (receiver) server.
 *
 * Stores the requested port, address, callback and user data, then wakes
 * the receiver thread through the tcp_server_run semaphore.
 *
 * @param param     Download parameters; the port and addr fields are used.
 * @param callback  Session event callback, must not be NULL.
 * @param user_data Opaque pointer handed back to @p callback.
 *
 * @return 0 on success, -EINVAL if @p param or @p callback is NULL,
 *         -EALREADY if the server is already running.
 */
int zperf_tcp_download(const struct zperf_download_params *param,
		       zperf_callback callback, void *user_data)
{
	if (param == NULL) {
		return -EINVAL;
	}

	if (callback == NULL) {
		return -EINVAL;
	}

	if (tcp_server_running) {
		return -EALREADY;
	}

	/* Where to listen. */
	tcp_server_port = param->port;
	memcpy(&tcp_server_addr, &param->addr, sizeof(struct sockaddr));

	/* Whom to notify. */
	tcp_session_cb = callback;
	tcp_user_data = user_data;

	/* Server state; must be in place before the thread is woken. */
	tcp_server_stop = false;
	tcp_server_running = true;

	k_sem_give(&tcp_server_run);

	return 0;
}
407 
zperf_tcp_download_stop(void)408 int zperf_tcp_download_stop(void)
409 {
410 	if (!tcp_server_running) {
411 		return -EALREADY;
412 	}
413 
414 	tcp_server_stop = true;
415 	tcp_session_cb = NULL;
416 
417 	return 0;
418 }
419