1 /*
2  * Copyright (c) 2022 G-Technologies Sdn. Bhd.
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <zephyr/kernel.h>
8 #include <zephyr/shell/shell_mqtt.h>
9 #include <zephyr/init.h>
10 #include <zephyr/logging/log.h>
11 #include <string.h>
12 #include <stdio.h>
13 #include <zephyr/drivers/hwinfo.h>
14 
/* Transport instance used by the MQTT shell backend */
SHELL_MQTT_DEFINE(shell_transport_mqtt);
/* Shell instance bound to the MQTT transport above */
SHELL_DEFINE(shell_mqtt, "", &shell_transport_mqtt,
	     CONFIG_SHELL_BACKEND_MQTT_LOG_MESSAGE_QUEUE_SIZE,
	     CONFIG_SHELL_BACKEND_MQTT_LOG_MESSAGE_QUEUE_TIMEOUT, SHELL_FLAG_OLF_CRLF);

LOG_MODULE_REGISTER(shell_mqtt, CONFIG_SHELL_MQTT_LOG_LEVEL);

/* Network management events handled by network_evt_handler() */
#define NET_EVENT_MASK (NET_EVENT_L4_CONNECTED | NET_EVENT_L4_DISCONNECTED)
/* Poll timeout used while waiting for the broker's answer to a connect/subscribe request */
#define CONNECT_TIMEOUT_MS CONFIG_SHELL_MQTT_CONNECT_TIMEOUT_MS
/* Time budget of the listen loop in one run of the process work */
#define LISTEN_TIMEOUT_MS CONFIG_SHELL_MQTT_LISTEN_TIMEOUT_MS
/* Note: despite the _MS suffix this expands to a k_timeout_t, not to a millisecond count */
#define MQTT_SEND_DELAY_MS K_MSEC(100)
/* Delay applied whenever the connect/subscribe/process works are (re)scheduled */
#define PROCESS_INTERVAL K_MSEC(CONFIG_SHELL_MQTT_WORK_DELAY_MS)
/* Stack size, in bytes, of the backend's dedicated work queue thread */
#define SHELL_MQTT_WORKQ_STACK_SIZE 2048

/* Backend context; NULL until init() assigns it from the transport's ctx */
struct shell_mqtt *sh_mqtt;
K_KERNEL_STACK_DEFINE(sh_mqtt_workq_stack, SHELL_MQTT_WORKQ_STACK_SIZE);

/* Defined further down; declared here because client_init() installs it as the event callback */
static void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt);
33 
/*
 * Reschedule a delayable work item on the backend's own work queue.
 *
 * Returns whatever k_work_reschedule_for_queue() returns.
 */
static inline int sh_mqtt_work_reschedule(struct k_work_delayable *dwork, k_timeout_t delay)
{
	int ret = k_work_reschedule_for_queue(&sh_mqtt->workq, dwork, delay);

	return ret;
}
38 
sh_mqtt_work_submit(struct k_work * work)39 static inline int sh_mqtt_work_submit(struct k_work *work)
40 {
41 	return k_work_submit_to_queue(&sh_mqtt->workq, work);
42 }
43 
44 /* Lock the context of the shell mqtt */
sh_mqtt_context_lock(k_timeout_t timeout)45 static inline int sh_mqtt_context_lock(k_timeout_t timeout)
46 {
47 	return k_mutex_lock(&sh_mqtt->lock, timeout);
48 }
49 
50 /* Unlock the context of the shell mqtt */
sh_mqtt_context_unlock(void)51 static inline void sh_mqtt_context_unlock(void)
52 {
53 	(void)k_mutex_unlock(&sh_mqtt->lock);
54 }
55 
shell_mqtt_get_devid(char * id,int id_max_len)56 bool __weak shell_mqtt_get_devid(char *id, int id_max_len)
57 {
58 	uint8_t hwinfo_id[DEVICE_ID_BIN_MAX_SIZE];
59 	ssize_t length;
60 
61 	length = hwinfo_get_device_id(hwinfo_id, DEVICE_ID_BIN_MAX_SIZE);
62 	if (length <= 0) {
63 		return false;
64 	}
65 
66 	(void)memset(id, 0, id_max_len);
67 	length = bin2hex(hwinfo_id, (size_t)length, id, id_max_len);
68 
69 	return length > 0;
70 }
71 
prepare_fds(struct shell_mqtt * sh)72 static void prepare_fds(struct shell_mqtt *sh)
73 {
74 	if (sh->mqtt_cli.transport.type == MQTT_TRANSPORT_NON_SECURE) {
75 		sh->fds[0].fd = sh->mqtt_cli.transport.tcp.sock;
76 	}
77 
78 	sh->fds[0].events = ZSOCK_POLLIN;
79 	sh->nfds = 1;
80 }
81 
clear_fds(struct shell_mqtt * sh)82 static void clear_fds(struct shell_mqtt *sh)
83 {
84 	sh->nfds = 0;
85 }
86 
87 /*
88  * Upon successful completion, poll() shall return a non-negative value. A positive value indicates
89  * the total number of pollfd structures that have selected events (that is, those for which the
90  * revents member is non-zero). A value of 0 indicates that the call timed out and no file
91  * descriptors have been selected. Upon failure, poll() shall return -1 and set errno to indicate
92  * the error.
93  */
wait(struct shell_mqtt * sh,int timeout)94 static int wait(struct shell_mqtt *sh, int timeout)
95 {
96 	int rc = 0;
97 
98 	if (sh->nfds > 0) {
99 		rc = zsock_poll(sh->fds, sh->nfds, timeout);
100 		if (rc < 0) {
101 			LOG_ERR("poll error: %d", errno);
102 		}
103 	}
104 
105 	return rc;
106 }
107 
/*
 * Resolve the configured broker address/port and store the result in sh->haddr.
 *
 * The result of a previous lookup, if any, is released first. The pointer is cleared right
 * after it is freed and again after a failed lookup, so a stale result can never be freed
 * twice or dereferenced by a later attempt.
 *
 * Returns 0 on success, otherwise the non-zero code reported by zsock_getaddrinfo().
 */
static int get_mqtt_broker_addrinfo(struct shell_mqtt *sh)
{
	int rc;
	struct zsock_addrinfo hints = { .ai_family = NET_AF_INET,
					.ai_socktype = NET_SOCK_STREAM,
					.ai_protocol = 0 };

	if (sh->haddr != NULL) {
		zsock_freeaddrinfo(sh->haddr);
		sh->haddr = NULL;
	}

	rc = zsock_getaddrinfo(CONFIG_SHELL_MQTT_SERVER_ADDR,
			       STRINGIFY(CONFIG_SHELL_MQTT_SERVER_PORT), &hints, &sh->haddr);
	if (rc != 0) {
		/* The output pointer carries no usable result after a failed lookup */
		sh->haddr = NULL;

		LOG_ERR("DNS%s resolved for %s:%d, retrying", " not", CONFIG_SHELL_MQTT_SERVER_ADDR,
			CONFIG_SHELL_MQTT_SERVER_PORT);

		return rc;
	}

	LOG_INF("DNS%s resolved for %s:%d", "", CONFIG_SHELL_MQTT_SERVER_ADDR,
		CONFIG_SHELL_MQTT_SERVER_PORT);

	return 0;
}
134 
135 /* Close MQTT connection properly and cleanup socket */
sh_mqtt_close_and_cleanup(struct shell_mqtt * sh)136 static void sh_mqtt_close_and_cleanup(struct shell_mqtt *sh)
137 {
138 	/* Initialize to negative value so that the mqtt_abort case can run */
139 	int rc = -1;
140 
141 	/* If both network & mqtt connected, mqtt_disconnect will send a
142 	 * disconnection packet to the broker, it will invoke
143 	 * mqtt_evt_handler:MQTT_EVT_DISCONNECT if success
144 	 */
145 	if ((sh->network_state == SHELL_MQTT_NETWORK_CONNECTED) &&
146 	    (sh->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED)) {
147 		rc = mqtt_disconnect(&sh->mqtt_cli, NULL);
148 	}
149 
150 	/* If network/mqtt disconnected, or mqtt_disconnect failed, do mqtt_abort */
151 	if (rc < 0) {
152 		/* mqtt_abort doesn't send disconnection packet to the broker, but it
153 		 * makes sure that the MQTT connection is aborted locally and will
154 		 * always invoke mqtt_evt_handler:MQTT_EVT_DISCONNECT
155 		 */
156 		(void)mqtt_abort(&sh->mqtt_cli);
157 	}
158 
159 	/* Cleanup socket */
160 	clear_fds(sh);
161 }
162 
broker_init(struct shell_mqtt * sh)163 static void broker_init(struct shell_mqtt *sh)
164 {
165 	struct net_sockaddr_in *broker4 = (struct net_sockaddr_in *)&sh->broker;
166 
167 	broker4->sin_family = NET_AF_INET;
168 	broker4->sin_port = net_htons(CONFIG_SHELL_MQTT_SERVER_PORT);
169 
170 	net_ipaddr_copy(&broker4->sin_addr, &net_sin(sh->haddr->ai_addr)->sin_addr);
171 }
172 
client_init(struct shell_mqtt * sh)173 static void client_init(struct shell_mqtt *sh)
174 {
175 	static struct mqtt_utf8 password;
176 	static struct mqtt_utf8 username;
177 
178 	password.utf8 = (uint8_t *)CONFIG_SHELL_MQTT_SERVER_PASSWORD;
179 	password.size = strlen(CONFIG_SHELL_MQTT_SERVER_PASSWORD);
180 	username.utf8 = (uint8_t *)CONFIG_SHELL_MQTT_SERVER_USERNAME;
181 	username.size = strlen(CONFIG_SHELL_MQTT_SERVER_USERNAME);
182 
183 	mqtt_client_init(&sh->mqtt_cli);
184 
185 	/* MQTT client configuration */
186 	sh->mqtt_cli.broker = &sh->broker;
187 	sh->mqtt_cli.evt_cb = mqtt_evt_handler;
188 	sh->mqtt_cli.client_id.utf8 = (uint8_t *)sh->device_id;
189 	sh->mqtt_cli.client_id.size = strlen(sh->device_id);
190 	sh->mqtt_cli.password = &password;
191 	sh->mqtt_cli.user_name = &username;
192 	sh->mqtt_cli.protocol_version = MQTT_VERSION_3_1_1;
193 
194 	/* MQTT buffers configuration */
195 	sh->mqtt_cli.rx_buf = sh->buf.rx;
196 	sh->mqtt_cli.rx_buf_size = sizeof(sh->buf.rx);
197 	sh->mqtt_cli.tx_buf = sh->buf.tx;
198 	sh->mqtt_cli.tx_buf_size = sizeof(sh->buf.tx);
199 
200 	/* MQTT transport configuration */
201 	sh->mqtt_cli.transport.type = MQTT_TRANSPORT_NON_SECURE;
202 }
203 
204 /* Work routine to process MQTT packet and keep alive MQTT connection */
sh_mqtt_process_handler(struct k_work * work)205 static void sh_mqtt_process_handler(struct k_work *work)
206 {
207 	ARG_UNUSED(work);
208 	struct shell_mqtt *sh = sh_mqtt;
209 	int rc;
210 	int64_t remaining = LISTEN_TIMEOUT_MS;
211 	int64_t start_time = k_uptime_get();
212 
213 	if (sh->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
214 		LOG_DBG("%s_work while %s", "process", "network disconnected");
215 		return;
216 	}
217 
218 	/* If context can't be locked, that means net conn cb locked it */
219 	if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
220 		/* In that case we should simply return */
221 		LOG_DBG("%s_work unable to lock context", "process");
222 		return;
223 	}
224 
225 	if (sh->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
226 		LOG_DBG("MQTT %s", "not connected");
227 		goto process_error;
228 	}
229 
230 	if (sh->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
231 		LOG_DBG("%s_work while %s", "process", "MQTT not subscribed");
232 		goto process_error;
233 	}
234 
235 	LOG_DBG("MQTT %s", "Processing");
236 	/* Listen to the port for a duration defined by LISTEN_TIMEOUT_MS */
237 	while ((remaining > 0) && (sh->network_state == SHELL_MQTT_NETWORK_CONNECTED) &&
238 	       (sh->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED) &&
239 	       (sh->subscribe_state == SHELL_MQTT_SUBSCRIBED)) {
240 		LOG_DBG("Listening to socket");
241 		rc = wait(sh, remaining);
242 		if (rc > 0) {
243 			LOG_DBG("Process socket for MQTT packet");
244 			rc = mqtt_input(&sh->mqtt_cli);
245 			if (rc != 0) {
246 				LOG_ERR("%s error: %d", "processed: mqtt_input", rc);
247 				goto process_error;
248 			}
249 		} else if (rc < 0) {
250 			goto process_error;
251 		}
252 
253 		LOG_DBG("MQTT %s", "Keepalive");
254 		rc = mqtt_live(&sh->mqtt_cli);
255 		if ((rc != 0) && (rc != -EAGAIN)) {
256 			LOG_ERR("%s error: %d", "mqtt_live", rc);
257 			goto process_error;
258 		}
259 
260 		remaining = LISTEN_TIMEOUT_MS + start_time - k_uptime_get();
261 	}
262 
263 	/* Reschedule the process work */
264 	LOG_DBG("Scheduling %s work", "process");
265 	(void)sh_mqtt_work_reschedule(&sh->process_dwork, PROCESS_INTERVAL);
266 	sh_mqtt_context_unlock();
267 	return;
268 
269 process_error:
270 	LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "connect");
271 	sh_mqtt_close_and_cleanup(sh);
272 	(void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
273 	sh_mqtt_context_unlock();
274 }
275 
sh_mqtt_subscribe_handler(struct k_work * work)276 static void sh_mqtt_subscribe_handler(struct k_work *work)
277 {
278 	ARG_UNUSED(work);
279 	struct shell_mqtt *sh = sh_mqtt;
280 
281 	/* Subscribe config information */
282 	struct mqtt_topic subs_topic = { .topic = { .utf8 = sh->sub_topic,
283 						    .size = strlen(sh->sub_topic) },
284 					 .qos = MQTT_QOS_1_AT_LEAST_ONCE };
285 	const struct mqtt_subscription_list subs_list = { .list = &subs_topic,
286 							  .list_count = 1U,
287 							  .message_id = 1U };
288 	int rc;
289 
290 	if (sh->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
291 		LOG_DBG("%s_work while %s", "subscribe", "network disconnected");
292 		return;
293 	}
294 
295 	/* If context can't be locked, that means net conn cb locked it */
296 	if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
297 		/* In that case we should simply return */
298 		LOG_DBG("%s_work unable to lock context", "subscribe");
299 		return;
300 	}
301 
302 	if (sh->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
303 		LOG_DBG("%s_work while %s", "subscribe", "transport disconnected");
304 		goto subscribe_error;
305 	}
306 
307 	rc = mqtt_subscribe(&sh->mqtt_cli, &subs_list);
308 	if (rc == 0) {
309 		/* Wait for mqtt's connack */
310 		LOG_DBG("Listening to socket");
311 		rc = wait(sh, CONNECT_TIMEOUT_MS);
312 		if (rc > 0) {
313 			LOG_DBG("Process socket for MQTT packet");
314 			rc = mqtt_input(&sh->mqtt_cli);
315 			if (rc != 0) {
316 				LOG_ERR("%s error: %d", "subscribe: mqtt_input", rc);
317 				goto subscribe_error;
318 			}
319 		} else if (rc < 0) {
320 			goto subscribe_error;
321 		}
322 
323 		/* No suback, fail */
324 		if (sh->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
325 			goto subscribe_error;
326 		}
327 
328 		LOG_DBG("Scheduling MQTT process work");
329 		(void)sh_mqtt_work_reschedule(&sh->process_dwork, PROCESS_INTERVAL);
330 		sh_mqtt_context_unlock();
331 
332 		LOG_INF("Logs will be published to: %s", sh->pub_topic);
333 		LOG_INF("Subscribing shell cmds from: %s", sh->sub_topic);
334 
335 		return;
336 	}
337 
338 subscribe_error:
339 	LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "subscribe");
340 	sh_mqtt_close_and_cleanup(sh);
341 	(void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
342 	sh_mqtt_context_unlock();
343 }
344 
345 /* Work routine to connect to MQTT */
sh_mqtt_connect_handler(struct k_work * work)346 static void sh_mqtt_connect_handler(struct k_work *work)
347 {
348 	ARG_UNUSED(work);
349 	struct shell_mqtt *sh = sh_mqtt;
350 	int rc;
351 
352 	if (sh->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
353 		LOG_DBG("%s_work while %s", "connect", "network disconnected");
354 		return;
355 	}
356 
357 	/* If context can't be locked, that means net conn cb locked it */
358 	if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
359 		/* In that case we should simply return */
360 		LOG_DBG("%s_work unable to lock context", "connect");
361 		return;
362 	}
363 
364 	if (sh->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED) {
365 		__ASSERT(0, "MQTT shouldn't be already connected");
366 		LOG_ERR("MQTT shouldn't be already connected");
367 		goto connect_error;
368 	}
369 
370 	/* Resolve the broker URL */
371 	LOG_DBG("Resolving DNS");
372 	rc = get_mqtt_broker_addrinfo(sh);
373 	if (rc != 0) {
374 		(void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
375 		sh_mqtt_context_unlock();
376 		return;
377 	}
378 
379 	LOG_DBG("Initializing MQTT client");
380 	broker_init(sh);
381 	client_init(sh);
382 
383 	/* Try to connect to mqtt */
384 	LOG_DBG("Connecting to MQTT broker");
385 	rc = mqtt_connect(&sh->mqtt_cli);
386 	if (rc != 0) {
387 		LOG_ERR("%s error: %d", "mqtt_connect", rc);
388 		goto connect_error;
389 	}
390 
391 	/* Prepare port config */
392 	LOG_DBG("Preparing socket");
393 	prepare_fds(sh);
394 
395 	/* Wait for mqtt's connack */
396 	LOG_DBG("Listening to socket");
397 	rc = wait(sh, CONNECT_TIMEOUT_MS);
398 	if (rc > 0) {
399 		LOG_DBG("Process socket for MQTT packet");
400 		rc = mqtt_input(&sh->mqtt_cli);
401 		if (rc != 0) {
402 			LOG_ERR("%s error: %d", "connect: mqtt_input", rc);
403 			goto connect_error;
404 		}
405 	} else if (rc < 0) {
406 		goto connect_error;
407 	}
408 
409 	/* No connack, fail */
410 	if (sh->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
411 		goto connect_error;
412 	}
413 
414 	LOG_DBG("Scheduling %s work", "subscribe");
415 	(void)sh_mqtt_work_reschedule(&sh->subscribe_dwork, PROCESS_INTERVAL);
416 	sh_mqtt_context_unlock();
417 	return;
418 
419 connect_error:
420 	LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "connect");
421 	sh_mqtt_close_and_cleanup(sh);
422 	(void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
423 	sh_mqtt_context_unlock();
424 }
425 
sh_mqtt_publish(struct shell_mqtt * sh,uint8_t * data,uint32_t len)426 static int sh_mqtt_publish(struct shell_mqtt *sh, uint8_t *data, uint32_t len)
427 {
428 	sh->pub_data.message.payload.data = data;
429 	sh->pub_data.message.payload.len = len;
430 	sh->pub_data.message_id++;
431 
432 	return mqtt_publish(&sh->mqtt_cli, &sh->pub_data);
433 }
434 
sh_mqtt_publish_tx_buf(struct shell_mqtt * sh,bool is_work)435 static int sh_mqtt_publish_tx_buf(struct shell_mqtt *sh, bool is_work)
436 {
437 	int rc;
438 
439 	rc = sh_mqtt_publish(sh, &sh->tx_buf.buf[0], sh->tx_buf.len);
440 	memset(&sh->tx_buf, 0, sizeof(sh->tx_buf));
441 	if (rc != 0) {
442 		LOG_ERR("MQTT publish error: %d", rc);
443 		return rc;
444 	}
445 
446 	/* Arbitrary delay to not kill the session */
447 	if (!is_work) {
448 		k_sleep(MQTT_SEND_DELAY_MS);
449 	}
450 
451 	return rc;
452 }
453 
sh_mqtt_publish_handler(struct k_work * work)454 static void sh_mqtt_publish_handler(struct k_work *work)
455 {
456 	ARG_UNUSED(work);
457 	struct shell_mqtt *sh = sh_mqtt;
458 	int rc;
459 
460 	(void)sh_mqtt_context_lock(K_FOREVER);
461 
462 	rc = sh_mqtt_publish_tx_buf(sh, true);
463 	if (rc != 0) {
464 		LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "publish");
465 		sh_mqtt_close_and_cleanup(sh);
466 		(void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
467 	}
468 
469 	sh_mqtt_context_unlock();
470 }
471 
cancel_dworks(struct shell_mqtt * sh)472 static void cancel_dworks(struct shell_mqtt *sh)
473 {
474 	(void)k_work_cancel_delayable(&sh->connect_dwork);
475 	(void)k_work_cancel_delayable(&sh->subscribe_dwork);
476 	(void)k_work_cancel_delayable(&sh->process_dwork);
477 	(void)k_work_cancel_delayable(&sh->publish_dwork);
478 }
479 
net_disconnect_handler(struct k_work * work)480 static void net_disconnect_handler(struct k_work *work)
481 {
482 	ARG_UNUSED(work);
483 	struct shell_mqtt *sh = sh_mqtt;
484 
485 	LOG_WRN("Network %s", "disconnected");
486 
487 	/* Stop all possible work */
488 	(void)sh_mqtt_context_lock(K_FOREVER);
489 	sh_mqtt_close_and_cleanup(sh);
490 	sh_mqtt_context_unlock();
491 	/* If the transport was requested, the connect work will be rescheduled
492 	 * when internet is connected again
493 	 */
494 }
495 
496 /* Network connection event handler */
network_evt_handler(struct net_mgmt_event_callback * cb,uint64_t mgmt_event,struct net_if * iface)497 static void network_evt_handler(struct net_mgmt_event_callback *cb, uint64_t mgmt_event,
498 				struct net_if *iface)
499 {
500 	struct shell_mqtt *sh = sh_mqtt;
501 
502 	if (mgmt_event == NET_EVENT_L4_CONNECTED) {
503 		(void)k_work_cancel(&sh->net_disconnected_work);
504 		if (sh->network_state == SHELL_MQTT_NETWORK_DISCONNECTED) {
505 			LOG_WRN("Network %s", "connected");
506 			sh->network_state = SHELL_MQTT_NETWORK_CONNECTED;
507 			(void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
508 		}
509 	} else if ((mgmt_event == NET_EVENT_L4_DISCONNECTED) &&
510 		   (sh->network_state == SHELL_MQTT_NETWORK_CONNECTED)) {
511 		sh->network_state = SHELL_MQTT_NETWORK_DISCONNECTED;
512 		cancel_dworks(sh);
513 		(void)sh_mqtt_work_submit(&sh->net_disconnected_work);
514 	}
515 }
516 
mqtt_evt_handler(struct mqtt_client * const client,const struct mqtt_evt * evt)517 static void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt)
518 {
519 	struct shell_mqtt *sh = sh_mqtt;
520 
521 	switch (evt->type) {
522 	case MQTT_EVT_CONNACK:
523 		if (evt->result != 0) {
524 			sh->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
525 			LOG_ERR("MQTT %s %d", "connect failed", evt->result);
526 			break;
527 		}
528 
529 		sh->transport_state = SHELL_MQTT_TRANSPORT_CONNECTED;
530 		LOG_WRN("MQTT %s", "client connected!");
531 		break;
532 
533 	case MQTT_EVT_SUBACK:
534 		if (evt->result != 0) {
535 			LOG_ERR("MQTT subscribe: %s", "error");
536 			sh->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
537 			break;
538 		}
539 
540 		LOG_WRN("MQTT subscribe: %s", "ok");
541 		sh->subscribe_state = SHELL_MQTT_SUBSCRIBED;
542 		break;
543 
544 	case MQTT_EVT_UNSUBACK:
545 		LOG_DBG("UNSUBACK packet id: %u", evt->param.suback.message_id);
546 		sh->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
547 		break;
548 
549 	case MQTT_EVT_DISCONNECT:
550 		LOG_WRN("MQTT disconnected: %d", evt->result);
551 		sh->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
552 		sh->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
553 		break;
554 
555 	case MQTT_EVT_PUBLISH: {
556 		const struct mqtt_publish_param *pub = &evt->param.publish;
557 		uint32_t payload_left;
558 		size_t size;
559 		int rc;
560 
561 		payload_left = pub->message.payload.len;
562 
563 		LOG_DBG("MQTT publish received %d, %d bytes", evt->result, payload_left);
564 		LOG_DBG("   id: %d, qos: %d", pub->message_id, pub->message.topic.qos);
565 		LOG_DBG("   item: %s", pub->message.topic.topic.utf8);
566 
567 		/* For MQTT_QOS_0_AT_MOST_ONCE no acknowledgment needed */
568 		if (pub->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
569 			struct mqtt_puback_param puback = { .message_id = pub->message_id };
570 
571 			(void)mqtt_publish_qos1_ack(client, &puback);
572 		}
573 
574 		while (payload_left > 0) {
575 			/* Attempt to claim `payload_left` bytes of buffer in rb */
576 			size = (size_t)ring_buf_put_claim(&sh->rx_rb, &sh->rx_rb_ptr,
577 							  payload_left);
578 			/* Read `size` bytes of payload from mqtt */
579 			rc = mqtt_read_publish_payload_blocking(client, sh->rx_rb_ptr, size);
580 
581 			/* errno value, return */
582 			if (rc < 0) {
583 				ring_buf_reset(&sh->rx_rb);
584 				return;
585 			}
586 
587 			size = (size_t)rc;
588 			/* Indicate that `size` bytes of payload has been written into rb */
589 			(void)ring_buf_put_finish(&sh->rx_rb, size);
590 			/* Update `payload_left` */
591 			payload_left -= size;
592 			/* Tells the shell that we have new data for it */
593 			sh->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh->shell_context);
594 			/* Arbitrary sleep for the shell to do its thing */
595 			(void)k_msleep(100);
596 		}
597 
598 		/* Shell won't execute the cmds without \r\n */
599 		while (true) {
600 			/* Check if rb's free space is enough to fit in \r\n */
601 			size = ring_buf_space_get(&sh->rx_rb);
602 			if (size >= sizeof("\r\n")) {
603 				(void)ring_buf_put(&sh->rx_rb, "\r\n", sizeof("\r\n"));
604 				break;
605 			}
606 			/* Arbitrary sleep for the shell to do its thing */
607 			(void)k_msleep(100);
608 		}
609 
610 		sh->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh->shell_context);
611 		break;
612 	}
613 
614 	case MQTT_EVT_PUBACK:
615 		if (evt->result != 0) {
616 			LOG_ERR("MQTT PUBACK error %d", evt->result);
617 			break;
618 		}
619 
620 		LOG_DBG("PUBACK packet id: %u", evt->param.puback.message_id);
621 		break;
622 
623 	case MQTT_EVT_PINGRESP:
624 		LOG_DBG("PINGRESP packet");
625 		break;
626 
627 	default:
628 		LOG_DBG("MQTT event received %d", evt->type);
629 		break;
630 	}
631 }
632 
/*
 * Shell transport `init` hook.
 *
 * Binds the global backend pointer to the transport's context, zeroes the context, then
 * sets up: lock, connection states, shell callback, device ID (falling back to "dummy"),
 * publish/subscribe topic names, RX ring buffer, publish descriptor, the dedicated work
 * queue with its work items, and the network event callback (initialized here, registered
 * in enable()).
 *
 * Always returns 0.
 */
static int init(const struct shell_transport *transport, const void *config,
		shell_transport_handler_t evt_handler, void *context)
{
	ARG_UNUSED(config);
	struct shell_mqtt *sh = (struct shell_mqtt *)transport->ctx;

	sh_mqtt = sh;

	(void)memset(sh, 0, sizeof(*sh));
	(void)k_mutex_init(&sh->lock);

	/* Everything starts out disconnected */
	sh->network_state = SHELL_MQTT_NETWORK_DISCONNECTED;
	sh->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
	sh->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;

	/* Shell callback used to signal RX/TX readiness */
	sh->shell_handler = evt_handler;
	sh->shell_context = context;

	if (!shell_mqtt_get_devid(sh->device_id, DEVICE_ID_HEX_MAX_SIZE)) {
		LOG_ERR("Unable to get device identity, using dummy value");
		(void)snprintf(sh->device_id, sizeof("dummy"), "dummy");
	}

	LOG_DBG("Client ID is %s", sh->device_id);

	/* Topic names are the device ID followed by the configured TX/RX suffix */
	(void)snprintf(sh->pub_topic, SH_MQTT_TOPIC_TX_MAX_SIZE, "%s" CONFIG_SHELL_MQTT_TOPIC_TX_ID,
		       sh->device_id);
	(void)snprintf(sh->sub_topic, SH_MQTT_TOPIC_RX_MAX_SIZE, "%s" CONFIG_SHELL_MQTT_TOPIC_RX_ID,
		       sh->device_id);

	ring_buf_init(&sh->rx_rb, RX_RB_SIZE, sh->rx_rb_buf);

	LOG_DBG("Initializing shell MQTT backend");

	/* Publish descriptor reused for every outgoing message */
	sh->pub_data.dup_flag = 0U;
	sh->pub_data.retain_flag = 0U;
	sh->pub_data.message.topic.qos = MQTT_QOS_0_AT_MOST_ONCE;
	sh->pub_data.message.topic.topic.utf8 = (uint8_t *)sh->pub_topic;
	sh->pub_data.message.topic.topic.size = strlen(sh->pub_topic);

	/* Dedicated work queue and the work items that run on it */
	k_work_queue_init(&sh->workq);
	k_work_queue_start(&sh->workq, sh_mqtt_workq_stack,
			   K_KERNEL_STACK_SIZEOF(sh_mqtt_workq_stack), K_PRIO_COOP(7), NULL);
	(void)k_thread_name_set(&sh->workq.thread, "sh_mqtt_workq");
	k_work_init(&sh->net_disconnected_work, net_disconnect_handler);
	k_work_init_delayable(&sh->connect_dwork, sh_mqtt_connect_handler);
	k_work_init_delayable(&sh->subscribe_dwork, sh_mqtt_subscribe_handler);
	k_work_init_delayable(&sh->process_dwork, sh_mqtt_process_handler);
	k_work_init_delayable(&sh->publish_dwork, sh_mqtt_publish_handler);

	LOG_DBG("Initializing listener for network");
	net_mgmt_init_event_callback(&sh->mgmt_cb, network_evt_handler, NET_EVENT_MASK);

	return 0;
}
689 
uninit(const struct shell_transport * transport)690 static int uninit(const struct shell_transport *transport)
691 {
692 	ARG_UNUSED(transport);
693 	struct shell_mqtt *sh = sh_mqtt;
694 
695 	/* Not initialized yet */
696 	if (sh == NULL) {
697 		return -ENODEV;
698 	}
699 
700 	return 0;
701 }
702 
enable(const struct shell_transport * transport,bool blocking)703 static int enable(const struct shell_transport *transport, bool blocking)
704 {
705 	ARG_UNUSED(transport);
706 	ARG_UNUSED(blocking);
707 	struct shell_mqtt *sh = sh_mqtt;
708 
709 	/* Not initialized yet */
710 	if (sh == NULL) {
711 		return -ENODEV;
712 	}
713 
714 	/* Listen for network connection status */
715 	net_mgmt_add_event_callback(&sh->mgmt_cb);
716 	conn_mgr_mon_resend_status();
717 
718 	return 0;
719 }
720 
write_data(const struct shell_transport * transport,const void * data,size_t length,size_t * cnt)721 static int write_data(const struct shell_transport *transport, const void *data, size_t length,
722 		      size_t *cnt)
723 {
724 	ARG_UNUSED(transport);
725 	struct shell_mqtt *sh = sh_mqtt;
726 	int rc = 0;
727 	struct k_work_sync ws;
728 	size_t copy_len;
729 
730 	*cnt = 0;
731 
732 	/* Not initialized yet */
733 	if (sh == NULL) {
734 		return -ENODEV;
735 	}
736 
737 	/* Not connected to broker */
738 	if (sh->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
739 		goto out;
740 	}
741 
742 	(void)k_work_cancel_delayable_sync(&sh->publish_dwork, &ws);
743 
744 	do {
745 		if ((sh->tx_buf.len + length - *cnt) > TX_BUF_SIZE) {
746 			copy_len = TX_BUF_SIZE - sh->tx_buf.len;
747 		} else {
748 			copy_len = length - *cnt;
749 		}
750 
751 		memcpy(sh->tx_buf.buf + sh->tx_buf.len, (uint8_t *)data + *cnt, copy_len);
752 		sh->tx_buf.len += copy_len;
753 
754 		/* Send the data immediately if the buffer is full */
755 		if (sh->tx_buf.len == TX_BUF_SIZE) {
756 			rc = sh_mqtt_publish_tx_buf(sh, false);
757 			if (rc != 0) {
758 				sh_mqtt_close_and_cleanup(sh);
759 				(void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
760 				*cnt = length;
761 				return rc;
762 			}
763 		}
764 
765 		*cnt += copy_len;
766 	} while (*cnt < length);
767 
768 	if (sh->tx_buf.len > 0) {
769 		(void)sh_mqtt_work_reschedule(&sh->publish_dwork, MQTT_SEND_DELAY_MS);
770 	}
771 
772 	/* Inform shell that it is ready for next TX */
773 	sh->shell_handler(SHELL_TRANSPORT_EVT_TX_RDY, sh->shell_context);
774 
775 out:
776 	/* We will always assume that we sent everything */
777 	*cnt = length;
778 	return rc;
779 }
780 
read_data(const struct shell_transport * transport,void * data,size_t length,size_t * cnt)781 static int read_data(const struct shell_transport *transport, void *data, size_t length,
782 		     size_t *cnt)
783 {
784 	ARG_UNUSED(transport);
785 	struct shell_mqtt *sh = sh_mqtt;
786 
787 	/* Not initialized yet */
788 	if (sh == NULL) {
789 		return -ENODEV;
790 	}
791 
792 	/* Not subscribed yet */
793 	if (sh->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
794 		*cnt = 0;
795 		return 0;
796 	}
797 
798 	*cnt = ring_buf_get(&sh->rx_rb, data, length);
799 
800 	/* Inform the shell if there are still data in the rb */
801 	if (ring_buf_size_get(&sh->rx_rb) > 0) {
802 		sh->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh->shell_context);
803 	}
804 
805 	return 0;
806 }
807 
808 const struct shell_transport_api shell_mqtt_transport_api = { .init = init,
809 							      .uninit = uninit,
810 							      .enable = enable,
811 							      .write = write_data,
812 							      .read = read_data };
813 
enable_shell_mqtt(void)814 static int enable_shell_mqtt(void)
815 {
816 
817 	bool log_backend = CONFIG_SHELL_MQTT_INIT_LOG_LEVEL > 0;
818 	uint32_t level = (CONFIG_SHELL_MQTT_INIT_LOG_LEVEL > LOG_LEVEL_DBG) ?
819 				       CONFIG_LOG_MAX_LEVEL :
820 				       CONFIG_SHELL_MQTT_INIT_LOG_LEVEL;
821 	static const struct shell_backend_config_flags cfg_flags = {
822 		.insert_mode = 0,
823 		.echo = 0,
824 		.obscure = 0,
825 		.mode_delete = 0,
826 		.use_colors = 0,
827 		.use_vt100 = 0,
828 	};
829 
830 	return shell_init(&shell_mqtt, NULL, cfg_flags, log_backend, level);
831 }
832 
833 /* Function is used for testing purposes */
shell_backend_mqtt_get_ptr(void)834 const struct shell *shell_backend_mqtt_get_ptr(void)
835 {
836 	return &shell_mqtt;
837 }
838 
/* Register enable_shell_mqtt() as an APPLICATION-level init function */
SYS_INIT(enable_shell_mqtt, APPLICATION, CONFIG_APPLICATION_INIT_PRIORITY);
840