1 /*
2 * Copyright (c) 2022 G-Technologies Sdn. Bhd.
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 #include <zephyr/kernel.h>
8 #include <zephyr/shell/shell_mqtt.h>
9 #include <zephyr/init.h>
10 #include <zephyr/logging/log.h>
11 #include <string.h>
12 #include <stdio.h>
13 #include <zephyr/drivers/hwinfo.h>
14
15 SHELL_MQTT_DEFINE(shell_transport_mqtt);
16 SHELL_DEFINE(shell_mqtt, "", &shell_transport_mqtt,
17 CONFIG_SHELL_BACKEND_MQTT_LOG_MESSAGE_QUEUE_SIZE,
18 CONFIG_SHELL_BACKEND_MQTT_LOG_MESSAGE_QUEUE_TIMEOUT, SHELL_FLAG_OLF_CRLF);
19
20 LOG_MODULE_REGISTER(shell_mqtt, CONFIG_SHELL_MQTT_LOG_LEVEL);
21
22 #define NET_EVENT_MASK (NET_EVENT_L4_CONNECTED | NET_EVENT_L4_DISCONNECTED)
23 #define CONNECT_TIMEOUT_MS CONFIG_SHELL_MQTT_CONNECT_TIMEOUT_MS
24 #define LISTEN_TIMEOUT_MS CONFIG_SHELL_MQTT_LISTEN_TIMEOUT_MS
25 #define MQTT_SEND_DELAY_MS K_MSEC(100)
26 #define PROCESS_INTERVAL K_MSEC(CONFIG_SHELL_MQTT_WORK_DELAY_MS)
27 #define SHELL_MQTT_WORKQ_STACK_SIZE 2048
28
29 struct shell_mqtt *sh_mqtt;
30 K_KERNEL_STACK_DEFINE(sh_mqtt_workq_stack, SHELL_MQTT_WORKQ_STACK_SIZE);
31
32 static void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt);
33
sh_mqtt_work_reschedule(struct k_work_delayable * dwork,k_timeout_t delay)34 static inline int sh_mqtt_work_reschedule(struct k_work_delayable *dwork, k_timeout_t delay)
35 {
36 return k_work_reschedule_for_queue(&sh_mqtt->workq, dwork, delay);
37 }
38
sh_mqtt_work_submit(struct k_work * work)39 static inline int sh_mqtt_work_submit(struct k_work *work)
40 {
41 return k_work_submit_to_queue(&sh_mqtt->workq, work);
42 }
43
44 /* Lock the context of the shell mqtt */
sh_mqtt_context_lock(k_timeout_t timeout)45 static inline int sh_mqtt_context_lock(k_timeout_t timeout)
46 {
47 return k_mutex_lock(&sh_mqtt->lock, timeout);
48 }
49
50 /* Unlock the context of the shell mqtt */
sh_mqtt_context_unlock(void)51 static inline void sh_mqtt_context_unlock(void)
52 {
53 (void)k_mutex_unlock(&sh_mqtt->lock);
54 }
55
shell_mqtt_get_devid(char * id,int id_max_len)56 bool __weak shell_mqtt_get_devid(char *id, int id_max_len)
57 {
58 uint8_t hwinfo_id[DEVICE_ID_BIN_MAX_SIZE];
59 ssize_t length;
60
61 length = hwinfo_get_device_id(hwinfo_id, DEVICE_ID_BIN_MAX_SIZE);
62 if (length <= 0) {
63 return false;
64 }
65
66 (void)memset(id, 0, id_max_len);
67 length = bin2hex(hwinfo_id, (size_t)length, id, id_max_len);
68
69 return length > 0;
70 }
71
prepare_fds(struct shell_mqtt * sh)72 static void prepare_fds(struct shell_mqtt *sh)
73 {
74 if (sh->mqtt_cli.transport.type == MQTT_TRANSPORT_NON_SECURE) {
75 sh->fds[0].fd = sh->mqtt_cli.transport.tcp.sock;
76 }
77
78 sh->fds[0].events = ZSOCK_POLLIN;
79 sh->nfds = 1;
80 }
81
clear_fds(struct shell_mqtt * sh)82 static void clear_fds(struct shell_mqtt *sh)
83 {
84 sh->nfds = 0;
85 }
86
87 /*
88 * Upon successful completion, poll() shall return a non-negative value. A positive value indicates
89 * the total number of pollfd structures that have selected events (that is, those for which the
90 * revents member is non-zero). A value of 0 indicates that the call timed out and no file
91 * descriptors have been selected. Upon failure, poll() shall return -1 and set errno to indicate
92 * the error.
93 */
wait(struct shell_mqtt * sh,int timeout)94 static int wait(struct shell_mqtt *sh, int timeout)
95 {
96 int rc = 0;
97
98 if (sh->nfds > 0) {
99 rc = zsock_poll(sh->fds, sh->nfds, timeout);
100 if (rc < 0) {
101 LOG_ERR("poll error: %d", errno);
102 }
103 }
104
105 return rc;
106 }
107
108 /* Query IP address for the broker URL */
get_mqtt_broker_addrinfo(struct shell_mqtt * sh)109 static int get_mqtt_broker_addrinfo(struct shell_mqtt *sh)
110 {
111 int rc;
112 struct zsock_addrinfo hints = { .ai_family = NET_AF_INET,
113 .ai_socktype = NET_SOCK_STREAM,
114 .ai_protocol = 0 };
115
116 if (sh->haddr != NULL) {
117 zsock_freeaddrinfo(sh->haddr);
118 }
119
120 rc = zsock_getaddrinfo(CONFIG_SHELL_MQTT_SERVER_ADDR,
121 STRINGIFY(CONFIG_SHELL_MQTT_SERVER_PORT), &hints, &sh->haddr);
122 if (rc == 0) {
123 LOG_INF("DNS%s resolved for %s:%d", "", CONFIG_SHELL_MQTT_SERVER_ADDR,
124 CONFIG_SHELL_MQTT_SERVER_PORT);
125
126 return 0;
127 }
128
129 LOG_ERR("DNS%s resolved for %s:%d, retrying", " not", CONFIG_SHELL_MQTT_SERVER_ADDR,
130 CONFIG_SHELL_MQTT_SERVER_PORT);
131
132 return rc;
133 }
134
135 /* Close MQTT connection properly and cleanup socket */
sh_mqtt_close_and_cleanup(struct shell_mqtt * sh)136 static void sh_mqtt_close_and_cleanup(struct shell_mqtt *sh)
137 {
138 /* Initialize to negative value so that the mqtt_abort case can run */
139 int rc = -1;
140
141 /* If both network & mqtt connected, mqtt_disconnect will send a
142 * disconnection packet to the broker, it will invoke
143 * mqtt_evt_handler:MQTT_EVT_DISCONNECT if success
144 */
145 if ((sh->network_state == SHELL_MQTT_NETWORK_CONNECTED) &&
146 (sh->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED)) {
147 rc = mqtt_disconnect(&sh->mqtt_cli, NULL);
148 }
149
150 /* If network/mqtt disconnected, or mqtt_disconnect failed, do mqtt_abort */
151 if (rc < 0) {
152 /* mqtt_abort doesn't send disconnection packet to the broker, but it
153 * makes sure that the MQTT connection is aborted locally and will
154 * always invoke mqtt_evt_handler:MQTT_EVT_DISCONNECT
155 */
156 (void)mqtt_abort(&sh->mqtt_cli);
157 }
158
159 /* Cleanup socket */
160 clear_fds(sh);
161 }
162
broker_init(struct shell_mqtt * sh)163 static void broker_init(struct shell_mqtt *sh)
164 {
165 struct net_sockaddr_in *broker4 = (struct net_sockaddr_in *)&sh->broker;
166
167 broker4->sin_family = NET_AF_INET;
168 broker4->sin_port = net_htons(CONFIG_SHELL_MQTT_SERVER_PORT);
169
170 net_ipaddr_copy(&broker4->sin_addr, &net_sin(sh->haddr->ai_addr)->sin_addr);
171 }
172
client_init(struct shell_mqtt * sh)173 static void client_init(struct shell_mqtt *sh)
174 {
175 static struct mqtt_utf8 password;
176 static struct mqtt_utf8 username;
177
178 password.utf8 = (uint8_t *)CONFIG_SHELL_MQTT_SERVER_PASSWORD;
179 password.size = strlen(CONFIG_SHELL_MQTT_SERVER_PASSWORD);
180 username.utf8 = (uint8_t *)CONFIG_SHELL_MQTT_SERVER_USERNAME;
181 username.size = strlen(CONFIG_SHELL_MQTT_SERVER_USERNAME);
182
183 mqtt_client_init(&sh->mqtt_cli);
184
185 /* MQTT client configuration */
186 sh->mqtt_cli.broker = &sh->broker;
187 sh->mqtt_cli.evt_cb = mqtt_evt_handler;
188 sh->mqtt_cli.client_id.utf8 = (uint8_t *)sh->device_id;
189 sh->mqtt_cli.client_id.size = strlen(sh->device_id);
190 sh->mqtt_cli.password = &password;
191 sh->mqtt_cli.user_name = &username;
192 sh->mqtt_cli.protocol_version = MQTT_VERSION_3_1_1;
193
194 /* MQTT buffers configuration */
195 sh->mqtt_cli.rx_buf = sh->buf.rx;
196 sh->mqtt_cli.rx_buf_size = sizeof(sh->buf.rx);
197 sh->mqtt_cli.tx_buf = sh->buf.tx;
198 sh->mqtt_cli.tx_buf_size = sizeof(sh->buf.tx);
199
200 /* MQTT transport configuration */
201 sh->mqtt_cli.transport.type = MQTT_TRANSPORT_NON_SECURE;
202 }
203
204 /* Work routine to process MQTT packet and keep alive MQTT connection */
sh_mqtt_process_handler(struct k_work * work)205 static void sh_mqtt_process_handler(struct k_work *work)
206 {
207 ARG_UNUSED(work);
208 struct shell_mqtt *sh = sh_mqtt;
209 int rc;
210 int64_t remaining = LISTEN_TIMEOUT_MS;
211 int64_t start_time = k_uptime_get();
212
213 if (sh->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
214 LOG_DBG("%s_work while %s", "process", "network disconnected");
215 return;
216 }
217
218 /* If context can't be locked, that means net conn cb locked it */
219 if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
220 /* In that case we should simply return */
221 LOG_DBG("%s_work unable to lock context", "process");
222 return;
223 }
224
225 if (sh->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
226 LOG_DBG("MQTT %s", "not connected");
227 goto process_error;
228 }
229
230 if (sh->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
231 LOG_DBG("%s_work while %s", "process", "MQTT not subscribed");
232 goto process_error;
233 }
234
235 LOG_DBG("MQTT %s", "Processing");
236 /* Listen to the port for a duration defined by LISTEN_TIMEOUT_MS */
237 while ((remaining > 0) && (sh->network_state == SHELL_MQTT_NETWORK_CONNECTED) &&
238 (sh->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED) &&
239 (sh->subscribe_state == SHELL_MQTT_SUBSCRIBED)) {
240 LOG_DBG("Listening to socket");
241 rc = wait(sh, remaining);
242 if (rc > 0) {
243 LOG_DBG("Process socket for MQTT packet");
244 rc = mqtt_input(&sh->mqtt_cli);
245 if (rc != 0) {
246 LOG_ERR("%s error: %d", "processed: mqtt_input", rc);
247 goto process_error;
248 }
249 } else if (rc < 0) {
250 goto process_error;
251 }
252
253 LOG_DBG("MQTT %s", "Keepalive");
254 rc = mqtt_live(&sh->mqtt_cli);
255 if ((rc != 0) && (rc != -EAGAIN)) {
256 LOG_ERR("%s error: %d", "mqtt_live", rc);
257 goto process_error;
258 }
259
260 remaining = LISTEN_TIMEOUT_MS + start_time - k_uptime_get();
261 }
262
263 /* Reschedule the process work */
264 LOG_DBG("Scheduling %s work", "process");
265 (void)sh_mqtt_work_reschedule(&sh->process_dwork, PROCESS_INTERVAL);
266 sh_mqtt_context_unlock();
267 return;
268
269 process_error:
270 LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "connect");
271 sh_mqtt_close_and_cleanup(sh);
272 (void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
273 sh_mqtt_context_unlock();
274 }
275
sh_mqtt_subscribe_handler(struct k_work * work)276 static void sh_mqtt_subscribe_handler(struct k_work *work)
277 {
278 ARG_UNUSED(work);
279 struct shell_mqtt *sh = sh_mqtt;
280
281 /* Subscribe config information */
282 struct mqtt_topic subs_topic = { .topic = { .utf8 = sh->sub_topic,
283 .size = strlen(sh->sub_topic) },
284 .qos = MQTT_QOS_1_AT_LEAST_ONCE };
285 const struct mqtt_subscription_list subs_list = { .list = &subs_topic,
286 .list_count = 1U,
287 .message_id = 1U };
288 int rc;
289
290 if (sh->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
291 LOG_DBG("%s_work while %s", "subscribe", "network disconnected");
292 return;
293 }
294
295 /* If context can't be locked, that means net conn cb locked it */
296 if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
297 /* In that case we should simply return */
298 LOG_DBG("%s_work unable to lock context", "subscribe");
299 return;
300 }
301
302 if (sh->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
303 LOG_DBG("%s_work while %s", "subscribe", "transport disconnected");
304 goto subscribe_error;
305 }
306
307 rc = mqtt_subscribe(&sh->mqtt_cli, &subs_list);
308 if (rc == 0) {
309 /* Wait for mqtt's connack */
310 LOG_DBG("Listening to socket");
311 rc = wait(sh, CONNECT_TIMEOUT_MS);
312 if (rc > 0) {
313 LOG_DBG("Process socket for MQTT packet");
314 rc = mqtt_input(&sh->mqtt_cli);
315 if (rc != 0) {
316 LOG_ERR("%s error: %d", "subscribe: mqtt_input", rc);
317 goto subscribe_error;
318 }
319 } else if (rc < 0) {
320 goto subscribe_error;
321 }
322
323 /* No suback, fail */
324 if (sh->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
325 goto subscribe_error;
326 }
327
328 LOG_DBG("Scheduling MQTT process work");
329 (void)sh_mqtt_work_reschedule(&sh->process_dwork, PROCESS_INTERVAL);
330 sh_mqtt_context_unlock();
331
332 LOG_INF("Logs will be published to: %s", sh->pub_topic);
333 LOG_INF("Subscribing shell cmds from: %s", sh->sub_topic);
334
335 return;
336 }
337
338 subscribe_error:
339 LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "subscribe");
340 sh_mqtt_close_and_cleanup(sh);
341 (void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
342 sh_mqtt_context_unlock();
343 }
344
345 /* Work routine to connect to MQTT */
sh_mqtt_connect_handler(struct k_work * work)346 static void sh_mqtt_connect_handler(struct k_work *work)
347 {
348 ARG_UNUSED(work);
349 struct shell_mqtt *sh = sh_mqtt;
350 int rc;
351
352 if (sh->network_state != SHELL_MQTT_NETWORK_CONNECTED) {
353 LOG_DBG("%s_work while %s", "connect", "network disconnected");
354 return;
355 }
356
357 /* If context can't be locked, that means net conn cb locked it */
358 if (sh_mqtt_context_lock(K_NO_WAIT) != 0) {
359 /* In that case we should simply return */
360 LOG_DBG("%s_work unable to lock context", "connect");
361 return;
362 }
363
364 if (sh->transport_state == SHELL_MQTT_TRANSPORT_CONNECTED) {
365 __ASSERT(0, "MQTT shouldn't be already connected");
366 LOG_ERR("MQTT shouldn't be already connected");
367 goto connect_error;
368 }
369
370 /* Resolve the broker URL */
371 LOG_DBG("Resolving DNS");
372 rc = get_mqtt_broker_addrinfo(sh);
373 if (rc != 0) {
374 (void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
375 sh_mqtt_context_unlock();
376 return;
377 }
378
379 LOG_DBG("Initializing MQTT client");
380 broker_init(sh);
381 client_init(sh);
382
383 /* Try to connect to mqtt */
384 LOG_DBG("Connecting to MQTT broker");
385 rc = mqtt_connect(&sh->mqtt_cli);
386 if (rc != 0) {
387 LOG_ERR("%s error: %d", "mqtt_connect", rc);
388 goto connect_error;
389 }
390
391 /* Prepare port config */
392 LOG_DBG("Preparing socket");
393 prepare_fds(sh);
394
395 /* Wait for mqtt's connack */
396 LOG_DBG("Listening to socket");
397 rc = wait(sh, CONNECT_TIMEOUT_MS);
398 if (rc > 0) {
399 LOG_DBG("Process socket for MQTT packet");
400 rc = mqtt_input(&sh->mqtt_cli);
401 if (rc != 0) {
402 LOG_ERR("%s error: %d", "connect: mqtt_input", rc);
403 goto connect_error;
404 }
405 } else if (rc < 0) {
406 goto connect_error;
407 }
408
409 /* No connack, fail */
410 if (sh->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
411 goto connect_error;
412 }
413
414 LOG_DBG("Scheduling %s work", "subscribe");
415 (void)sh_mqtt_work_reschedule(&sh->subscribe_dwork, PROCESS_INTERVAL);
416 sh_mqtt_context_unlock();
417 return;
418
419 connect_error:
420 LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "connect");
421 sh_mqtt_close_and_cleanup(sh);
422 (void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
423 sh_mqtt_context_unlock();
424 }
425
sh_mqtt_publish(struct shell_mqtt * sh,uint8_t * data,uint32_t len)426 static int sh_mqtt_publish(struct shell_mqtt *sh, uint8_t *data, uint32_t len)
427 {
428 sh->pub_data.message.payload.data = data;
429 sh->pub_data.message.payload.len = len;
430 sh->pub_data.message_id++;
431
432 return mqtt_publish(&sh->mqtt_cli, &sh->pub_data);
433 }
434
sh_mqtt_publish_tx_buf(struct shell_mqtt * sh,bool is_work)435 static int sh_mqtt_publish_tx_buf(struct shell_mqtt *sh, bool is_work)
436 {
437 int rc;
438
439 rc = sh_mqtt_publish(sh, &sh->tx_buf.buf[0], sh->tx_buf.len);
440 memset(&sh->tx_buf, 0, sizeof(sh->tx_buf));
441 if (rc != 0) {
442 LOG_ERR("MQTT publish error: %d", rc);
443 return rc;
444 }
445
446 /* Arbitrary delay to not kill the session */
447 if (!is_work) {
448 k_sleep(MQTT_SEND_DELAY_MS);
449 }
450
451 return rc;
452 }
453
sh_mqtt_publish_handler(struct k_work * work)454 static void sh_mqtt_publish_handler(struct k_work *work)
455 {
456 ARG_UNUSED(work);
457 struct shell_mqtt *sh = sh_mqtt;
458 int rc;
459
460 (void)sh_mqtt_context_lock(K_FOREVER);
461
462 rc = sh_mqtt_publish_tx_buf(sh, true);
463 if (rc != 0) {
464 LOG_DBG("%s: close MQTT, cleanup socket & reconnect", "publish");
465 sh_mqtt_close_and_cleanup(sh);
466 (void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
467 }
468
469 sh_mqtt_context_unlock();
470 }
471
cancel_dworks(struct shell_mqtt * sh)472 static void cancel_dworks(struct shell_mqtt *sh)
473 {
474 (void)k_work_cancel_delayable(&sh->connect_dwork);
475 (void)k_work_cancel_delayable(&sh->subscribe_dwork);
476 (void)k_work_cancel_delayable(&sh->process_dwork);
477 (void)k_work_cancel_delayable(&sh->publish_dwork);
478 }
479
net_disconnect_handler(struct k_work * work)480 static void net_disconnect_handler(struct k_work *work)
481 {
482 ARG_UNUSED(work);
483 struct shell_mqtt *sh = sh_mqtt;
484
485 LOG_WRN("Network %s", "disconnected");
486
487 /* Stop all possible work */
488 (void)sh_mqtt_context_lock(K_FOREVER);
489 sh_mqtt_close_and_cleanup(sh);
490 sh_mqtt_context_unlock();
491 /* If the transport was requested, the connect work will be rescheduled
492 * when internet is connected again
493 */
494 }
495
496 /* Network connection event handler */
network_evt_handler(struct net_mgmt_event_callback * cb,uint64_t mgmt_event,struct net_if * iface)497 static void network_evt_handler(struct net_mgmt_event_callback *cb, uint64_t mgmt_event,
498 struct net_if *iface)
499 {
500 struct shell_mqtt *sh = sh_mqtt;
501
502 if (mgmt_event == NET_EVENT_L4_CONNECTED) {
503 (void)k_work_cancel(&sh->net_disconnected_work);
504 if (sh->network_state == SHELL_MQTT_NETWORK_DISCONNECTED) {
505 LOG_WRN("Network %s", "connected");
506 sh->network_state = SHELL_MQTT_NETWORK_CONNECTED;
507 (void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
508 }
509 } else if ((mgmt_event == NET_EVENT_L4_DISCONNECTED) &&
510 (sh->network_state == SHELL_MQTT_NETWORK_CONNECTED)) {
511 sh->network_state = SHELL_MQTT_NETWORK_DISCONNECTED;
512 cancel_dworks(sh);
513 (void)sh_mqtt_work_submit(&sh->net_disconnected_work);
514 }
515 }
516
mqtt_evt_handler(struct mqtt_client * const client,const struct mqtt_evt * evt)517 static void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt)
518 {
519 struct shell_mqtt *sh = sh_mqtt;
520
521 switch (evt->type) {
522 case MQTT_EVT_CONNACK:
523 if (evt->result != 0) {
524 sh->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
525 LOG_ERR("MQTT %s %d", "connect failed", evt->result);
526 break;
527 }
528
529 sh->transport_state = SHELL_MQTT_TRANSPORT_CONNECTED;
530 LOG_WRN("MQTT %s", "client connected!");
531 break;
532
533 case MQTT_EVT_SUBACK:
534 if (evt->result != 0) {
535 LOG_ERR("MQTT subscribe: %s", "error");
536 sh->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
537 break;
538 }
539
540 LOG_WRN("MQTT subscribe: %s", "ok");
541 sh->subscribe_state = SHELL_MQTT_SUBSCRIBED;
542 break;
543
544 case MQTT_EVT_UNSUBACK:
545 LOG_DBG("UNSUBACK packet id: %u", evt->param.suback.message_id);
546 sh->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
547 break;
548
549 case MQTT_EVT_DISCONNECT:
550 LOG_WRN("MQTT disconnected: %d", evt->result);
551 sh->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
552 sh->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
553 break;
554
555 case MQTT_EVT_PUBLISH: {
556 const struct mqtt_publish_param *pub = &evt->param.publish;
557 uint32_t payload_left;
558 size_t size;
559 int rc;
560
561 payload_left = pub->message.payload.len;
562
563 LOG_DBG("MQTT publish received %d, %d bytes", evt->result, payload_left);
564 LOG_DBG(" id: %d, qos: %d", pub->message_id, pub->message.topic.qos);
565 LOG_DBG(" item: %s", pub->message.topic.topic.utf8);
566
567 /* For MQTT_QOS_0_AT_MOST_ONCE no acknowledgment needed */
568 if (pub->message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
569 struct mqtt_puback_param puback = { .message_id = pub->message_id };
570
571 (void)mqtt_publish_qos1_ack(client, &puback);
572 }
573
574 while (payload_left > 0) {
575 /* Attempt to claim `payload_left` bytes of buffer in rb */
576 size = (size_t)ring_buf_put_claim(&sh->rx_rb, &sh->rx_rb_ptr,
577 payload_left);
578 /* Read `size` bytes of payload from mqtt */
579 rc = mqtt_read_publish_payload_blocking(client, sh->rx_rb_ptr, size);
580
581 /* errno value, return */
582 if (rc < 0) {
583 ring_buf_reset(&sh->rx_rb);
584 return;
585 }
586
587 size = (size_t)rc;
588 /* Indicate that `size` bytes of payload has been written into rb */
589 (void)ring_buf_put_finish(&sh->rx_rb, size);
590 /* Update `payload_left` */
591 payload_left -= size;
592 /* Tells the shell that we have new data for it */
593 sh->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh->shell_context);
594 /* Arbitrary sleep for the shell to do its thing */
595 (void)k_msleep(100);
596 }
597
598 /* Shell won't execute the cmds without \r\n */
599 while (true) {
600 /* Check if rb's free space is enough to fit in \r\n */
601 size = ring_buf_space_get(&sh->rx_rb);
602 if (size >= sizeof("\r\n")) {
603 (void)ring_buf_put(&sh->rx_rb, "\r\n", sizeof("\r\n"));
604 break;
605 }
606 /* Arbitrary sleep for the shell to do its thing */
607 (void)k_msleep(100);
608 }
609
610 sh->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh->shell_context);
611 break;
612 }
613
614 case MQTT_EVT_PUBACK:
615 if (evt->result != 0) {
616 LOG_ERR("MQTT PUBACK error %d", evt->result);
617 break;
618 }
619
620 LOG_DBG("PUBACK packet id: %u", evt->param.puback.message_id);
621 break;
622
623 case MQTT_EVT_PINGRESP:
624 LOG_DBG("PINGRESP packet");
625 break;
626
627 default:
628 LOG_DBG("MQTT event received %d", evt->type);
629 break;
630 }
631 }
632
init(const struct shell_transport * transport,const void * config,shell_transport_handler_t evt_handler,void * context)633 static int init(const struct shell_transport *transport, const void *config,
634 shell_transport_handler_t evt_handler, void *context)
635 {
636 sh_mqtt = (struct shell_mqtt *)transport->ctx;
637 struct shell_mqtt *sh = sh_mqtt;
638
639 (void)memset(sh, 0, sizeof(struct shell_mqtt));
640
641 (void)k_mutex_init(&sh->lock);
642
643 if (!shell_mqtt_get_devid(sh->device_id, DEVICE_ID_HEX_MAX_SIZE)) {
644 LOG_ERR("Unable to get device identity, using dummy value");
645 (void)snprintf(sh->device_id, sizeof("dummy"), "dummy");
646 }
647
648 LOG_DBG("Client ID is %s", sh->device_id);
649
650 (void)snprintf(sh->pub_topic, SH_MQTT_TOPIC_TX_MAX_SIZE, "%s" CONFIG_SHELL_MQTT_TOPIC_TX_ID,
651 sh->device_id);
652 (void)snprintf(sh->sub_topic, SH_MQTT_TOPIC_RX_MAX_SIZE, "%s" CONFIG_SHELL_MQTT_TOPIC_RX_ID,
653 sh->device_id);
654
655 ring_buf_init(&sh->rx_rb, RX_RB_SIZE, sh->rx_rb_buf);
656
657 LOG_DBG("Initializing shell MQTT backend");
658
659 sh->shell_handler = evt_handler;
660 sh->shell_context = context;
661
662 sh->pub_data.message.topic.qos = MQTT_QOS_0_AT_MOST_ONCE;
663 sh->pub_data.message.topic.topic.utf8 = (uint8_t *)sh->pub_topic;
664 sh->pub_data.message.topic.topic.size =
665 strlen(sh->pub_data.message.topic.topic.utf8);
666 sh->pub_data.dup_flag = 0U;
667 sh->pub_data.retain_flag = 0U;
668
669 /* Initialize the work queue */
670 k_work_queue_init(&sh->workq);
671 k_work_queue_start(&sh->workq, sh_mqtt_workq_stack,
672 K_KERNEL_STACK_SIZEOF(sh_mqtt_workq_stack), K_PRIO_COOP(7), NULL);
673 (void)k_thread_name_set(&sh->workq.thread, "sh_mqtt_workq");
674 k_work_init(&sh->net_disconnected_work, net_disconnect_handler);
675 k_work_init_delayable(&sh->connect_dwork, sh_mqtt_connect_handler);
676 k_work_init_delayable(&sh->subscribe_dwork, sh_mqtt_subscribe_handler);
677 k_work_init_delayable(&sh->process_dwork, sh_mqtt_process_handler);
678 k_work_init_delayable(&sh->publish_dwork, sh_mqtt_publish_handler);
679
680 LOG_DBG("Initializing listener for network");
681 net_mgmt_init_event_callback(&sh->mgmt_cb, network_evt_handler, NET_EVENT_MASK);
682
683 sh->network_state = SHELL_MQTT_NETWORK_DISCONNECTED;
684 sh->transport_state = SHELL_MQTT_TRANSPORT_DISCONNECTED;
685 sh->subscribe_state = SHELL_MQTT_NOT_SUBSCRIBED;
686
687 return 0;
688 }
689
uninit(const struct shell_transport * transport)690 static int uninit(const struct shell_transport *transport)
691 {
692 ARG_UNUSED(transport);
693 struct shell_mqtt *sh = sh_mqtt;
694
695 /* Not initialized yet */
696 if (sh == NULL) {
697 return -ENODEV;
698 }
699
700 return 0;
701 }
702
enable(const struct shell_transport * transport,bool blocking)703 static int enable(const struct shell_transport *transport, bool blocking)
704 {
705 ARG_UNUSED(transport);
706 ARG_UNUSED(blocking);
707 struct shell_mqtt *sh = sh_mqtt;
708
709 /* Not initialized yet */
710 if (sh == NULL) {
711 return -ENODEV;
712 }
713
714 /* Listen for network connection status */
715 net_mgmt_add_event_callback(&sh->mgmt_cb);
716 conn_mgr_mon_resend_status();
717
718 return 0;
719 }
720
write_data(const struct shell_transport * transport,const void * data,size_t length,size_t * cnt)721 static int write_data(const struct shell_transport *transport, const void *data, size_t length,
722 size_t *cnt)
723 {
724 ARG_UNUSED(transport);
725 struct shell_mqtt *sh = sh_mqtt;
726 int rc = 0;
727 struct k_work_sync ws;
728 size_t copy_len;
729
730 *cnt = 0;
731
732 /* Not initialized yet */
733 if (sh == NULL) {
734 return -ENODEV;
735 }
736
737 /* Not connected to broker */
738 if (sh->transport_state != SHELL_MQTT_TRANSPORT_CONNECTED) {
739 goto out;
740 }
741
742 (void)k_work_cancel_delayable_sync(&sh->publish_dwork, &ws);
743
744 do {
745 if ((sh->tx_buf.len + length - *cnt) > TX_BUF_SIZE) {
746 copy_len = TX_BUF_SIZE - sh->tx_buf.len;
747 } else {
748 copy_len = length - *cnt;
749 }
750
751 memcpy(sh->tx_buf.buf + sh->tx_buf.len, (uint8_t *)data + *cnt, copy_len);
752 sh->tx_buf.len += copy_len;
753
754 /* Send the data immediately if the buffer is full */
755 if (sh->tx_buf.len == TX_BUF_SIZE) {
756 rc = sh_mqtt_publish_tx_buf(sh, false);
757 if (rc != 0) {
758 sh_mqtt_close_and_cleanup(sh);
759 (void)sh_mqtt_work_reschedule(&sh->connect_dwork, PROCESS_INTERVAL);
760 *cnt = length;
761 return rc;
762 }
763 }
764
765 *cnt += copy_len;
766 } while (*cnt < length);
767
768 if (sh->tx_buf.len > 0) {
769 (void)sh_mqtt_work_reschedule(&sh->publish_dwork, MQTT_SEND_DELAY_MS);
770 }
771
772 /* Inform shell that it is ready for next TX */
773 sh->shell_handler(SHELL_TRANSPORT_EVT_TX_RDY, sh->shell_context);
774
775 out:
776 /* We will always assume that we sent everything */
777 *cnt = length;
778 return rc;
779 }
780
read_data(const struct shell_transport * transport,void * data,size_t length,size_t * cnt)781 static int read_data(const struct shell_transport *transport, void *data, size_t length,
782 size_t *cnt)
783 {
784 ARG_UNUSED(transport);
785 struct shell_mqtt *sh = sh_mqtt;
786
787 /* Not initialized yet */
788 if (sh == NULL) {
789 return -ENODEV;
790 }
791
792 /* Not subscribed yet */
793 if (sh->subscribe_state != SHELL_MQTT_SUBSCRIBED) {
794 *cnt = 0;
795 return 0;
796 }
797
798 *cnt = ring_buf_get(&sh->rx_rb, data, length);
799
800 /* Inform the shell if there are still data in the rb */
801 if (ring_buf_size_get(&sh->rx_rb) > 0) {
802 sh->shell_handler(SHELL_TRANSPORT_EVT_RX_RDY, sh->shell_context);
803 }
804
805 return 0;
806 }
807
808 const struct shell_transport_api shell_mqtt_transport_api = { .init = init,
809 .uninit = uninit,
810 .enable = enable,
811 .write = write_data,
812 .read = read_data };
813
enable_shell_mqtt(void)814 static int enable_shell_mqtt(void)
815 {
816
817 bool log_backend = CONFIG_SHELL_MQTT_INIT_LOG_LEVEL > 0;
818 uint32_t level = (CONFIG_SHELL_MQTT_INIT_LOG_LEVEL > LOG_LEVEL_DBG) ?
819 CONFIG_LOG_MAX_LEVEL :
820 CONFIG_SHELL_MQTT_INIT_LOG_LEVEL;
821 static const struct shell_backend_config_flags cfg_flags = {
822 .insert_mode = 0,
823 .echo = 0,
824 .obscure = 0,
825 .mode_delete = 0,
826 .use_colors = 0,
827 .use_vt100 = 0,
828 };
829
830 return shell_init(&shell_mqtt, NULL, cfg_flags, log_backend, level);
831 }
832
833 /* Function is used for testing purposes */
shell_backend_mqtt_get_ptr(void)834 const struct shell *shell_backend_mqtt_get_ptr(void)
835 {
836 return &shell_mqtt;
837 }
838
839 SYS_INIT(enable_shell_mqtt, APPLICATION, CONFIG_APPLICATION_INIT_PRIORITY);
840