1 /*
2  * Copyright (c) 2022 G-Technologies Sdn. Bhd.
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <zephyr/kernel.h>
8 #include <zephyr/shell/shell_mqtt.h>
9 #include <zephyr/init.h>
10 #include <zephyr/logging/log.h>
11 #include <string.h>
12 #include <stdio.h>
13 #include <zephyr/drivers/hwinfo.h>
14 
15 SHELL_MQTT_DEFINE(shell_transport_mqtt);
16 SHELL_DEFINE(shell_mqtt, "", &shell_transport_mqtt,
17 	     CONFIG_SHELL_BACKEND_MQTT_LOG_MESSAGE_QUEUE_SIZE,
18 	     CONFIG_SHELL_BACKEND_MQTT_LOG_MESSAGE_QUEUE_TIMEOUT, SHELL_FLAG_OLF_CRLF);
19 
20 LOG_MODULE_REGISTER(shell_mqtt, CONFIG_SHELL_MQTT_LOG_LEVEL);
21 
22 #define NET_EVENT_MASK (NET_EVENT_L4_CONNECTED | NET_EVENT_L4_DISCONNECTED)
23 #define CONNECT_TIMEOUT_MS 2000
24 #define LISTEN_TIMEOUT_MS 500
25 #define MQTT_SEND_DELAY_MS K_MSEC(100)
26 #define PROCESS_INTERVAL K_SECONDS(2)
27 #define SHELL_MQTT_WORKQ_STACK_SIZE 2048
28 
29 #ifdef CONFIG_SHELL_MQTT_SERVER_USERNAME
30 #define MQTT_USERNAME CONFIG_SHELL_MQTT_SERVER_USERNAME
31 #else
32 #define MQTT_USERNAME NULL
33 #endif /* CONFIG_SHELL_MQTT_SERVER_USERNAME */
34 
35 #ifdef CONFIG_SHELL_MQTT_SERVER_PASSWORD
36 #define MQTT_PASSWORD CONFIG_SHELL_MQTT_SERVER_PASSWORD
37 #else
38 #define MQTT_PASSWORD NULL
39 #endif /*SHELL_MQTT_SERVER_PASSWORD */
40 
41 struct shell_mqtt *sh_mqtt;
42 K_KERNEL_STACK_DEFINE(sh_mqtt_workq_stack, SHELL_MQTT_WORKQ_STACK_SIZE);
43 
44 static void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt);
45 
sh_mqtt_work_reschedule(struct k_work_delayable * dwork,k_timeout_t delay)46 static inline int sh_mqtt_work_reschedule(struct k_work_delayable *dwork, k_timeout_t delay)
47 {
48 	return k_work_reschedule_for_queue(&sh_mqtt->workq, dwork, delay);
49 }
50 
sh_mqtt_work_submit(struct k_work * work)51 static inline int sh_mqtt_work_submit(struct k_work *work)
52 {
53 	return k_work_submit_to_queue(&sh_mqtt->workq, work);
54 }
55 
56 /* Lock the context of the shell mqtt */
sh_mqtt_context_lock(k_timeout_t timeout)57 static inline int sh_mqtt_context_lock(k_timeout_t timeout)
58 {
59 	return k_mutex_lock(&sh_mqtt->lock, timeout);
60 }
61 
62 /* Unlock the context of the shell mqtt */
sh_mqtt_context_unlock(void)63 static inline void sh_mqtt_context_unlock(void)
64 {
65 	(void)k_mutex_unlock(&sh_mqtt->lock);
66 }
67 
sh_mqtt_rx_rb_flush(void)68 static void sh_mqtt_rx_rb_flush(void)
69 {
70 	uint8_t c;
71 	uint32_t size = ring_buf_size_get(&sh_mqtt->rx_rb);
72 
73 	while (size > 0) {
74 		size = ring_buf_get(&sh_mqtt->rx_rb, &c, 1U);
75 	}
76 }
77 
shell_mqtt_get_devid(char * id,int id_max_len)78 bool __weak shell_mqtt_get_devid(char *id, int id_max_len)
79 {
80 	uint8_t hwinfo_id[DEVICE_ID_BIN_MAX_SIZE];
81 	ssize_t length;
82 
83 	length = hwinfo_get_device_id(hwinfo_id, DEVICE_ID_BIN_MAX_SIZE);
84 	if (length <= 0) {
85 		return false;
86 	}
87 
88 	(void)memset(id, 0, id_max_len);
89 	length = bin2hex(hwinfo_id, (size_t)length, id, id_max_len);
90 
91 	return length > 0;
92 }
93 
prepare_fds(void)94 static void prepare_fds(void)
95 {
96 	if (sh_mqtt->mqtt_cli.transport.type == MQTT_TRANSPORT_NON_SECURE) {
97 		sh_mqtt->fds[0].fd = sh_mqtt->mqtt_cli.transport.tcp.sock;
98 	}
99 
100 	sh_mqtt->fds[0].events = ZSOCK_POLLIN;
101 	sh_mqtt->nfds = 1;
102 }
103 
clear_fds(void)104 static void clear_fds(void)
105 {
106 	sh_mqtt->nfds = 0;
107 }
108 
109 /*
110  * Upon successful completion, poll() shall return a non-negative value. A positive value indicates
111  * the total number of pollfd structures that have selected events (that is, those for which the
112  * revents member is non-zero). A value of 0 indicates that the call timed out and no file
113  * descriptors have been selected. Upon failure, poll() shall return -1 and set errno to indicate
114  * the error.
115  */
wait(int timeout)116 static int wait(int timeout)
117 {
118 	int rc = 0;
119 
120 	if (sh_mqtt->nfds > 0) {
121 		rc = zsock_poll(sh_mqtt->fds, sh_mqtt->nfds, timeout);
122 		if (rc < 0) {
123 			LOG_ERR("poll error: %d", errno);
124 		}
125 	}
126 
127 	return rc;
128 }
129 
130 /* Query IP address for the broker URL */
get_mqtt_broker_addrinfo(void)131 static int get_mqtt_broker_addrinfo(void)
132 {
133 	int rc;
134 	struct zsock_addrinfo hints = { .ai_family = AF_INET,
135 					.ai_socktype = SOCK_STREAM,
136 					.ai_protocol = 0 };
137 
138 	if (sh_mqtt->haddr != NULL) {
139 		zsock_freeaddrinfo(sh_mqtt->haddr);
140 	}
141 
142 	rc = zsock_getaddrinfo(CONFIG_SHELL_MQTT_SERVER_ADDR,
143 			       STRINGIFY(CONFIG_SHELL_MQTT_SERVER_PORT), &hints, &sh_mqtt->haddr);
144 	if (rc == 0) {
145 		LOG_INF("DNS%s resolved for %s:%d", "", CONFIG_SHELL_MQTT_SERVER_ADDR,
146 			CONFIG_SHELL_MQTT_SERVER_PORT);
147 
148 		return 0;
149 	}
150 
151 	LOG_ERR("DNS%s resolved for %s:%d, retrying", " not", CONFIG_SHELL_MQTT_SERVER_ADDR,
152 		CONFIG_SHELL_MQTT_SERVER_PORT);
153 
154 	return rc;
155 }
156 
157 /* Close MQTT connection properly and cleanup socket */
sh_mqtt_close_and_cleanup(void)158 static void sh_mqtt_close_and_cleanup(void)
159 {
160 	/* Initialize to negative value so that the mqtt_abort case can run */
161 	int rc = -1;
162 
163 	/* If both network & mqtt connected, mqtt_disconnect will send a
164 	 * disconnection packet to the broker, it will invoke
165 	 * mqtt_evt_handler:MQTT_EVT_DISCONNECT if success
166 	 */
167 	if ((sh_mqtt->network_state == SHELL_MQTT_NETWORK_CONNECTED) &&
168 	    (sh_mqtt->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED)) {
169 		rc = mqtt_disconnect(&sh_mqtt->mqtt_cli);
170 	}
171 
172 	/* If network/mqtt disconnected, or mqtt_disconnect failed, do mqtt_abort */
173 	if (rc < 0) {
174 		/* mqtt_abort doesn't send disconnection packet to the broker, but it
175 		 * makes sure that the MQTT connection is aborted locally and will
176 		 * always invoke mqtt_evt_handler:MQTT_EVT_DISCONNECT
177 		 */
178 		(void)mqtt_abort(&sh_mqtt->mqtt_cli);
179 	}
180 
181 	/* Cleanup socket */
182 	clear_fds();
183 }
184 
broker_init(void)185 static void broker_init(void)
186 {
187 	struct sockaddr_in *broker4 = (struct sockaddr_in *)&sh_mqtt->broker;
188 
189 	broker4->sin_family = AF_INET;
190 	broker4->sin_port = htons(CONFIG_SHELL_MQTT_SERVER_PORT);
191 
192 	net_ipaddr_copy(&broker4->sin_addr, &net_sin(sh_mqtt->haddr->ai_addr)->sin_addr);
193 }
194 
client_init(void)195 static void client_init(void)
196 {
197 	static struct mqtt_utf8 password;
198 	static struct mqtt_utf8 username;
199 
200 	password.utf8 = (uint8_t *)MQTT_PASSWORD;
201 	password.size = strlen(MQTT_PASSWORD);
202 	username.utf8 = (uint8_t *)MQTT_USERNAME;
203 	username.size = strlen(MQTT_USERNAME);
204 
205 	mqtt_client_init(&sh_mqtt->mqtt_cli);
206 
207 	/* MQTT client configuration */
208 	sh_mqtt->mqtt_cli.broker = &sh_mqtt->broker;
209 	sh_mqtt->mqtt_cli.evt_cb = mqtt_evt_handler;
210 	sh_mqtt->mqtt_cli.client_id.utf8 = (uint8_t *)sh_mqtt->device_id;
211 	sh_mqtt->mqtt_cli.client_id.size = strlen(sh_mqtt->device_id);
212 	sh_mqtt->mqtt_cli.password = &password;
213 	sh_mqtt->mqtt_cli.user_name = &username;
214 	sh_mqtt->mqtt_cli.protocol_version = MQTT_VERSION_3_1_1;
215 
216 	/* MQTT buffers configuration */
217 	sh_mqtt->mqtt_cli.rx_buf = sh_mqtt->buf.rx;
218 	sh_mqtt->mqtt_cli.rx_buf_size = sizeof(sh_mqtt->buf.rx);
219 	sh_mqtt->mqtt_cli.tx_buf = sh_mqtt->buf.tx;
220 	sh_mqtt->mqtt_cli.tx_buf_size = sizeof(sh_mqtt->buf.tx);
221 
222 	/* MQTT transport configuration */
223 	sh_mqtt->mqtt_cli.transport.type = MQTT_TRANSPORT_NON_SECURE;
224 }
225 
226 /* Work routine to process MQTT packet and keep alive MQTT connection */
sh_mqtt_process_handler(struct k_work * work)227 static void sh_mqtt_process_handler(struct k_work *work)
228 {
229 	ARG_UNUSED(work);
230 	int rc;
231 	int64_t remaining = LISTEN_TIMEOUT_MS;
232 	int64_t start_time = k_uptime_get();
233 
234 	if (sh_mqtt->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
235 		LOG_DBG("%s_work while %s", "process", "network disconnected");
236 		return;
237 	}
238 
239 	/* If context can't be locked, that means net conn cb locked it */
240 	if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
241 		/* In that case we should simply return */
242 		LOG_DBG("%s_work unable to lock context", "process");
243 		return;
244 	}
245 
246 	if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
247 		LOG_DBG("MQTT %s", "not connected");
248 		goto process_error;
249 	}
250 
251 	if (sh_mqtt->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
252 		LOG_DBG("%s_work while %s", "process", "MQTT not subscribed");
253 		goto process_error;
254 	}
255 
256 	LOG_DBG("MQTT %s", "Processing");
257 	/* Listen to the port for a duration defined by LISTEN_TIMEOUT_MS */
258 	while ((remaining > 0) && (sh_mqtt->network_state == SHELL_MQTT_NETWORK_CONNECTED) &&
259 	       (sh_mqtt->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED) &&
260 	       (sh_mqtt->subscribe_state == SHELL_MQTT_SUBSCRIBED)) {
261 		LOG_DBG("Listening to socket");
262 		rc = wait(remaining);
263 		if (rc > 0) {
264 			LOG_DBG("Process socket for MQTT packet");
265 			rc = mqtt_input(&sh_mqtt->mqtt_cli);
266 			if (rc != 0) {
267 				LOG_ERR("%s error: %d", "processed: mqtt_input", rc);
268 				goto process_error;
269 			}
270 		} else if (rc < 0) {
271 			goto process_error;
272 		}
273 
274 		LOG_DBG("MQTT %s", "Keepalive");
275 		rc = mqtt_live(&sh_mqtt->mqtt_cli);
276 		if ((rc != 0) && (rc != -EAGAIN)) {
277 			LOG_ERR("%s error: %d", "mqtt_live", rc);
278 			goto process_error;
279 		}
280 
281 		remaining = LISTEN_TIMEOUT_MS + start_time - k_uptime_get();
282 	}
283 
284 	/* Reschedule the process work */
285 	LOG_DBG("Scheduling %s work", "process");
286 	(void)sh_mqtt_work_reschedule(&sh_mqtt->process_dwork, K_SECONDS(2));
287 	sh_mqtt_context_unlock();
288 	return;
289 
290 process_error:
291 	LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "connect");
292 	sh_mqtt_close_and_cleanup();
293 	(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(1));
294 	sh_mqtt_context_unlock();
295 }
296 
sh_mqtt_subscribe_handler(struct k_work * work)297 static void sh_mqtt_subscribe_handler(struct k_work *work)
298 {
299 	ARG_UNUSED(work);
300 	/* Subscribe config information */
301 	struct mqtt_topic subs_topic = { .topic = { .utf8 = sh_mqtt->sub_topic,
302 						    .size = strlen(sh_mqtt->sub_topic) },
303 					 .qos = MQTT_QOS_1_AT_LEAST_ONCE };
304 	const struct mqtt_subscription_list subs_list = { .list = &subs_topic,
305 							  .list_count = 1U,
306 							  .message_id = 1U };
307 	int rc;
308 
309 	if (sh_mqtt->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
310 		LOG_DBG("%s_work while %s", "subscribe", "network disconnected");
311 		return;
312 	}
313 
314 	/* If context can't be locked, that means net conn cb locked it */
315 	if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
316 		/* In that case we should simply return */
317 		LOG_DBG("%s_work unable to lock context", "subscribe");
318 		return;
319 	}
320 
321 	if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
322 		LOG_DBG("%s_work while %s", "subscribe", "transport disconnected");
323 		goto subscribe_error;
324 	}
325 
326 	rc = mqtt_subscribe(&sh_mqtt->mqtt_cli, &subs_list);
327 	if (rc == 0) {
328 		/* Wait for mqtt's connack */
329 		LOG_DBG("Listening to socket");
330 		rc = wait(CONNECT_TIMEOUT_MS);
331 		if (rc > 0) {
332 			LOG_DBG("Process socket for MQTT packet");
333 			rc = mqtt_input(&sh_mqtt->mqtt_cli);
334 			if (rc != 0) {
335 				LOG_ERR("%s error: %d", "subscribe: mqtt_input", rc);
336 				goto subscribe_error;
337 			}
338 		} else if (rc < 0) {
339 			goto subscribe_error;
340 		}
341 
342 		/* No suback, fail */
343 		if (sh_mqtt->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
344 			goto subscribe_error;
345 		}
346 
347 		LOG_DBG("Scheduling MQTT process work");
348 		(void)sh_mqtt_work_reschedule(&sh_mqtt->process_dwork, PROCESS_INTERVAL);
349 		sh_mqtt_context_unlock();
350 
351 		LOG_INF("Logs will be published to: %s", sh_mqtt->pub_topic);
352 		LOG_INF("Subscribing shell cmds from: %s", sh_mqtt->sub_topic);
353 
354 		return;
355 	}
356 
357 subscribe_error:
358 	LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "subscribe");
359 	sh_mqtt_close_and_cleanup();
360 	(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(2));
361 	sh_mqtt_context_unlock();
362 }
363 
364 /* Work routine to connect to MQTT */
sh_mqtt_connect_handler(struct k_work * work)365 static void sh_mqtt_connect_handler(struct k_work *work)
366 {
367 	ARG_UNUSED(work);
368 	int rc;
369 
370 	if (sh_mqtt->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
371 		LOG_DBG("%s_work while %s", "connect", "network disconnected");
372 		return;
373 	}
374 
375 	/* If context can't be locked, that means net conn cb locked it */
376 	if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
377 		/* In that case we should simply return */
378 		LOG_DBG("%s_work unable to lock context", "connect");
379 		return;
380 	}
381 
382 	if (sh_mqtt->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED) {
383 		__ASSERT(0, "MQTT shouldn't be already connected");
384 		LOG_ERR("MQTT shouldn't be already connected");
385 		goto connect_error;
386 	}
387 
388 	/* Resolve the broker URL */
389 	LOG_DBG("Resolving DNS");
390 	rc = get_mqtt_broker_addrinfo();
391 	if (rc != 0) {
392 		(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(1));
393 		sh_mqtt_context_unlock();
394 		return;
395 	}
396 
397 	LOG_DBG("Initializing MQTT client");
398 	broker_init();
399 	client_init();
400 
401 	/* Try to connect to mqtt */
402 	LOG_DBG("Connecting to MQTT broker");
403 	rc = mqtt_connect(&sh_mqtt->mqtt_cli);
404 	if (rc != 0) {
405 		LOG_ERR("%s error: %d", "mqtt_connect", rc);
406 		goto connect_error;
407 	}
408 
409 	/* Prepare port config */
410 	LOG_DBG("Preparing socket");
411 	prepare_fds();
412 
413 	/* Wait for mqtt's connack */
414 	LOG_DBG("Listening to socket");
415 	rc = wait(CONNECT_TIMEOUT_MS);
416 	if (rc > 0) {
417 		LOG_DBG("Process socket for MQTT packet");
418 		rc = mqtt_input(&sh_mqtt->mqtt_cli);
419 		if (rc != 0) {
420 			LOG_ERR("%s error: %d", "connect: mqtt_input", rc);
421 			goto connect_error;
422 		}
423 	} else if (rc < 0) {
424 		goto connect_error;
425 	}
426 
427 	/* No connack, fail */
428 	if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
429 		goto connect_error;
430 	}
431 
432 	LOG_DBG("Scheduling %s work", "subscribe");
433 	(void)sh_mqtt_work_reschedule(&sh_mqtt->subscribe_dwork, K_SECONDS(2));
434 	sh_mqtt_context_unlock();
435 	return;
436 
437 connect_error:
438 	LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "connect");
439 	sh_mqtt_close_and_cleanup();
440 	(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(2));
441 	sh_mqtt_context_unlock();
442 }
443 
sh_mqtt_publish(uint8_t * data,uint32_t len)444 static int sh_mqtt_publish(uint8_t *data, uint32_t len)
445 {
446 	sh_mqtt->pub_data.message.payload.data = data;
447 	sh_mqtt->pub_data.message.payload.len = len;
448 	sh_mqtt->pub_data.message_id++;
449 
450 	return mqtt_publish(&sh_mqtt->mqtt_cli, &sh_mqtt->pub_data);
451 }
452 
sh_mqtt_publish_tx_buf(bool is_work)453 static int sh_mqtt_publish_tx_buf(bool is_work)
454 {
455 	int rc;
456 
457 	rc = sh_mqtt_publish(&sh_mqtt->tx_buf.buf[0], sh_mqtt->tx_buf.len);
458 	memset(&sh_mqtt->tx_buf, 0, sizeof(sh_mqtt->tx_buf));
459 	if (rc != 0) {
460 		LOG_ERR("MQTT publish error: %d", rc);
461 		return rc;
462 	}
463 
464 	/* Arbitrary delay to not kill the session */
465 	if (!is_work) {
466 		k_sleep(MQTT_SEND_DELAY_MS);
467 	}
468 
469 	return rc;
470 }
471 
sh_mqtt_publish_handler(struct k_work * work)472 static void sh_mqtt_publish_handler(struct k_work *work)
473 {
474 	ARG_UNUSED(work);
475 	int rc;
476 
477 	(void)sh_mqtt_context_lock(K_FOREVER);
478 
479 	rc = sh_mqtt_publish_tx_buf(true);
480 	if (rc != 0) {
481 		LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "publish");
482 		sh_mqtt_close_and_cleanup();
483 		(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(2));
484 	}
485 
486 	sh_mqtt_context_unlock();
487 }
488 
cancel_dworks_and_cleanup(void)489 static void cancel_dworks_and_cleanup(void)
490 {
491 	(void)k_work_cancel_delayable(&sh_mqtt->connect_dwork);
492 	(void)k_work_cancel_delayable(&sh_mqtt->subscribe_dwork);
493 	(void)k_work_cancel_delayable(&sh_mqtt->process_dwork);
494 	(void)k_work_cancel_delayable(&sh_mqtt->publish_dwork);
495 	sh_mqtt_close_and_cleanup();
496 }
497 
net_disconnect_handler(struct k_work * work)498 static void net_disconnect_handler(struct k_work *work)
499 {
500 	ARG_UNUSED(work);
501 
502 	LOG_WRN("Network %s", "disconnected");
503 	sh_mqtt->network_state = SHELL_MQTT_NETWORK_DISCONNECTED;
504 
505 	/* Stop all possible work */
506 	(void)sh_mqtt_context_lock(K_FOREVER);
507 	cancel_dworks_and_cleanup();
508 	sh_mqtt_context_unlock();
509 	/* If the transport was requested, the connect work will be rescheduled
510 	 * when internet is connected again
511 	 */
512 }
513 
514 /* Network connection event handler */
network_evt_handler(struct net_mgmt_event_callback * cb,uint32_t mgmt_event,struct net_if * iface)515 static void network_evt_handler(struct net_mgmt_event_callback *cb, uint32_t mgmt_event,
516 				struct net_if *iface)
517 {
518 	if ((mgmt_event == NET_EVENT_L4_CONNECTED) &&
519 	    (sh_mqtt->network_state == SHELL_MQTT_NETWORK_DISCONNECTED)) {
520 		LOG_WRN("Network %s", "connected");
521 		sh_mqtt->network_state = SHELL_MQTT_NETWORK_CONNECTED;
522 		(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork, K_SECONDS(1));
523 	} else if ((mgmt_event == NET_EVENT_L4_DISCONNECTED) &&
524 		   (sh_mqtt->network_state == SHELL_MQTT_NETWORK_CONNECTED)) {
525 		(void)sh_mqtt_work_submit(&sh_mqtt->net_disconnected_work);
526 	}
527 }
528 
mqtt_evt_handler(struct mqtt_client * const client,const struct mqtt_evt * evt)529 static void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt)
530 {
531 	switch (evt->type) {
532 	case MQTT_EVT_CONNACK:
533 		if (evt->result != 0) {
534 			sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
535 			LOG_ERR("MQTT %s %d", "connect failed", evt->result);
536 			break;
537 		}
538 
539 		sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_CONNECTED;
540 		LOG_WRN("MQTT %s", "client connected!");
541 
542 		break;
543 	case MQTT_EVT_SUBACK:
544 		if (evt->result != 0) {
545 			LOG_ERR("MQTT subscribe: %s", "error");
546 			sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
547 			break;
548 		}
549 
550 		LOG_WRN("MQTT subscribe: %s", "ok");
551 		sh_mqtt->subscribe_state = SHELL_MQTT_SUBSCRIBED;
552 		break;
553 
554 	case MQTT_EVT_UNSUBACK:
555 		LOG_DBG("UNSUBACK packet id: %u", evt->param.suback.message_id);
556 		sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
557 		break;
558 
559 	case MQTT_EVT_DISCONNECT:
560 		LOG_WRN("MQTT disconnected: %d", evt->result);
561 		sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
562 		sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
563 		break;
564 
565 	case MQTT_EVT_PUBLISH: {
566 		const struct mqtt_publish_param *pub = &evt->param.publish;
567 		uint32_t payload_left;
568 		size_t size;
569 		int rc;
570 
571 		payload_left = pub->message.payload.len;
572 
573 		LOG_DBG("MQTT publish received %d, %d bytes", evt->result, payload_left);
574 		LOG_DBG("   id: %d, qos: %d", pub->message_id, pub->message.topic.qos);
575 		LOG_DBG("   item: %s", pub->message.topic.topic.utf8);
576 
577 		/* For MQTT_QOS_0_AT_MOST_ONCE no acknowledgment needed */
578 		if (pub->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
579 			struct mqtt_puback_param puback = { .message_id = pub->message_id };
580 
581 			(void)mqtt_publish_qos1_ack(client, &puback);
582 		}
583 
584 		while (payload_left > 0) {
585 			/* Attempt to claim `payload_left` bytes of buffer in rb */
586 			size = (size_t)ring_buf_put_claim(&sh_mqtt->rx_rb, &sh_mqtt->rx_rb_ptr,
587 							  payload_left);
588 			/* Read `size` bytes of payload from mqtt */
589 			rc = mqtt_read_publish_payload_blocking(client, sh_mqtt->rx_rb_ptr, size);
590 
591 			/* errno value, return */
592 			if (rc < 0) {
593 				(void)ring_buf_put_finish(&sh_mqtt->rx_rb, 0U);
594 				sh_mqtt_rx_rb_flush();
595 				return;
596 			}
597 
598 			size = (size_t)rc;
599 			/* Indicate that `size` bytes of payload has been written into rb */
600 			(void)ring_buf_put_finish(&sh_mqtt->rx_rb, size);
601 			/* Update `payload_left` */
602 			payload_left -= size;
603 			/* Tells the shell that we have new data for it */
604 			sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh_mqtt->shell_context);
605 			/* Arbitrary sleep for the shell to do its thing */
606 			(void)k_msleep(100);
607 		}
608 
609 		/* Shell won't execute the cmds without \r\n */
610 		while (true) {
611 			/* Check if rb's free space is enough to fit in \r\n */
612 			size = ring_buf_space_get(&sh_mqtt->rx_rb);
613 			if (size >= sizeof("\r\n")) {
614 				(void)ring_buf_put(&sh_mqtt->rx_rb, "\r\n", sizeof("\r\n"));
615 				break;
616 			}
617 			/* Arbitrary sleep for the shell to do its thing */
618 			(void)k_msleep(100);
619 		}
620 
621 		sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh_mqtt->shell_context);
622 		break;
623 	}
624 
625 	case MQTT_EVT_PUBACK:
626 		if (evt->result != 0) {
627 			LOG_ERR("MQTT PUBACK error %d", evt->result);
628 			break;
629 		}
630 
631 		LOG_DBG("PUBACK packet id: %u", evt->param.puback.message_id);
632 		break;
633 
634 	case MQTT_EVT_PINGRESP:
635 		LOG_DBG("PINGRESP packet");
636 		break;
637 
638 	default:
639 		LOG_DBG("MQTT event received %d", evt->type);
640 		break;
641 	}
642 }
643 
init(const struct shell_transport * transport,const void * config,shell_transport_handler_t evt_handler,void * context)644 static int init(const struct shell_transport *transport, const void *config,
645 		shell_transport_handler_t evt_handler, void *context)
646 {
647 	sh_mqtt = (struct shell_mqtt *)transport->ctx;
648 
649 	(void)memset(sh_mqtt, 0, sizeof(struct shell_mqtt));
650 
651 	(void)k_mutex_init(&sh_mqtt->lock);
652 
653 	if (!shell_mqtt_get_devid(sh_mqtt->device_id, DEVICE_ID_HEX_MAX_SIZE)) {
654 		LOG_ERR("Unable to get device identity, using dummy value");
655 		(void)snprintf(sh_mqtt->device_id, sizeof("dummy"), "dummy");
656 	}
657 
658 	LOG_DBG("Client ID is %s", sh_mqtt->device_id);
659 
660 	(void)snprintf(sh_mqtt->pub_topic, SH_MQTT_TOPIC_MAX_SIZE, "%s_tx", sh_mqtt->device_id);
661 	(void)snprintf(sh_mqtt->sub_topic, SH_MQTT_TOPIC_MAX_SIZE, "%s_rx", sh_mqtt->device_id);
662 
663 	ring_buf_init(&sh_mqtt->rx_rb, RX_RB_SIZE, sh_mqtt->rx_rb_buf);
664 
665 	LOG_DBG("Initializing shell MQTT backend");
666 
667 	sh_mqtt->shell_handler = evt_handler;
668 	sh_mqtt->shell_context = context;
669 
670 	sh_mqtt->pub_data.message.topic.qos = MQTT_QOS_0_AT_MOST_ONCE;
671 	sh_mqtt->pub_data.message.topic.topic.utf8 = (uint8_t *)sh_mqtt->pub_topic;
672 	sh_mqtt->pub_data.message.topic.topic.size =
673 		strlen(sh_mqtt->pub_data.message.topic.topic.utf8);
674 	sh_mqtt->pub_data.dup_flag = 0U;
675 	sh_mqtt->pub_data.retain_flag = 0U;
676 
677 	/* Initialize the work queue */
678 	k_work_queue_init(&sh_mqtt->workq);
679 	k_work_queue_start(&sh_mqtt->workq, sh_mqtt_workq_stack,
680 			   K_KERNEL_STACK_SIZEOF(sh_mqtt_workq_stack), K_PRIO_COOP(7), NULL);
681 	(void)k_thread_name_set(&sh_mqtt->workq.thread, "sh_mqtt_workq");
682 	k_work_init(&sh_mqtt->net_disconnected_work, net_disconnect_handler);
683 	k_work_init_delayable(&sh_mqtt->connect_dwork, sh_mqtt_connect_handler);
684 	k_work_init_delayable(&sh_mqtt->subscribe_dwork, sh_mqtt_subscribe_handler);
685 	k_work_init_delayable(&sh_mqtt->process_dwork, sh_mqtt_process_handler);
686 	k_work_init_delayable(&sh_mqtt->publish_dwork, sh_mqtt_publish_handler);
687 
688 	LOG_DBG("Initializing listener for network");
689 	net_mgmt_init_event_callback(&sh_mqtt->mgmt_cb, network_evt_handler, NET_EVENT_MASK);
690 
691 	sh_mqtt->network_state = SHELL_MQTT_NETWORK_DISCONNECTED;
692 	sh_mqtt->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
693 	sh_mqtt->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
694 
695 	return 0;
696 }
697 
uninit(const struct shell_transport * transport)698 static int uninit(const struct shell_transport *transport)
699 {
700 	ARG_UNUSED(transport);
701 
702 	/* Not initialized yet */
703 	if (sh_mqtt == NULL) {
704 		return -ENODEV;
705 	}
706 
707 	return 0;
708 }
709 
enable(const struct shell_transport * transport,bool blocking)710 static int enable(const struct shell_transport *transport, bool blocking)
711 {
712 	ARG_UNUSED(transport);
713 	ARG_UNUSED(blocking);
714 
715 	/* Not initialized yet */
716 	if (sh_mqtt == NULL) {
717 		return -ENODEV;
718 	}
719 
720 	/* Listen for network connection status */
721 	net_mgmt_add_event_callback(&sh_mqtt->mgmt_cb);
722 	conn_mgr_mon_resend_status();
723 
724 	return 0;
725 }
726 
write(const struct shell_transport * transport,const void * data,size_t length,size_t * cnt)727 static int write(const struct shell_transport *transport, const void *data, size_t length,
728 		 size_t *cnt)
729 {
730 	ARG_UNUSED(transport);
731 	int rc = 0;
732 	struct k_work_sync ws;
733 	size_t copy_len;
734 
735 	*cnt = 0;
736 
737 	/* Not initialized yet */
738 	if (sh_mqtt == NULL) {
739 		return -ENODEV;
740 	}
741 
742 	/* Not connected to broker */
743 	if (sh_mqtt->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
744 		goto out;
745 	}
746 
747 	(void)k_work_cancel_delayable_sync(&sh_mqtt->publish_dwork, &ws);
748 
749 	do {
750 		if ((sh_mqtt->tx_buf.len + length - *cnt) > TX_BUF_SIZE) {
751 			copy_len = TX_BUF_SIZE - sh_mqtt->tx_buf.len;
752 		} else {
753 			copy_len = length - *cnt;
754 		}
755 
756 		memcpy(sh_mqtt->tx_buf.buf + sh_mqtt->tx_buf.len, (uint8_t *)data + *cnt, copy_len);
757 		sh_mqtt->tx_buf.len += copy_len;
758 
759 		/* Send the data immediately if the buffer is full */
760 		if (sh_mqtt->tx_buf.len == TX_BUF_SIZE) {
761 			rc = sh_mqtt_publish_tx_buf(false);
762 			if (rc != 0) {
763 				sh_mqtt_close_and_cleanup();
764 				(void)sh_mqtt_work_reschedule(&sh_mqtt->connect_dwork,
765 							      K_SECONDS(2));
766 				*cnt = length;
767 				return rc;
768 			}
769 		}
770 
771 		*cnt += copy_len;
772 	} while (*cnt < length);
773 
774 	if (sh_mqtt->tx_buf.len > 0) {
775 		(void)sh_mqtt_work_reschedule(&sh_mqtt->publish_dwork, MQTT_SEND_DELAY_MS);
776 	}
777 
778 	/* Inform shell that it is ready for next TX */
779 	sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_TX_RDY, sh_mqtt->shell_context);
780 
781 out:
782 	/* We will always assume that we sent everything */
783 	*cnt = length;
784 	return rc;
785 }
786 
read(const struct shell_transport * transport,void * data,size_t length,size_t * cnt)787 static int read(const struct shell_transport *transport, void *data, size_t length, size_t *cnt)
788 {
789 	ARG_UNUSED(transport);
790 
791 	/* Not initialized yet */
792 	if (sh_mqtt == NULL) {
793 		return -ENODEV;
794 	}
795 
796 	/* Not subscribed yet */
797 	if (sh_mqtt->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
798 		*cnt = 0;
799 		return 0;
800 	}
801 
802 	*cnt = ring_buf_get(&sh_mqtt->rx_rb, data, length);
803 
804 	/* Inform the shell if there are still data in the rb */
805 	if (ring_buf_size_get(&sh_mqtt->rx_rb) > 0) {
806 		sh_mqtt->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh_mqtt->shell_context);
807 	}
808 
809 	return 0;
810 }
811 
812 const struct shell_transport_api shell_mqtt_transport_api = { .init = init,
813 							      .uninit = uninit,
814 							      .enable = enable,
815 							      .write = write,
816 							      .read = read };
817 
enable_shell_mqtt(void)818 static int enable_shell_mqtt(void)
819 {
820 
821 	bool log_backend = CONFIG_SHELL_MQTT_INIT_LOG_LEVEL > 0;
822 	uint32_t level = (CONFIG_SHELL_MQTT_INIT_LOG_LEVEL > LOG_LEVEL_DBG) ?
823 				       CONFIG_LOG_MAX_LEVEL :
824 				       CONFIG_SHELL_MQTT_INIT_LOG_LEVEL;
825 	static const struct shell_backend_config_flags cfg_flags = {
826 		.insert_mode = 0,
827 		.echo = 0,
828 		.obscure = 0,
829 		.mode_delete = 0,
830 		.use_colors = 0,
831 		.use_vt100 = 0,
832 	};
833 
834 	return shell_init(&shell_mqtt, NULL, cfg_flags, log_backend, level);
835 }
836 
837 /* Function is used for testing purposes */
shell_backend_mqtt_get_ptr(void)838 const struct shell *shell_backend_mqtt_get_ptr(void)
839 {
840 	return &shell_mqtt;
841 }
842 
843 SYS_INIT(enable_shell_mqtt, APPLICATION, CONFIG_APPLICATION_INIT_PRIORITY);
844