1 /*
2 * Copyright (c) 2024 Nordic Semiconductor ASA
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 #include <zephyr/init.h>
8
9 #include <zephyr/logging/log.h>
10 #include <zephyr/net/socket.h>
11 #include <zephyr/net/socket_service.h>
12 #include <zephyr/net/http/service.h>
13 #include <zephyr/net/http/server.h>
14 #include <zephyr/net/websocket.h>
15 #include <zephyr/shell/shell.h>
16 #include <zephyr/shell/shell_websocket.h>
17 #include <zephyr/logging/log_backend_ws.h>
18
/* Register this file as the "shell_websocket" log module. */
LOG_MODULE_REGISTER(shell_websocket, CONFIG_SHELL_WEBSOCKET_LOG_LEVEL);

/* Fill level of the outgoing line buffer at which it is flushed. */
#define WEBSOCKET_LINE_SIZE CONFIG_SHELL_WEBSOCKET_LINE_BUF_SIZE
/* Delay, in milliseconds, before buffered output is flushed by the
 * delayed send work.
 */
#define WEBSOCKET_TIMEOUT CONFIG_SHELL_WEBSOCKET_SEND_TIMEOUT

/* NOTE(review): the two command length constants below are not referenced
 * in the visible part of this file - confirm whether they are still needed.
 */
#define WEBSOCKET_MIN_COMMAND_LEN 2
#define WEBSOCKET_WILL_DO_COMMAND_LEN 3

/* Forward declaration, the handler is referenced by the service definition
 * below before its body appears.
 */
static void ws_server_cb(struct net_socket_service_event *evt);

/* Socket service that dispatches events on the websocket shell sockets to
 * ws_server_cb().
 */
NET_SOCKET_SERVICE_SYNC_DEFINE_STATIC(websocket_server, ws_server_cb,
				      SHELL_WEBSOCKET_SERVICE_COUNT);
31
ws_end_client_connection(struct shell_websocket * ws)32 static void ws_end_client_connection(struct shell_websocket *ws)
33 {
34 int ret;
35
36 LOG_DBG("Closing connection to #%d", ws->fds[0].fd);
37
38 if (IS_ENABLED(CONFIG_LOG_BACKEND_WS)) {
39 (void)log_backend_ws_unregister(ws->fds[0].fd);
40 }
41
42 (void)websocket_unregister(ws->fds[0].fd);
43
44 ws->fds[0].fd = -1;
45 ws->output_lock = false;
46
47 k_work_cancel_delayable_sync(&ws->send_work, &ws->work_sync);
48
49 ret = net_socket_service_register(&websocket_server, ws->fds,
50 ARRAY_SIZE(ws->fds), NULL);
51 if (ret < 0) {
52 LOG_ERR("Failed to re-register socket service (%d)", ret);
53 }
54 }
55
ws_send(struct shell_websocket * ws,bool block)56 static int ws_send(struct shell_websocket *ws, bool block)
57 {
58 int ret;
59 uint8_t *msg = ws->line_out.buf;
60 uint16_t len = ws->line_out.len;
61
62 if (ws->line_out.len == 0) {
63 return 0;
64 }
65
66 if (ws->fds[0].fd < 0) {
67 return -ENOTCONN;
68 }
69
70 while (len > 0) {
71 ret = zsock_send(ws->fds[0].fd, msg, len,
72 block ? 0 : ZSOCK_MSG_DONTWAIT);
73 if (!block && (ret < 0) && (errno == EAGAIN)) {
74 /* Not all data was sent - move the remaining data and
75 * update length.
76 */
77 memmove(ws->line_out.buf, msg, len);
78 ws->line_out.len = len;
79 return -EAGAIN;
80 }
81
82 if (ret < 0) {
83 ret = -errno;
84 LOG_ERR("Failed to send %d, shutting down", -ret);
85 ws_end_client_connection(ws);
86 return ret;
87 }
88
89 msg += ret;
90 len -= ret;
91 }
92
93 /* We reinitialize the line buffer */
94 ws->line_out.len = 0;
95
96 return 0;
97 }
98
ws_send_prematurely(struct k_work * work)99 static void ws_send_prematurely(struct k_work *work)
100 {
101 struct k_work_delayable *dwork = k_work_delayable_from_work(work);
102 struct shell_websocket *ws = CONTAINER_OF(dwork,
103 struct shell_websocket,
104 send_work);
105 int ret;
106
107 /* Use non-blocking send to prevent system workqueue blocking. */
108 ret = ws_send(ws, false);
109 if (ret == -EAGAIN) {
110 /* Not all data was sent, reschedule the work. */
111 k_work_reschedule(&ws->send_work, K_MSEC(WEBSOCKET_TIMEOUT));
112 }
113 }
114
ws_recv(struct shell_websocket * ws,struct zsock_pollfd * pollfd)115 static void ws_recv(struct shell_websocket *ws, struct zsock_pollfd *pollfd)
116 {
117 size_t len, buf_left;
118 uint8_t *buf;
119 int ret;
120
121 k_mutex_lock(&ws->rx_lock, K_FOREVER);
122
123 buf_left = sizeof(ws->rx_buf) - ws->rx_len;
124 if (buf_left == 0) {
125 /* No space left to read TCP stream, try again later. */
126 k_mutex_unlock(&ws->rx_lock);
127 k_msleep(10);
128 return;
129 }
130
131 buf = ws->rx_buf + ws->rx_len;
132
133 ret = zsock_recv(pollfd->fd, buf, buf_left, 0);
134 if (ret < 0) {
135 LOG_DBG("Websocket client error %d", ret);
136 goto error;
137 } else if (ret == 0) {
138 LOG_DBG("Websocket client closed connection");
139 goto error;
140 }
141
142 len = ret;
143 ws->rx_len += len;
144
145 k_mutex_unlock(&ws->rx_lock);
146
147 ws->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, ws->shell_context);
148
149 return;
150
151 error:
152 k_mutex_unlock(&ws->rx_lock);
153 ws_end_client_connection(ws);
154 }
155
ws_server_cb(struct net_socket_service_event * evt)156 static void ws_server_cb(struct net_socket_service_event *evt)
157 {
158 socklen_t optlen = sizeof(int);
159 struct shell_websocket *ws;
160 int sock_error;
161
162 ws = (struct shell_websocket *)evt->user_data;
163
164 if ((evt->event.revents & ZSOCK_POLLERR) ||
165 (evt->event.revents & ZSOCK_POLLNVAL)) {
166 (void)zsock_getsockopt(evt->event.fd, SOL_SOCKET,
167 SO_ERROR, &sock_error, &optlen);
168 LOG_ERR("Websocket socket %d error (%d)", evt->event.fd, sock_error);
169
170 if (evt->event.fd == ws->fds[0].fd) {
171 return ws_end_client_connection(ws);
172 }
173
174 return;
175 }
176
177 if (!(evt->event.revents & ZSOCK_POLLIN)) {
178 return;
179 }
180
181 if (evt->event.fd == ws->fds[0].fd) {
182 return ws_recv(ws, &ws->fds[0]);
183 }
184 }
185
shell_ws_init(struct shell_websocket * ctx,int ws_socket)186 static int shell_ws_init(struct shell_websocket *ctx, int ws_socket)
187 {
188 int ret;
189
190 if (ws_socket < 0) {
191 LOG_ERR("Invalid socket %d", ws_socket);
192 return -EBADF;
193 }
194
195 if (ctx->fds[0].fd >= 0) {
196 /* There is already a websocket connection to this shell,
197 * kick the previous connection out.
198 */
199 ws_end_client_connection(ctx);
200 }
201
202 ctx->fds[0].fd = ws_socket;
203 ctx->fds[0].events = ZSOCK_POLLIN;
204
205 ret = net_socket_service_register(&websocket_server, ctx->fds,
206 ARRAY_SIZE(ctx->fds), ctx);
207 if (ret < 0) {
208 LOG_ERR("Failed to register socket service, %d", ret);
209 goto error;
210 }
211 if (IS_ENABLED(CONFIG_LOG_BACKEND_WS)) {
212 log_backend_ws_register(ws_socket);
213 }
214
215 return 0;
216
217 error:
218 if (ctx->fds[0].fd >= 0) {
219 (void)zsock_close(ctx->fds[0].fd);
220 ctx->fds[0].fd = -1;
221 }
222
223 return ret;
224 }
225
226 /* Shell API */
227
/**
 * Shell transport API: initialize the websocket transport.
 *
 * Clears the transport context, marks every socket slot as unused, stores
 * the shell event handler together with its context and prepares the
 * delayed send work and the receive lock.
 *
 * @param transport   Transport whose ctx member is a struct shell_websocket.
 * @param config      Not used by this transport.
 * @param evt_handler Handler invoked on transport RX/TX events.
 * @param context     Opaque pointer passed back to @p evt_handler.
 *
 * @return Always 0.
 */
static int init(const struct shell_transport *transport,
		const void *config,
		shell_transport_handler_t evt_handler,
		void *context)
{
	struct shell_websocket *ws = (struct shell_websocket *)transport->ctx;

	memset(ws, 0, sizeof(*ws));

	/* Zero is a valid descriptor, so "no socket" has to be -1. */
	for (int slot = 0; slot < ARRAY_SIZE(ws->fds); slot++) {
		ws->fds[slot].fd = -1;
	}

	ws->shell_context = context;
	ws->shell_handler = evt_handler;

	k_mutex_init(&ws->rx_lock);
	k_work_init_delayable(&ws->send_work, ws_send_prematurely);

	return 0;
}
250
/**
 * Shell transport API: uninitialize the websocket transport.
 *
 * Nothing is released here, the function only reports success.
 *
 * @param transport Not used.
 *
 * @return Always 0.
 */
static int uninit(const struct shell_transport *transport)
{
	ARG_UNUSED(transport);

	return 0;
}
257
/**
 * Shell transport API: enable the websocket transport.
 *
 * Both arguments are ignored, the function only reports success.
 *
 * @param transport Not used.
 * @param blocking  Not used.
 *
 * @return Always 0.
 */
static int enable(const struct shell_transport *transport, bool blocking)
{
	ARG_UNUSED(blocking);
	ARG_UNUSED(transport);

	return 0;
}
265
sh_write(const struct shell_transport * transport,const void * data,size_t length,size_t * cnt)266 static int sh_write(const struct shell_transport *transport,
267 const void *data, size_t length, size_t *cnt)
268 {
269 struct shell_websocket_line_buf *lb;
270 struct shell_websocket *ws;
271 uint32_t timeout;
272 bool was_running;
273 size_t copy_len;
274 int ret;
275
276 ws = (struct shell_websocket *)transport->ctx;
277
278 if (ws->fds[0].fd < 0 || ws->output_lock) {
279 *cnt = length;
280 return 0;
281 }
282
283 *cnt = 0;
284 lb = &ws->line_out;
285
286 /* Stop the transmission timer, so it does not interrupt the operation.
287 */
288 timeout = k_ticks_to_ms_ceil32(k_work_delayable_remaining_get(&ws->send_work));
289 was_running = k_work_cancel_delayable_sync(&ws->send_work, &ws->work_sync);
290
291 do {
292 if (lb->len + length - *cnt > WEBSOCKET_LINE_SIZE) {
293 copy_len = WEBSOCKET_LINE_SIZE - lb->len;
294 } else {
295 copy_len = length - *cnt;
296 }
297
298 memcpy(lb->buf + lb->len, (uint8_t *)data + *cnt, copy_len);
299 lb->len += copy_len;
300
301 /* Send the data immediately if the buffer is full or line feed
302 * is recognized.
303 */
304 if (lb->buf[lb->len - 1] == '\n' || lb->len == WEBSOCKET_LINE_SIZE) {
305 ret = ws_send(ws, true);
306 if (ret != 0) {
307 *cnt = length;
308 return ret;
309 }
310 }
311
312 *cnt += copy_len;
313 } while (*cnt < length);
314
315 if (lb->len > 0) {
316 /* Check if the timer was already running, initialize otherwise.
317 */
318 timeout = was_running ? timeout : WEBSOCKET_TIMEOUT;
319
320 k_work_reschedule(&ws->send_work, K_MSEC(timeout));
321 }
322
323 ws->shell_handler(SHELL_TRANSPORT_EVT_TX_RDY, ws->shell_context);
324
325 return 0;
326 }
327
sh_read(const struct shell_transport * transport,void * data,size_t length,size_t * cnt)328 static int sh_read(const struct shell_transport *transport,
329 void *data, size_t length, size_t *cnt)
330 {
331 struct shell_websocket *ws;
332 size_t read_len;
333
334 ws = (struct shell_websocket *)transport->ctx;
335
336 if (ws->fds[0].fd < 0) {
337 goto no_data;
338 }
339
340 k_mutex_lock(&ws->rx_lock, K_FOREVER);
341
342 if (ws->rx_len == 0) {
343 k_mutex_unlock(&ws->rx_lock);
344 goto no_data;
345 }
346
347 read_len = ws->rx_len;
348 if (read_len > length) {
349 read_len = length;
350 }
351
352 memcpy(data, ws->rx_buf, read_len);
353 *cnt = read_len;
354
355 ws->rx_len -= read_len;
356 if (ws->rx_len > 0) {
357 memmove(ws->rx_buf, ws->rx_buf + read_len, ws->rx_len);
358 }
359
360 k_mutex_unlock(&ws->rx_lock);
361
362 return 0;
363
364 no_data:
365 *cnt = 0;
366 return 0;
367 }
368
/* Shell transport API table of the websocket backend. It wires the shell
 * transport operations to the static functions defined in this file.
 */
const struct shell_transport_api shell_websocket_transport_api = {
	.init = init,
	.uninit = uninit,
	.enable = enable,
	.write = sh_write,
	.read = sh_read
};
376
/**
 * Hand a new websocket connection over to a websocket shell instance.
 *
 * @param ws_socket   Descriptor of the websocket connection.
 * @param request_ctx Not used by this function.
 * @param user_data   Pointer to the struct shell_websocket that takes over
 *                    the connection.
 *
 * @return 0 on success, a negative error code from the shell websocket
 *         initialization otherwise.
 */
int shell_websocket_setup(int ws_socket, struct http_request_ctx *request_ctx, void *user_data)
{
	return shell_ws_init((struct shell_websocket *)user_data, ws_socket);
}
383
shell_websocket_enable(const struct shell * sh)384 int shell_websocket_enable(const struct shell *sh)
385 {
386 bool log_backend = CONFIG_SHELL_WEBSOCKET_INIT_LOG_LEVEL > 0 &&
387 !IS_ENABLED(CONFIG_LOG_BACKEND_WS);
388 uint32_t level = (CONFIG_SHELL_WEBSOCKET_INIT_LOG_LEVEL > LOG_LEVEL_DBG) ?
389 CONFIG_LOG_MAX_LEVEL : CONFIG_SHELL_WEBSOCKET_INIT_LOG_LEVEL;
390 static const struct shell_backend_config_flags cfg_flags =
391 SHELL_DEFAULT_BACKEND_CONFIG_FLAGS;
392 int ret;
393
394 ret = shell_init(sh, NULL, cfg_flags, log_backend, level);
395 if (ret < 0) {
396 LOG_DBG("Cannot init websocket shell %p", sh);
397 }
398
399 return ret;
400 }
401