1 /*
2  * Copyright (c) 2024 Nordic Semiconductor ASA
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <zephyr/init.h>
8 
9 #include <zephyr/logging/log.h>
10 #include <zephyr/net/socket.h>
11 #include <zephyr/net/socket_service.h>
12 #include <zephyr/net/http/service.h>
13 #include <zephyr/net/http/server.h>
14 #include <zephyr/net/websocket.h>
15 #include <zephyr/shell/shell.h>
16 #include <zephyr/shell/shell_websocket.h>
17 #include <zephyr/logging/log_backend_ws.h>
18 
/* Register this translation unit as the "shell_websocket" log module. */
LOG_MODULE_REGISTER(shell_websocket, CONFIG_SHELL_WEBSOCKET_LOG_LEVEL);

/* Capacity of the outgoing line buffer; output is flushed when it fills up. */
#define WEBSOCKET_LINE_SIZE CONFIG_SHELL_WEBSOCKET_LINE_BUF_SIZE
/* Delay, in milliseconds, before buffered output is flushed by the work item. */
#define WEBSOCKET_TIMEOUT   CONFIG_SHELL_WEBSOCKET_SEND_TIMEOUT

/* NOTE(review): the two constants below are not referenced anywhere in the
 * visible part of this file - confirm whether they are still needed.
 */
#define WEBSOCKET_MIN_COMMAND_LEN 2
#define WEBSOCKET_WILL_DO_COMMAND_LEN 3

/* Forward declaration: the handler is needed by the service definition below. */
static void ws_server_cb(struct net_socket_service_event *evt);

/* Socket service that dispatches events on websocket shell sockets to
 * ws_server_cb().
 */
NET_SOCKET_SERVICE_SYNC_DEFINE_STATIC(websocket_server, ws_server_cb,
				      SHELL_WEBSOCKET_SERVICE_COUNT);
31 
ws_end_client_connection(struct shell_websocket * ws)32 static void ws_end_client_connection(struct shell_websocket *ws)
33 {
34 	int ret;
35 
36 	LOG_DBG("Closing connection to #%d", ws->fds[0].fd);
37 
38 	if (IS_ENABLED(CONFIG_LOG_BACKEND_WS)) {
39 		(void)log_backend_ws_unregister(ws->fds[0].fd);
40 	}
41 
42 	(void)websocket_unregister(ws->fds[0].fd);
43 
44 	ws->fds[0].fd = -1;
45 	ws->output_lock = false;
46 
47 	k_work_cancel_delayable_sync(&ws->send_work, &ws->work_sync);
48 
49 	ret = net_socket_service_register(&websocket_server, ws->fds,
50 					  ARRAY_SIZE(ws->fds), NULL);
51 	if (ret < 0) {
52 		LOG_ERR("Failed to re-register socket service (%d)", ret);
53 	}
54 }
55 
ws_send(struct shell_websocket * ws,bool block)56 static int ws_send(struct shell_websocket *ws, bool block)
57 {
58 	int ret;
59 	uint8_t *msg = ws->line_out.buf;
60 	uint16_t len = ws->line_out.len;
61 
62 	if (ws->line_out.len == 0) {
63 		return 0;
64 	}
65 
66 	if (ws->fds[0].fd < 0) {
67 		return -ENOTCONN;
68 	}
69 
70 	while (len > 0) {
71 		ret = zsock_send(ws->fds[0].fd, msg, len,
72 				 block ? 0 : ZSOCK_MSG_DONTWAIT);
73 		if (!block && (ret < 0) && (errno == EAGAIN)) {
74 			/* Not all data was sent - move the remaining data and
75 			 * update length.
76 			 */
77 			memmove(ws->line_out.buf, msg, len);
78 			ws->line_out.len = len;
79 			return -EAGAIN;
80 		}
81 
82 		if (ret < 0) {
83 			ret = -errno;
84 			LOG_ERR("Failed to send %d, shutting down", -ret);
85 			ws_end_client_connection(ws);
86 			return ret;
87 		}
88 
89 		msg += ret;
90 		len -= ret;
91 	}
92 
93 	/* We reinitialize the line buffer */
94 	ws->line_out.len = 0;
95 
96 	return 0;
97 }
98 
ws_send_prematurely(struct k_work * work)99 static void ws_send_prematurely(struct k_work *work)
100 {
101 	struct k_work_delayable *dwork = k_work_delayable_from_work(work);
102 	struct shell_websocket *ws = CONTAINER_OF(dwork,
103 						  struct shell_websocket,
104 						  send_work);
105 	int ret;
106 
107 	/* Use non-blocking send to prevent system workqueue blocking. */
108 	ret = ws_send(ws, false);
109 	if (ret == -EAGAIN) {
110 		/* Not all data was sent, reschedule the work. */
111 		k_work_reschedule(&ws->send_work, K_MSEC(WEBSOCKET_TIMEOUT));
112 	}
113 }
114 
ws_recv(struct shell_websocket * ws,struct zsock_pollfd * pollfd)115 static void ws_recv(struct shell_websocket *ws, struct zsock_pollfd *pollfd)
116 {
117 	size_t len, buf_left;
118 	uint8_t *buf;
119 	int ret;
120 
121 	k_mutex_lock(&ws->rx_lock, K_FOREVER);
122 
123 	buf_left = sizeof(ws->rx_buf) - ws->rx_len;
124 	if (buf_left == 0) {
125 		/* No space left to read TCP stream, try again later. */
126 		k_mutex_unlock(&ws->rx_lock);
127 		k_msleep(10);
128 		return;
129 	}
130 
131 	buf = ws->rx_buf + ws->rx_len;
132 
133 	ret = zsock_recv(pollfd->fd, buf, buf_left, 0);
134 	if (ret < 0) {
135 		LOG_DBG("Websocket client error %d", ret);
136 		goto error;
137 	} else if (ret == 0) {
138 		LOG_DBG("Websocket client closed connection");
139 		goto error;
140 	}
141 
142 	len = ret;
143 	ws->rx_len += len;
144 
145 	k_mutex_unlock(&ws->rx_lock);
146 
147 	ws->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, ws->shell_context);
148 
149 	return;
150 
151 error:
152 	k_mutex_unlock(&ws->rx_lock);
153 	ws_end_client_connection(ws);
154 }
155 
ws_server_cb(struct net_socket_service_event * evt)156 static void ws_server_cb(struct net_socket_service_event *evt)
157 {
158 	socklen_t optlen = sizeof(int);
159 	struct shell_websocket *ws;
160 	int sock_error;
161 
162 	ws = (struct shell_websocket *)evt->user_data;
163 
164 	if ((evt->event.revents & ZSOCK_POLLERR) ||
165 	    (evt->event.revents & ZSOCK_POLLNVAL)) {
166 		(void)zsock_getsockopt(evt->event.fd, SOL_SOCKET,
167 				       SO_ERROR, &sock_error, &optlen);
168 		LOG_ERR("Websocket socket %d error (%d)", evt->event.fd, sock_error);
169 
170 		if (evt->event.fd == ws->fds[0].fd) {
171 			return ws_end_client_connection(ws);
172 		}
173 
174 		return;
175 	}
176 
177 	if (!(evt->event.revents & ZSOCK_POLLIN)) {
178 		return;
179 	}
180 
181 	if (evt->event.fd == ws->fds[0].fd) {
182 		return ws_recv(ws, &ws->fds[0]);
183 	}
184 }
185 
shell_ws_init(struct shell_websocket * ctx,int ws_socket)186 static int shell_ws_init(struct shell_websocket *ctx, int ws_socket)
187 {
188 	int ret;
189 
190 	if (ws_socket < 0) {
191 		LOG_ERR("Invalid socket %d", ws_socket);
192 		return -EBADF;
193 	}
194 
195 	if (ctx->fds[0].fd >= 0) {
196 		/* There is already a websocket connection to this shell,
197 		 * kick the previous connection out.
198 		 */
199 		ws_end_client_connection(ctx);
200 	}
201 
202 	ctx->fds[0].fd = ws_socket;
203 	ctx->fds[0].events = ZSOCK_POLLIN;
204 
205 	ret = net_socket_service_register(&websocket_server, ctx->fds,
206 					  ARRAY_SIZE(ctx->fds), ctx);
207 	if (ret < 0) {
208 		LOG_ERR("Failed to register socket service, %d", ret);
209 		goto error;
210 	}
211 	if (IS_ENABLED(CONFIG_LOG_BACKEND_WS)) {
212 		log_backend_ws_register(ws_socket);
213 	}
214 
215 	return 0;
216 
217 error:
218 	if (ctx->fds[0].fd >= 0) {
219 		(void)zsock_close(ctx->fds[0].fd);
220 		ctx->fds[0].fd = -1;
221 	}
222 
223 	return ret;
224 }
225 
226 /* Shell API */
227 
/**
 * @brief Shell transport API: initialize the websocket transport context.
 *
 * Clears the whole context, marks every descriptor slot as unused, stores the
 * shell event handler and its argument, and prepares the delayed flush work
 * item and the receive buffer mutex.
 *
 * @param transport   Transport instance; its ctx is a struct shell_websocket.
 * @param config      Not used by this transport.
 * @param evt_handler Handler used to signal RX/TX readiness to the shell.
 * @param context     Argument passed to @p evt_handler.
 *
 * @return Always 0.
 */
static int init(const struct shell_transport *transport,
		const void *config,
		shell_transport_handler_t evt_handler,
		void *context)
{
	struct shell_websocket *ws = (struct shell_websocket *)transport->ctx;

	ARG_UNUSED(config);

	memset(ws, 0, sizeof(*ws));

	/* Zero is a valid descriptor, so mark the slots explicitly as free. */
	for (size_t slot = 0; slot < ARRAY_SIZE(ws->fds); slot++) {
		ws->fds[slot].fd = -1;
	}

	ws->shell_handler = evt_handler;
	ws->shell_context = context;

	k_work_init_delayable(&ws->send_work, ws_send_prematurely);
	k_mutex_init(&ws->rx_lock);

	return 0;
}
250 
/**
 * @brief Shell transport API: uninitialize the transport.
 *
 * This transport has nothing to release here.
 *
 * @param transport Not used.
 *
 * @return Always 0.
 */
static int uninit(const struct shell_transport *transport)
{
	ARG_UNUSED(transport);

	/* Nothing to tear down. */
	return 0;
}
257 
/**
 * @brief Shell transport API: enable the transport.
 *
 * Both arguments are ignored; the function performs no work.
 *
 * @param transport Not used.
 * @param blocking  Not used.
 *
 * @return Always 0.
 */
static int enable(const struct shell_transport *transport, bool blocking)
{
	ARG_UNUSED(blocking);
	ARG_UNUSED(transport);

	/* No action is required to enable this transport. */
	return 0;
}
265 
sh_write(const struct shell_transport * transport,const void * data,size_t length,size_t * cnt)266 static int sh_write(const struct shell_transport *transport,
267 		    const void *data, size_t length, size_t *cnt)
268 {
269 	struct shell_websocket_line_buf *lb;
270 	struct shell_websocket *ws;
271 	uint32_t timeout;
272 	bool was_running;
273 	size_t copy_len;
274 	int ret;
275 
276 	ws = (struct shell_websocket *)transport->ctx;
277 
278 	if (ws->fds[0].fd < 0 || ws->output_lock) {
279 		*cnt = length;
280 		return 0;
281 	}
282 
283 	*cnt = 0;
284 	lb = &ws->line_out;
285 
286 	/* Stop the transmission timer, so it does not interrupt the operation.
287 	 */
288 	timeout = k_ticks_to_ms_ceil32(k_work_delayable_remaining_get(&ws->send_work));
289 	was_running = k_work_cancel_delayable_sync(&ws->send_work, &ws->work_sync);
290 
291 	do {
292 		if (lb->len + length - *cnt > WEBSOCKET_LINE_SIZE) {
293 			copy_len = WEBSOCKET_LINE_SIZE - lb->len;
294 		} else {
295 			copy_len = length - *cnt;
296 		}
297 
298 		memcpy(lb->buf + lb->len, (uint8_t *)data + *cnt, copy_len);
299 		lb->len += copy_len;
300 
301 		/* Send the data immediately if the buffer is full or line feed
302 		 * is recognized.
303 		 */
304 		if (lb->buf[lb->len - 1] == '\n' || lb->len == WEBSOCKET_LINE_SIZE) {
305 			ret = ws_send(ws, true);
306 			if (ret != 0) {
307 				*cnt = length;
308 				return ret;
309 			}
310 		}
311 
312 		*cnt += copy_len;
313 	} while (*cnt < length);
314 
315 	if (lb->len > 0) {
316 		/* Check if the timer was already running, initialize otherwise.
317 		 */
318 		timeout = was_running ? timeout : WEBSOCKET_TIMEOUT;
319 
320 		k_work_reschedule(&ws->send_work, K_MSEC(timeout));
321 	}
322 
323 	ws->shell_handler(SHELL_TRANSPORT_EVT_TX_RDY, ws->shell_context);
324 
325 	return 0;
326 }
327 
sh_read(const struct shell_transport * transport,void * data,size_t length,size_t * cnt)328 static int sh_read(const struct shell_transport *transport,
329 		   void *data, size_t length, size_t *cnt)
330 {
331 	struct shell_websocket *ws;
332 	size_t read_len;
333 
334 	ws = (struct shell_websocket *)transport->ctx;
335 
336 	if (ws->fds[0].fd < 0) {
337 		goto no_data;
338 	}
339 
340 	k_mutex_lock(&ws->rx_lock, K_FOREVER);
341 
342 	if (ws->rx_len == 0) {
343 		k_mutex_unlock(&ws->rx_lock);
344 		goto no_data;
345 	}
346 
347 	read_len = ws->rx_len;
348 	if (read_len > length) {
349 		read_len = length;
350 	}
351 
352 	memcpy(data, ws->rx_buf, read_len);
353 	*cnt = read_len;
354 
355 	ws->rx_len -= read_len;
356 	if (ws->rx_len > 0) {
357 		memmove(ws->rx_buf, ws->rx_buf + read_len, ws->rx_len);
358 	}
359 
360 	k_mutex_unlock(&ws->rx_lock);
361 
362 	return 0;
363 
364 no_data:
365 	*cnt = 0;
366 	return 0;
367 }
368 
369 const struct shell_transport_api shell_websocket_transport_api = {
370 	.init = init,
371 	.uninit = uninit,
372 	.enable = enable,
373 	.write = sh_write,
374 	.read = sh_read
375 };
376 
/**
 * @brief Attach an established websocket to a websocket shell instance.
 *
 * @param ws_socket   Websocket descriptor to attach.
 * @param request_ctx Not used.
 * @param user_data   Pointer to the struct shell_websocket to attach to.
 *
 * @return Result of shell_ws_init(): 0 on success, negative error code
 *         otherwise.
 */
int shell_websocket_setup(int ws_socket, struct http_request_ctx *request_ctx, void *user_data)
{
	ARG_UNUSED(request_ctx);

	return shell_ws_init((struct shell_websocket *)user_data, ws_socket);
}
383 
shell_websocket_enable(const struct shell * sh)384 int shell_websocket_enable(const struct shell *sh)
385 {
386 	bool log_backend = CONFIG_SHELL_WEBSOCKET_INIT_LOG_LEVEL > 0 &&
387 			   !IS_ENABLED(CONFIG_LOG_BACKEND_WS);
388 	uint32_t level = (CONFIG_SHELL_WEBSOCKET_INIT_LOG_LEVEL > LOG_LEVEL_DBG) ?
389 		CONFIG_LOG_MAX_LEVEL : CONFIG_SHELL_WEBSOCKET_INIT_LOG_LEVEL;
390 	static const struct shell_backend_config_flags cfg_flags =
391 		SHELL_DEFAULT_BACKEND_CONFIG_FLAGS;
392 	int ret;
393 
394 	ret = shell_init(sh, NULL, cfg_flags, log_backend, level);
395 	if (ret < 0) {
396 		LOG_DBG("Cannot init websocket shell %p", sh);
397 	}
398 
399 	return ret;
400 }
401