1 /*
2  * Copyright (c) 2024, Nordic Semiconductor
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <stdio.h>
8 
9 #include <zephyr/posix/sys/socket.h>
10 #include <zephyr/posix/poll.h>
11 
12 #include <zephyr/kernel.h>
13 #include <zephyr/net/tls_credentials.h>
14 #include <zephyr/net/http/server.h>
15 #include <zephyr/net/http/service.h>
16 #include <zephyr/net/net_ip.h>
17 #include <zephyr/net/socket.h>
18 #include <zephyr/net/websocket.h>
19 #include <zephyr/net/net_mgmt.h>
20 #include <zephyr/net/net_stats.h>
21 #include <zephyr/init.h>
22 
23 #include <zephyr/logging/log.h>
24 LOG_MODULE_DECLARE(net_http_server_sample, LOG_LEVEL_DBG);
25 
/* Stack size of each websocket handler thread; doubled when TLS socket
 * options or gcov coverage are enabled.
 */
#if defined(CONFIG_NET_SOCKETS_SOCKOPT_TLS) || defined(CONFIG_COVERAGE_GCOV)
#define STACK_SIZE 4096
#else
#define STACK_SIZE 2048
#endif

/* Priority of the websocket handler threads. */
#ifdef CONFIG_NET_TC_THREAD_COOPERATIVE
#define THREAD_PRIORITY K_PRIO_COOP(CONFIG_NUM_COOP_PRIORITIES - 1)
#else
#define THREAD_PRIORITY K_PRIO_PREEMPT(8)
#endif

/* With userspace enabled, APP_BMEM/APP_DMEM place a variable in
 * app_partition; otherwise they expand to nothing.
 */
#ifdef CONFIG_USERSPACE
#include <zephyr/app_memory/app_memdomain.h>
extern struct k_mem_partition app_partition;
extern struct k_mem_domain app_domain;
#define APP_BMEM K_APP_BMEM(app_partition)
#define APP_DMEM K_APP_DMEM(app_partition)
#else
#define APP_BMEM
#define APP_DMEM
#endif

/* One client per configured websocket handler */
#define MAX_CLIENT_QUEUE CONFIG_NET_SAMPLE_NUM_WEBSOCKET_HANDLERS
/* Size in bytes of each echo connection's receive buffer */
#define RECV_BUFFER_SIZE 1280
51 
52 struct ws_netstats_ctx {
53 	int sock;
54 	struct k_work_delayable work;
55 };
56 
57 K_THREAD_STACK_ARRAY_DEFINE(ws_handler_stack,
58 			    CONFIG_NET_SAMPLE_NUM_WEBSOCKET_HANDLERS,
59 			    STACK_SIZE);
60 static struct k_thread ws_handler_thread[CONFIG_NET_SAMPLE_NUM_WEBSOCKET_HANDLERS];
61 static APP_BMEM bool ws_handler_in_use[CONFIG_NET_SAMPLE_NUM_WEBSOCKET_HANDLERS];
62 
63 static struct ws_netstats_ctx netstats_ctx[CONFIG_NET_SAMPLE_NUM_WEBSOCKET_HANDLERS];
64 
65 static struct data {
66 	int sock;
67 	uint32_t counter;
68 	uint32_t bytes_received;
69 	struct pollfd fds[1];
70 	char recv_buffer[RECV_BUFFER_SIZE];
71 } config[CONFIG_NET_SAMPLE_NUM_WEBSOCKET_HANDLERS] = {
72 	[0 ... (CONFIG_NET_SAMPLE_NUM_WEBSOCKET_HANDLERS - 1)] = {
73 		.sock = -1,
74 		.fds[0].fd = -1,
75 	}
76 };
77 
get_free_echo_slot(struct data * cfg)78 static int get_free_echo_slot(struct data *cfg)
79 {
80 	for (int i = 0; i < CONFIG_NET_SAMPLE_NUM_WEBSOCKET_HANDLERS; i++) {
81 		if (cfg[i].sock < 0) {
82 			return i;
83 		}
84 	}
85 
86 	return -1;
87 }
88 
get_free_netstats_slot(void)89 static int get_free_netstats_slot(void)
90 {
91 	for (int i = 0; i < CONFIG_NET_SAMPLE_NUM_WEBSOCKET_HANDLERS; i++) {
92 		if (netstats_ctx[i].sock < 0) {
93 			return i;
94 		}
95 	}
96 
97 	return -1;
98 }
99 
/* Send the whole buffer, calling send() repeatedly until nothing is left.
 *
 * sock: socket to write to
 * buf:  data to send
 * len:  number of bytes in buf
 *
 * Returns 0 once all bytes were handed to send(), or the negative value
 * returned by the first failing send() call.
 */
static ssize_t sendall(int sock, const void *buf, size_t len)
{
	const char *cursor = buf;
	size_t remaining = len;

	while (remaining > 0) {
		ssize_t sent = send(sock, cursor, remaining, 0);

		if (sent < 0) {
			return sent;
		}

		/* Skip what was accepted and retry with the rest */
		cursor += sent;
		remaining -= sent;
	}

	return 0;
}
114 
ws_echo_handler(void * ptr1,void * ptr2,void * ptr3)115 static void ws_echo_handler(void *ptr1, void *ptr2, void *ptr3)
116 {
117 	int slot = POINTER_TO_INT(ptr1);
118 	struct data *cfg = ptr2;
119 	bool *in_use = ptr3;
120 	int offset = 0;
121 	int received;
122 	int client;
123 	int ret;
124 
125 	client = cfg->sock;
126 
127 	cfg->fds[0].fd = client;
128 	cfg->fds[0].events = POLLIN;
129 
130 	/* In this example, we start to receive data from the websocket
131 	 * and send it back to the client. Note that we could either use
132 	 * the BSD socket interface if we do not care about Websocket
133 	 * specific packets, or we could use the websocket_{send/recv}_msg()
134 	 * function to send websocket specific data.
135 	 */
136 	while (true) {
137 		if (poll(cfg->fds, 1, -1) < 0) {
138 			LOG_ERR("Error in poll:%d", errno);
139 			continue;
140 		}
141 
142 		if (cfg->fds[0].fd < 0) {
143 			continue;
144 		}
145 
146 		if (cfg->fds[0].revents & POLLHUP) {
147 			LOG_DBG("Client #%d has disconnected", client);
148 			break;
149 		}
150 
151 		received = recv(client,
152 				cfg->recv_buffer + offset,
153 				sizeof(cfg->recv_buffer) - offset,
154 				0);
155 
156 		if (received == 0) {
157 			/* Connection closed */
158 			LOG_INF("[%d] Connection closed", slot);
159 			break;
160 		} else if (received < 0) {
161 			/* Socket error */
162 			LOG_ERR("[%d] Connection error %d", slot, errno);
163 			break;
164 		}
165 
166 		cfg->bytes_received += received;
167 		offset += received;
168 
169 #if !defined(CONFIG_NET_SOCKETS_SOCKOPT_TLS)
170 		/* To prevent fragmentation of the response, reply only if
171 		 * buffer is full or there is no more data to read
172 		 */
173 		if (offset == sizeof(cfg->recv_buffer) ||
174 		    (recv(client, cfg->recv_buffer + offset,
175 			  sizeof(cfg->recv_buffer) - offset,
176 			  MSG_PEEK | MSG_DONTWAIT) < 0 &&
177 		     (errno == EAGAIN || errno == EWOULDBLOCK))) {
178 #endif
179 			ret = sendall(client, cfg->recv_buffer, offset);
180 			if (ret < 0) {
181 				LOG_ERR("[%d] Failed to send data, closing socket",
182 					slot);
183 				break;
184 			}
185 
186 			LOG_DBG("[%d] Received and replied with %d bytes",
187 				slot, offset);
188 
189 			if (++cfg->counter % 1000 == 0U) {
190 				LOG_INF("[%d] Sent %u packets", slot, cfg->counter);
191 			}
192 
193 			offset = 0;
194 #if !defined(CONFIG_NET_SOCKETS_SOCKOPT_TLS)
195 		}
196 #endif
197 	}
198 
199 	*in_use = false;
200 
201 	(void)websocket_unregister(client);
202 
203 	cfg->sock = -1;
204 }
205 
netstats_collect(char * buf,size_t maxlen)206 static int netstats_collect(char *buf, size_t maxlen)
207 {
208 	int ret;
209 	struct net_stats data;
210 	uint32_t bytes_recv = 0;
211 	uint32_t bytes_sent = 0;
212 	uint32_t ipv6_recv = 0;
213 	uint32_t ipv6_sent = 0;
214 	uint32_t ipv4_recv = 0;
215 	uint32_t ipv4_sent = 0;
216 	uint32_t tcp_recv = 0;
217 	uint32_t tcp_sent = 0;
218 
219 	net_mgmt(NET_REQUEST_STATS_GET_ALL, NULL, &data, sizeof(data));
220 
221 	const char *net_stats_json_template = "{"
222 					      "\"bytes_recv\":%u,"
223 					      "\"bytes_sent\":%u,"
224 					      "\"ipv6_pkt_recv\":%u,"
225 					      "\"ipv6_pkt_sent\":%u,"
226 					      "\"ipv4_pkt_recv\":%u,"
227 					      "\"ipv4_pkt_sent\":%u,"
228 					      "\"tcp_bytes_recv\":%u,"
229 					      "\"tcp_bytes_sent\":%u"
230 					      "}";
231 
232 	bytes_recv = data.bytes.received;
233 	bytes_sent = data.bytes.sent;
234 #if defined(CONFIG_NET_STATISTICS_IPV6)
235 	ipv6_recv = data.ipv6.recv;
236 	ipv6_sent = data.ipv6.sent;
237 #endif
238 #if defined(CONFIG_NET_STATISTICS_IPV4)
239 	ipv4_recv = data.ipv4.recv;
240 	ipv4_sent = data.ipv4.sent;
241 #endif
242 #if defined(CONFIG_NET_STATISTICS_TCP)
243 	tcp_recv = data.tcp.bytes.received;
244 	tcp_sent = data.tcp.bytes.sent;
245 #endif
246 
247 	ret = snprintf(buf, maxlen, net_stats_json_template, bytes_recv, bytes_sent, ipv6_recv,
248 		       ipv6_sent, ipv4_recv, ipv4_sent, tcp_recv, tcp_sent);
249 	if (ret >= maxlen) {
250 		LOG_ERR("Net stats do not fit in buffer");
251 		return -ENOSPC;
252 	}
253 
254 	return ret;
255 }
256 
257 #define WS_NETSTATS_STACK_SIZE 2048
258 K_THREAD_STACK_DEFINE(ws_netstats_stack, WS_NETSTATS_STACK_SIZE);
259 struct k_work_q ws_netstats_queue;
260 
netstats_handler(struct k_work * work)261 static void netstats_handler(struct k_work *work)
262 {
263 	int ret;
264 	static char tx_buf[256];
265 	struct k_work_delayable *dwork = k_work_delayable_from_work(work);
266 	struct ws_netstats_ctx *ctx = CONTAINER_OF(dwork, struct ws_netstats_ctx, work);
267 
268 	ret = netstats_collect(tx_buf, sizeof(tx_buf));
269 	if (ret < 0) {
270 		LOG_ERR("Unable to collect network statistics, err %d", ret);
271 		goto unregister;
272 	}
273 
274 	ret = websocket_send_msg(ctx->sock, tx_buf, ret, WEBSOCKET_OPCODE_DATA_TEXT, false, true,
275 				 SYS_FOREVER_MS);
276 	if (ret < 0) {
277 		LOG_INF("Couldn't send websocket msg (%d), closing connection", ret);
278 		goto unregister;
279 	}
280 
281 	ret = k_work_reschedule_for_queue(&ws_netstats_queue, &ctx->work,
282 					  K_MSEC(CONFIG_NET_SAMPLE_WEBSOCKET_STATS_INTERVAL));
283 	if (ret < 0) {
284 		LOG_ERR("Failed to schedule netstats work, err %d", ret);
285 		goto unregister;
286 	}
287 
288 	return;
289 
290 unregister:
291 	(void)websocket_unregister(ctx->sock);
292 	ctx->sock = -1;
293 }
294 
ws_netstats_init(void)295 int ws_netstats_init(void)
296 {
297 	struct k_work_queue_config cfg = {.name = "ws_netstats_q"};
298 
299 	k_work_queue_init(&ws_netstats_queue);
300 	k_work_queue_start(&ws_netstats_queue, ws_netstats_stack, WS_NETSTATS_STACK_SIZE, 0, &cfg);
301 
302 	for (int i = 0; i < CONFIG_NET_SAMPLE_NUM_WEBSOCKET_HANDLERS; i++) {
303 		netstats_ctx[i].sock = -1;
304 		k_work_init_delayable(&netstats_ctx[i].work, netstats_handler);
305 	}
306 
307 	return 0;
308 }
309 SYS_INIT(ws_netstats_init, APPLICATION, 0);
310 
ws_echo_setup(int ws_socket,struct http_request_ctx * request_ctx,void * user_data)311 int ws_echo_setup(int ws_socket, struct http_request_ctx *request_ctx, void *user_data)
312 {
313 	int slot;
314 
315 	slot = get_free_echo_slot(config);
316 	if (slot < 0) {
317 		LOG_ERR("Cannot accept more connections");
318 		/* The caller will close the connection in this case */
319 		return -ENOENT;
320 	}
321 
322 	config[slot].sock = ws_socket;
323 
324 	LOG_INF("[%d] Accepted a Websocket connection", slot);
325 
326 	k_thread_create(&ws_handler_thread[slot],
327 			ws_handler_stack[slot],
328 			K_THREAD_STACK_SIZEOF(ws_handler_stack[slot]),
329 			ws_echo_handler,
330 			INT_TO_POINTER(slot), &config[slot], &ws_handler_in_use[slot],
331 			THREAD_PRIORITY,
332 			IS_ENABLED(CONFIG_USERSPACE) ? K_USER |
333 						       K_INHERIT_PERMS : 0,
334 			K_NO_WAIT);
335 
336 	if (IS_ENABLED(CONFIG_THREAD_NAME)) {
337 #define MAX_NAME_LEN (sizeof("ws[xxxxxxxxxx]"))
338 		char name[MAX_NAME_LEN];
339 
340 		snprintk(name, sizeof(name), "ws[%d]", slot);
341 		k_thread_name_set(&ws_handler_thread[slot], name);
342 	}
343 
344 	return 0;
345 }
346 
ws_netstats_setup(int ws_socket,struct http_request_ctx * request_ctx,void * user_data)347 int ws_netstats_setup(int ws_socket, struct http_request_ctx *request_ctx, void *user_data)
348 {
349 	int ret;
350 	int slot;
351 
352 	slot = get_free_netstats_slot();
353 	if (slot < 0) {
354 		LOG_ERR("Cannot accept more netstats websocket connections");
355 		return -ENOENT;
356 	}
357 
358 	netstats_ctx[slot].sock = ws_socket;
359 
360 	ret = k_work_reschedule_for_queue(&ws_netstats_queue, &netstats_ctx[slot].work, K_NO_WAIT);
361 	if (ret < 0) {
362 		LOG_ERR("Failed to schedule netstats work, err %d", ret);
363 		return ret;
364 	}
365 
366 	LOG_INF("Accepted websocket connection for net stats");
367 	return 0;
368 }
369