1 /***************************************************************************
2 * Copyright (c) 2024 Microsoft Corporation
3 *
4 * This program and the accompanying materials are made available under the
5 * terms of the MIT License which is available at
6 * https://opensource.org/licenses/MIT.
7 *
8 * SPDX-License-Identifier: MIT
9 **************************************************************************/
10
11
12 /**************************************************************************/
13 /**************************************************************************/
14 /** */
15 /** NetX Component */
16 /** */
17 /** MQTT (MQTT) */
18 /** */
19 /**************************************************************************/
20 /**************************************************************************/
21
22 #define NXD_MQTT_CLIENT_SOURCE_CODE
23
24
25 /* Force error checking to be disabled in this module */
26
27 #ifndef NX_DISABLE_ERROR_CHECKING
28 #define NX_DISABLE_ERROR_CHECKING
29 #endif
30
31
32 /* Include necessary system files. */
33
34 #include "tx_api.h"
35 #include "nx_api.h"
36 #include "nx_ip.h"
37 #include "nxd_mqtt_client.h"
38
39 /* Bring in externals for caller checking code. */
40
41 #define MQTT_ALL_EVENTS ((ULONG)0xFFFFFFFF)
42 #define MQTT_TIMEOUT_EVENT ((ULONG)0x00000001)
43 #define MQTT_PACKET_RECEIVE_EVENT ((ULONG)0x00000002)
44 #define MQTT_START_EVENT ((ULONG)0x00000004)
45 #define MQTT_DELETE_EVENT ((ULONG)0x00000008)
46 #define MQTT_PING_TIMEOUT_EVENT ((ULONG)0x00000010)
47 #define MQTT_NETWORK_DISCONNECT_EVENT ((ULONG)0x00000020)
48 #define MQTT_TCP_ESTABLISH_EVENT ((ULONG)0x00000040)
49
50 static UINT _nxd_mqtt_client_create_internal(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
51 CHAR *client_id, UINT client_id_length,
52 NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
53 VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority);
54 static UINT _nxd_mqtt_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UINT wait_option);
55 static UINT _nxd_mqtt_packet_receive(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, UINT wait_option);
56 static UINT _nxd_mqtt_copy_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET **new_packet_ptr,
57 USHORT packet_id, UCHAR set_duplicate_flag, UINT wait_option);
58 static VOID _nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr);
59 static VOID _nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr);
60 static UINT _nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option);
61 static UINT _nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option);
62
63 /**************************************************************************/
64 /* */
65 /* FUNCTION RELEASE */
66 /* */
67 /* _nxd_mqtt_client_set_fixed_header PORTABLE C */
68 /* 6.1 */
69 /* AUTHOR */
70 /* */
71 /* Yuxin Zhou, Microsoft Corporation */
72 /* */
73 /* DESCRIPTION */
74 /* */
/*    This function writes the fixed header field in the outgoing        */
76 /* MQTT packet. */
77 /* */
78 /* This function follows the logic outlined in 2.2 in MQTT */
79 /* specification. */
80 /* */
81 /* INPUT */
82 /* */
83 /* client_ptr Pointer to MQTT Client */
84 /* packet_ptr Outgoing MQTT packet */
85 /* control_header Control byte */
86 /* length Remaining length in bytes */
87 /* wait_option Wait option */
88 /* */
89 /* OUTPUT */
90 /* */
91 /* status */
92 /* */
93 /* CALLS */
94 /* */
95 /* nx_packet_data_append Append packet data */
96 /* */
97 /* CALLED BY */
98 /* */
99 /* _nxd_mqtt_client_sub_unsub */
100 /* _nxd_mqtt_client_connect */
101 /* _nxd_mqtt_client_publish */
102 /* */
103 /* RELEASE HISTORY */
104 /* */
105 /* DATE NAME DESCRIPTION */
106 /* */
107 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
108 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
109 /* resulting in version 6.1 */
110 /* */
111 /**************************************************************************/
/* Build the MQTT fixed header (the control byte followed by the variable-length
   "remaining length" field) and append it to packet_ptr.

   Returns the status of nx_packet_data_append(), or NXD_MQTT_INTERNAL_ERROR
   when length does not fit in the four length bytes reserved here.  */
UINT _nxd_mqtt_client_set_fixed_header(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UCHAR control_header, UINT length, UINT wait_option)
{
    UCHAR fixed_header[5];
    UINT  used = 0;
    UINT  ret;
    UCHAR digit;

    /* Each length byte carries 7 bits, so four bytes hold at most 28 bits
       (0x0FFFFFFF).  A larger value would need a fifth length byte and the
       encoding loop below would write past the end of fixed_header[].  */
    if (length > 0x0FFFFFFF)
    {
        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* The control byte always comes first. */
    fixed_header[used++] = control_header;

    /* Emit the length least-significant group first; the top bit of each
       byte flags that another length byte follows.  */
    do
    {
        digit = (UCHAR)(length & 0x7F);
        length = length >> 7;

        if (length != 0)
        {
            digit = (UCHAR)(digit | 0x80);
        }

        fixed_header[used++] = digit;
    } while (length != 0);

    /* "used" counts the control byte plus every length byte written. */
    ret = nx_packet_data_append(packet_ptr, fixed_header, used,
                                client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);

    return(ret);
}
142
143
144
145 /**************************************************************************/
146 /* */
147 /* FUNCTION RELEASE */
148 /* */
149 /* _nxd_mqtt_read_remaining_length PORTABLE C */
150 /* 6.1 */
151 /* AUTHOR */
152 /* */
153 /* Yuxin Zhou, Microsoft Corporation */
154 /* */
155 /* DESCRIPTION */
156 /* */
/*    This function parses the remaining length field in the incoming    */
158 /* MQTT packet. */
159 /* */
160 /* This function follows the logic outlined in 2.2.3 in MQTT */
161 /* specification */
162 /* */
163 /* INPUT */
164 /* */
165 /* packet_ptr Incoming MQTT packet. */
166 /* remaining_length remaining length in bytes, */
167 /* this is the return value. */
168 /* offset Pointer to offset of the */
169 /* remaining data */
170 /* */
171 /* OUTPUT */
172 /* */
173 /* status */
174 /* */
175 /* CALLS */
176 /* */
177 /* None */
178 /* */
179 /* CALLED BY */
180 /* */
181 /* _nxd_mqtt_process_publish */
182 /* _nxd_mqtt_client_message_get */
183 /* _nxd_mqtt_process_sub_unsub_ack */
184 /* */
185 /* RELEASE HISTORY */
186 /* */
187 /* DATE NAME DESCRIPTION */
188 /* */
189 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
190 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
191 /* resulting in version 6.1 */
192 /* */
193 /**************************************************************************/
/* Decode the variable-length "remaining length" field that starts at byte
   offset 1 of packet_ptr.  On success *remaining_length receives the decoded
   value and *offset_ptr the offset of the first byte after the field.  */
UINT _nxd_mqtt_read_remaining_length(NX_PACKET *packet_ptr, UINT *remaining_length, ULONG *offset_ptr)
{
    UCHAR encoded[4] = {0};
    ULONG available;
    UINT  decoded = 0;
    UINT  shift = 0;
    UINT  consumed = 0;
    UCHAR current;

    /* Pull up to four length bytes that follow the control byte. */
    if (nx_packet_data_extract_offset(packet_ptr, 1, encoded, sizeof(encoded), &available))
    {

        /* Nothing could be extracted: treat the packet as incomplete. */
        return(NXD_MQTT_PARTIAL_PACKET);
    }

    for (;;)
    {

        /* Ran out of extracted bytes before seeing a terminating byte. */
        if (consumed >= available)
        {

            /* Four continuation bytes in a row is reported as an internal
               error; fewer means more data is still needed.  */
            return((consumed == 4) ? NXD_MQTT_INTERNAL_ERROR : NXD_MQTT_PARTIAL_PACKET);
        }

        current = encoded[consumed];
        consumed++;

        /* Each byte contributes seven bits, least-significant group first. */
        decoded += ((UINT)(current & 0x7F)) << shift;
        shift += 7;

        /* A clear top bit marks the last length byte. */
        if ((current & 0x80) == 0)
        {
            break;
        }
    }

    /* Control byte + length bytes + payload must all be present in the packet. */
    if ((1 + consumed + decoded) > packet_ptr -> nx_packet_length)
    {

        /* Remaining length is larger than what the packet holds so far. */
        return(NXD_MQTT_PARTIAL_PACKET);
    }

    *remaining_length = decoded;
    *offset_ptr = (1 + consumed);

    return(NXD_MQTT_SUCCESS);
}
241
242
243
244 /**************************************************************************/
245 /* */
246 /* FUNCTION RELEASE */
247 /* */
248 /* _nxd_mqtt_client_sub_unsub PORTABLE C */
249 /* 6.3.0 */
250 /* AUTHOR */
251 /* */
252 /* Yuxin Zhou, Microsoft Corporation */
253 /* */
254 /* DESCRIPTION */
255 /* */
256 /* This function sends a subscribe or unsubscribe message to the */
257 /* broker. */
258 /* */
259 /* */
260 /* INPUT */
261 /* */
262 /* client_ptr Pointer to MQTT Client */
263 /* op Subscribe or Unsubscribe */
264 /* topic_name Pointer to the topic string */
265 /* to subscribe to */
266 /* topic_name_length Length of the topic string */
267 /* in bytes */
268 /* packet_id_ptr Pointer to packet id that */
269 /* will be filled with */
270 /* assigned packet id for */
271 /* sub/unsub message */
272 /* QoS Expected QoS level */
273 /* */
274 /* OUTPUT */
275 /* */
276 /* status Completion status */
277 /* */
278 /* CALLS */
279 /* */
280 /* tx_mutex_get */
281 /* _nxd_mqtt_client_packet_allocate */
282 /* _nxd_mqtt_client_set_fixed_header */
283 /* _nxd_mqtt_client_append_message */
284 /* tx_mutex_put */
285 /* _nxd_mqtt_packet_send */
286 /* nx_packet_release */
287 /* _nxd_mqtt_copy_transmit_packet */
288 /* */
289 /* CALLED BY */
290 /* */
291 /* _nxd_mqtt_client_subscribe */
292 /* _nxd_mqtt_client_unsubscribe */
293 /* */
294 /* RELEASE HISTORY */
295 /* */
296 /* DATE NAME DESCRIPTION */
297 /* */
298 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
299 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
300 /* resulting in version 6.1 */
301 /* 11-09-2020 Yuxin Zhou Modified comment(s), and */
302 /* added packet id parameter, */
303 /* resulting in version 6.1.2 */
304 /* 07-29-2022 Spencer McDonough Modified comment(s), */
305 /* improved internal logic, */
306 /* resulting in version 6.1.12 */
307 /* 10-31-2022 Bo Chen Modified comment(s), improved */
308 /* the logic of sending packet,*/
309 /* resulting in version 6.2.0 */
310 /* 10-31-2023 Haiqing Zhao Modified comment(s), improved */
311 /* internal logic, */
312 /* resulting in version 6.3.0 */
313 /* */
314 /**************************************************************************/
/* Build a SUBSCRIBE or UNSUBSCRIBE message for topic_name, queue a copy of it
   on the client's transmit queue, and send it.  When packet_id_ptr is not
   NULL it receives the packet identifier assigned to the message.  */
UINT _nxd_mqtt_client_sub_unsub(NXD_MQTT_CLIENT *client_ptr, UINT op,
                                CHAR *topic_name, UINT topic_name_length,
                                USHORT *packet_id_ptr, UINT QoS)
{
    NX_PACKET *packet_ptr;
    NX_PACKET *transmit_packet_ptr;
    UINT       status;
    UINT       length;
    UINT       ret = NXD_MQTT_SUCCESS;
    UINT       is_subscribe;
    UCHAR      temp_data[2];

    /* Take the client mutex before touching client state. */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Nothing can be sent unless the client is in the connected state. */
    if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(NXD_MQTT_NOT_CONNECTED);
    }

    /* Allocation failures are reported with the allocator's own status. */
    status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
    if (status)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(status);
    }

    /* Only SUBSCRIBE carries a trailing QoS byte. */
    is_subscribe = (op == ((MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4) | 0x02));

    /* Remaining length: 2 bytes of packet ID, then 2 + topic_name_length for
       the topic, plus one QoS byte for SUBSCRIBE.  */
    length = 2 + (2 + topic_name_length);
    if (is_subscribe)
    {
        length++;
    }

    /* Control header and remaining length. */
    ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, (UCHAR)op, length, NX_WAIT_FOREVER);
    if (ret)
    {
        goto build_failed;
    }

    /* Packet identifier, most significant byte first. */
    temp_data[0] = (UCHAR)(client_ptr -> nxd_mqtt_client_packet_identifier >> 8);
    temp_data[1] = (client_ptr -> nxd_mqtt_client_packet_identifier & 0xFF);

    /* Report the identifier to the caller when requested. */
    if (packet_id_ptr)
    {
        *packet_id_ptr = (USHORT)(client_ptr -> nxd_mqtt_client_packet_identifier & 0xFFFF);
    }

    ret = nx_packet_data_append(packet_ptr, temp_data, 2, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_WAIT_FOREVER);
    if (ret)
    {
        goto build_failed;
    }

    /* Topic name. */
    ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, topic_name, topic_name_length, NX_WAIT_FOREVER);
    if (ret)
    {
        goto build_failed;
    }

    if (is_subscribe)
    {

        /* Requested QoS, limited to its two low bits. */
        temp_data[0] = QoS & 0x3;

        ret = nx_packet_data_append(packet_ptr, temp_data, 1, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_WAIT_FOREVER);
        if (ret)
        {
            goto build_failed;
        }
    }

    /* Keep a copy of the message for retransmission. */
    if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &transmit_packet_ptr,
                                       (USHORT)(client_ptr -> nxd_mqtt_client_packet_identifier),
                                       NX_FALSE, NX_WAIT_FOREVER))
    {
        goto build_failed;
    }

    /* Link the copy at the tail of the transmit queue. */
    if (client_ptr -> message_transmit_queue_head != NX_NULL)
    {
        client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = transmit_packet_ptr;
    }
    else
    {
        client_ptr -> message_transmit_queue_head = transmit_packet_ptr;
    }
    client_ptr -> message_transmit_queue_tail = transmit_packet_ptr;

    /* Advance the 16-bit packet identifier, skipping zero (MQTT-2.3.1-1). */
    client_ptr -> nxd_mqtt_client_packet_identifier = (client_ptr -> nxd_mqtt_client_packet_identifier + 1) & 0xFFFF;
    if (client_ptr -> nxd_mqtt_client_packet_identifier == 0)
    {
        client_ptr -> nxd_mqtt_client_packet_identifier = 1;
    }

    /* Restart the keepalive timeout from now. */
    client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;

    /* The mutex is released before the send itself. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    /* Send the subscribe/unsubscribe message to the server. */
    if (_nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER))
    {

        /* The send failed, so the packet is released here. */
        nx_packet_release(packet_ptr);

        ret = NXD_MQTT_COMMUNICATION_FAILURE;
    }

    return(ret);

build_failed:

    /* Common exit for any failure while assembling or queuing the message:
       drop the mutex, then give the packet back.  */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
    nx_packet_release(packet_ptr);

    return(NXD_MQTT_PACKET_POOL_FAILURE);
}
484
485
486 /**************************************************************************/
487 /* */
488 /* FUNCTION RELEASE */
489 /* */
490 /* _nxd_mqtt_client_packet_allocate PORTABLE C */
491 /* 6.3.0 */
492 /* AUTHOR */
493 /* */
494 /* Yuxin Zhou, Microsoft Corporation */
495 /* */
496 /* DESCRIPTION */
497 /* */
498 /* This function allocates a packet for transmitting MQTT message. */
499 /* Special care has to be taken for accommodating IPv4/IPv6 header, */
500 /* and possibly TLS record if TLS is being used. On failure, the */
501 /* TLS mutex is released and the caller can simply return. */
502 /* */
503 /* */
504 /* INPUT */
505 /* */
506 /* client_ptr Pointer to MQTT Client */
507 /* packet_ptr Allocated packet to be */
508 /* returned to the caller. */
509 /* */
510 /* OUTPUT */
511 /* */
512 /* status Completion status */
513 /* */
514 /* CALLS */
515 /* */
516 /* nx_secure_tls_packet_allocate Allocate packet for MQTT */
517 /* over TLS socket */
518 /* nx_packet_allocate Allocate a packet for MQTT */
519 /* over regular TCP socket */
520 /* tx_mutex_put Release a mutex */
521 /* */
522 /* CALLED BY */
523 /* */
524 /* _nxd_mqtt_process_publish */
525 /* _nxd_mqtt_client_connect */
526 /* _nxd_mqtt_client_publish */
527 /* _nxd_mqtt_client_subscribe */
528 /* _nxd_mqtt_client_unsubscribe */
529 /* _nxd_mqtt_client_send_simple_message */
530 /* */
531 /* RELEASE HISTORY */
532 /* */
533 /* DATE NAME DESCRIPTION */
534 /* */
535 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
536 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
537 /* resulting in version 6.1 */
538 /* 07-29-2022 Spencer McDonough Modified comment(s), and */
539 /* improved internal logic, */
540 /* resulting in version 6.1.12 */
541 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
542 /* mqtt over websocket, */
543 /* resulting in version 6.2.0 */
544 /* 10-31-2023 Haiqing Zhao Modified comment(s), improved */
545 /* internal logic, */
546 /* resulting in version 6.3.0 */
547 /* */
548 /**************************************************************************/
_nxd_mqtt_client_packet_allocate(NXD_MQTT_CLIENT * client_ptr,NX_PACKET ** packet_ptr,ULONG wait_option)549 UINT _nxd_mqtt_client_packet_allocate(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, ULONG wait_option)
550 {
551 UINT status = NXD_MQTT_SUCCESS;
552
553 #ifdef NXD_MQTT_OVER_WEBSOCKET
554 if (client_ptr -> nxd_mqtt_client_use_websocket)
555 {
556
557 /* Use WebSocket packet allocate since it is able to count for WebSocket-related header space */
558 status = nx_websocket_client_packet_allocate(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, wait_option);
559 }
560 else
561 #endif /* NXD_MQTT_OVER_WEBSOCKET */
562 #ifdef NX_SECURE_ENABLE
563 if (client_ptr -> nxd_mqtt_client_use_tls)
564 {
565 /* Use TLS packet allocate. The TLS packet allocate is able to count for
566 TLS-related header space including crypto initial vector area. */
567 status = nx_secure_tls_packet_allocate(&client_ptr -> nxd_mqtt_tls_session, client_ptr -> nxd_mqtt_client_packet_pool_ptr,
568 packet_ptr, wait_option);
569 }
570 /* Allocate a packet */
571 else
572 {
573 #endif /* NX_SECURE_ENABLE */
574 if (client_ptr -> nxd_mqtt_client_socket.nx_tcp_socket_connect_ip.nxd_ip_version == NX_IP_VERSION_V4)
575 {
576 status = nx_packet_allocate(client_ptr -> nxd_mqtt_client_packet_pool_ptr, packet_ptr, NX_IPv4_TCP_PACKET,
577 wait_option);
578 }
579 else
580 {
581 status = nx_packet_allocate(client_ptr -> nxd_mqtt_client_packet_pool_ptr, packet_ptr, NX_IPv6_TCP_PACKET,
582 wait_option);
583 }
584 #ifdef NX_SECURE_ENABLE
585 }
586 #endif /* NX_SECURE_ENABLE */
587
588 if (status != NX_SUCCESS)
589 {
590 return(NXD_MQTT_PACKET_POOL_FAILURE);
591 }
592
593 return(NXD_MQTT_SUCCESS);
594 }
595
596 /**************************************************************************/
597 /* */
598 /* FUNCTION RELEASE */
599 /* */
600 /* _nxd_mqtt_packet_send PORTABLE C */
601 /* 6.2.0 */
602 /* AUTHOR */
603 /* */
604 /* Yuxin Zhou, Microsoft Corporation */
605 /* */
606 /* DESCRIPTION */
607 /* */
608 /* This function sends out a packet. */
609 /* */
610 /* */
611 /* INPUT */
612 /* */
613 /* client_ptr Pointer to MQTT Client */
614 /* packet_ptr Pointer to packet */
615 /* wait_option Timeout value */
616 /* */
617 /* OUTPUT */
618 /* */
619 /* status Completion status */
620 /* */
621 /* CALLS */
622 /* */
623 /* nx_websocket_client_send */
624 /* nx_secure_tls_session_send */
625 /* nx_tcp_socket_send */
626 /* */
627 /* CALLED BY */
628 /* */
629 /* */
630 /* RELEASE HISTORY */
631 /* */
632 /* DATE NAME DESCRIPTION */
633 /* */
634 /* 10-31-2022 Bo Chen Initial Version 6.2.0 */
635 /* */
636 /**************************************************************************/
/* Send packet_ptr through whichever layer the client is using and return
   that layer's status unchanged.  */
static UINT _nxd_mqtt_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UINT wait_option)
{

#ifdef NXD_MQTT_OVER_WEBSOCKET
    /* WebSocket takes precedence; the message goes out as a binary frame. */
    if (client_ptr -> nxd_mqtt_client_use_websocket)
    {
        return(nx_websocket_client_send(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, NX_WEBSOCKET_OPCODE_BINARY_FRAME, NX_TRUE, wait_option));
    }
#endif /* NXD_MQTT_OVER_WEBSOCKET */

#ifdef NX_SECURE_ENABLE
    /* Next choice is the TLS session. */
    if (client_ptr -> nxd_mqtt_client_use_tls)
    {
        return(nx_secure_tls_session_send(&(client_ptr -> nxd_mqtt_tls_session), packet_ptr, wait_option));
    }
#endif /* NX_SECURE_ENABLE */

    /* Otherwise send directly on the TCP socket. */
    return(nx_tcp_socket_send(&client_ptr -> nxd_mqtt_client_socket, packet_ptr, wait_option));
}
662
663 /**************************************************************************/
664 /* */
665 /* FUNCTION RELEASE */
666 /* */
667 /* _nxd_mqtt_packet_receive PORTABLE C */
668 /* 6.2.0 */
669 /* AUTHOR */
670 /* */
671 /* Yuxin Zhou, Microsoft Corporation */
672 /* */
673 /* DESCRIPTION */
674 /* */
675 /* This function receives a packet. */
676 /* */
677 /* */
678 /* INPUT */
679 /* */
680 /* client_ptr Pointer to MQTT Client */
681 /* packet_ptr Pointer to packet */
682 /* wait_option Timeout value */
683 /* */
684 /* OUTPUT */
685 /* */
686 /* status Completion status */
687 /* */
688 /* CALLS */
689 /* */
690 /* nx_websocket_client_receive */
691 /* nx_secure_tls_session_receive */
692 /* nx_tcp_socket_receive */
693 /* */
694 /* CALLED BY */
695 /* */
696 /* */
697 /* RELEASE HISTORY */
698 /* */
699 /* DATE NAME DESCRIPTION */
700 /* */
701 /* 10-31-2022 Bo Chen Initial Version 6.2.0 */
702 /* */
703 /**************************************************************************/
/* Receive one packet through whichever layer the client is using.  Over
   WebSocket, a successfully received frame that is not a binary frame is
   reported as NX_INVALID_PACKET.  */
static UINT _nxd_mqtt_packet_receive(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, UINT wait_option)
{

#ifdef NXD_MQTT_OVER_WEBSOCKET
    if (client_ptr -> nxd_mqtt_client_use_websocket)
    {
        UINT frame_opcode = 0;
        UINT websocket_status;

        websocket_status = nx_websocket_client_receive(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, &frame_opcode, wait_option);

        /* Only binary frames are accepted as MQTT data. */
        if ((websocket_status == NX_SUCCESS) && (frame_opcode != NX_WEBSOCKET_OPCODE_BINARY_FRAME))
        {
            return(NX_INVALID_PACKET);
        }

        return(websocket_status);
    }
#endif /* NXD_MQTT_OVER_WEBSOCKET */

#ifdef NX_SECURE_ENABLE
    /* Next choice is the TLS session. */
    if (client_ptr -> nxd_mqtt_client_use_tls)
    {
        return(nx_secure_tls_session_receive(&(client_ptr -> nxd_mqtt_tls_session), packet_ptr, wait_option));
    }
#endif /* NX_SECURE_ENABLE */

    /* Otherwise receive directly from the TCP socket. */
    return(nx_tcp_socket_receive(&(client_ptr -> nxd_mqtt_client_socket), packet_ptr, wait_option));
}
736
737 /**************************************************************************/
738 /* */
739 /* FUNCTION RELEASE */
740 /* */
741 /* _nxd_mqtt_tcp_establish_notify PORTABLE C */
742 /* 6.1 */
743 /* AUTHOR */
744 /* */
745 /* Yuxin Zhou, Microsoft Corporation */
746 /* */
747 /* DESCRIPTION */
748 /* */
749 /* This internal function is installed as TCP connection establish */
750 /* callback function. */
751 /* */
752 /* INPUT */
753 /* */
754 /* socket_ptr The socket that receives */
755 /* the message. */
756 /* */
757 /* OUTPUT */
758 /* */
759 /* None */
760 /* */
761 /* CALLS */
762 /* */
763 /* tx_event_flags_set */
764 /* */
765 /* CALLED BY */
766 /* */
767 /* _nxd_mqtt_thread_entry */
768 /* _nxd_mqtt_client_event_process */
769 /* */
770 /* RELEASE HISTORY */
771 /* */
772 /* DATE NAME DESCRIPTION */
773 /* */
774 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
775 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
776 /* resulting in version 6.1 */
777 /* */
778 /**************************************************************************/
_nxd_mqtt_tcp_establish_notify(NX_TCP_SOCKET * socket_ptr)779 static VOID _nxd_mqtt_tcp_establish_notify(NX_TCP_SOCKET *socket_ptr)
780 {
781 NXD_MQTT_CLIENT *client_ptr;
782
783 client_ptr = (NXD_MQTT_CLIENT *)socket_ptr -> nx_tcp_socket_reserved_ptr;
784
785 if (&(client_ptr -> nxd_mqtt_client_socket) == socket_ptr)
786 {
787
788 /* Set the event flag. */
789 #ifndef NXD_MQTT_CLOUD_ENABLE
790 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_TCP_ESTABLISH_EVENT, TX_OR);
791 #else
792 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_TCP_ESTABLISH_EVENT);
793 #endif /* NXD_MQTT_CLOUD_ENABLE */
794 }
795 }
796
797 /**************************************************************************/
798 /* */
799 /* FUNCTION RELEASE */
800 /* */
801 /* _nxd_mqtt_receive_callback PORTABLE C */
802 /* 6.1 */
803 /* AUTHOR */
804 /* */
805 /* Yuxin Zhou, Microsoft Corporation */
806 /* */
807 /* DESCRIPTION */
808 /* */
809 /* This internal function is installed as TCP receive callback */
810 /* function. On receiving a TCP message, the callback function */
811 /* sets an event flag to trigger MQTT client to process received */
812 /* message. */
813 /* */
814 /* */
815 /* INPUT */
816 /* */
817 /* socket_ptr The socket that receives */
818 /* the message. */
819 /* */
820 /* OUTPUT */
821 /* */
822 /* None */
823 /* */
824 /* CALLS */
825 /* */
826 /* tx_event_flags_set */
827 /* */
828 /* CALLED BY */
829 /* */
830 /* _nxd_mqtt_thread_entry */
831 /* _nxd_mqtt_client_event_process */
832 /* */
833 /* RELEASE HISTORY */
834 /* */
835 /* DATE NAME DESCRIPTION */
836 /* */
837 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
838 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
839 /* resulting in version 6.1 */
840 /* */
841 /**************************************************************************/
_nxd_mqtt_receive_callback(NX_TCP_SOCKET * socket_ptr)842 static VOID _nxd_mqtt_receive_callback(NX_TCP_SOCKET *socket_ptr)
843 {
844 NXD_MQTT_CLIENT *client_ptr;
845
846 client_ptr = (NXD_MQTT_CLIENT *)socket_ptr -> nx_tcp_socket_reserved_ptr;
847
848 if (&(client_ptr -> nxd_mqtt_client_socket) == socket_ptr)
849 {
850 /* Set the event flag. */
851 #ifndef NXD_MQTT_CLOUD_ENABLE
852 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_PACKET_RECEIVE_EVENT, TX_OR);
853 #else
854 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PACKET_RECEIVE_EVENT);
855 #endif /* NXD_MQTT_CLOUD_ENABLE */
856 }
857 }
858
859 /**************************************************************************/
860 /* */
861 /* FUNCTION RELEASE */
862 /* */
863 /* _nxd_mqtt_copy_transmit_packet PORTABLE C */
864 /* 6.1.8 */
865 /* AUTHOR */
866 /* */
867 /* Yuxin Zhou, Microsoft Corporation */
868 /* */
869 /* DESCRIPTION */
870 /* */
871 /* This internal function saves a transmit packet. */
872 /* A transmit packet is allocated to store QoS 1 and 2 messages. */
873 /* Upon a message being properly acknowledged, the packet will */
874 /* be released. */
875 /* */
876 /* */
877 /* INPUT */
878 /* */
879 /* client_ptr Pointer to MQTT Client */
880 /* packet_ptr Pointer to the MQTT message */
881 /* packet to be saved */
882 /* new_packet_ptr Return a copied packet */
883 /* packet_id Current packet ID */
884 /* set_duplicate_flag Set duplicate flag for fixed */
885 /* header or not */
886 /* wait_option Timeout value */
887 /* */
888 /* OUTPUT */
889 /* */
890 /* status */
891 /* */
892 /* CALLS */
893 /* */
894 /* nx_packet_copy */
895 /* */
896 /* CALLED BY */
897 /* */
898 /* _nxd_mqtt_client_sub_unsub */
899 /* _nxd_mqtt_process_publish */
900 /* */
901 /* RELEASE HISTORY */
902 /* */
903 /* DATE NAME DESCRIPTION */
904 /* */
905 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
906 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
907 /* resulting in version 6.1 */
908 /* 08-02-2021 Yuxin Zhou Modified comment(s), */
909 /* supported maximum transmit */
910 /* queue depth, */
911 /* resulting in version 6.1.8 */
912 /* */
913 /**************************************************************************/
/* Duplicate packet_ptr into *new_packet_ptr so the message can be kept for
   later use, stamp the copy with packet_id, and optionally set the duplicate
   flag in the first byte of the copied message.  */
static UINT _nxd_mqtt_copy_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET **new_packet_ptr,
                                           USHORT packet_id, UCHAR set_duplicate_flag, UINT wait_option)
{
    NX_PACKET *copy_ptr;

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
    /* Refuse to queue anything once the configured transmit queue depth is reached. */
    if (client_ptr -> message_transmit_queue_depth >= NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH)
    {
        return(NX_TX_QUEUE_DEPTH);
    }
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */

    /* Make the copy from the client's packet pool. */
    if (nx_packet_copy(packet_ptr, new_packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option))
    {

        /* No packet was available to hold the copy. */
        return(NXD_MQTT_PACKET_POOL_FAILURE);
    }

    copy_ptr = *new_packet_ptr;

    /* The packet ID is stored at the very start of the copy's data area. */
    *((USHORT *)copy_ptr -> nx_packet_data_start) = packet_id;

    /* When requested, OR the duplicate flag into the first byte at the prepend pointer. */
    if (set_duplicate_flag)
    {
        *(copy_ptr -> nx_packet_prepend_ptr) = *(copy_ptr -> nx_packet_prepend_ptr) | MQTT_PUBLISH_DUP_FLAG;
    }

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
    /* Account for the newly stored packet. */
    client_ptr -> message_transmit_queue_depth++;
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */

    return(NXD_MQTT_SUCCESS);
}
954
955 /**************************************************************************/
956 /* */
957 /* FUNCTION RELEASE */
958 /* */
959 /* _nxd_mqtt_release_transmit_packet PORTABLE C */
960 /* 6.1.8 */
961 /* AUTHOR */
962 /* */
963 /* Yuxin Zhou, Microsoft Corporation */
964 /* */
965 /* DESCRIPTION */
966 /* */
967 /* This internal function releases a transmit packet. */
968 /* A transmit packet is allocated to store QoS 1 and 2 messages. */
969 /* Upon a message being properly acknowledged, the packet can */
970 /* be released. */
971 /* */
972 /* */
973 /* INPUT */
974 /* */
975 /* client_ptr Pointer to MQTT Client */
976 /* packet_ptr Pointer to the MQTT message */
977 /* packet to be removed */
978 /* previous_packet_ptr Pointer to the previous packet*/
979 /* or NULL if none exists */
980 /* */
981 /* OUTPUT */
982 /* */
983 /* None */
984 /* */
985 /* CALLS */
986 /* */
987 /* None */
988 /* */
989 /* CALLED BY */
990 /* */
991 /* _nxd_mqtt_thread_entry */
992 /* _nxd_mqtt_client_event_process */
993 /* */
994 /* RELEASE HISTORY */
995 /* */
996 /* DATE NAME DESCRIPTION */
997 /* */
998 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
999 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1000 /* resulting in version 6.1 */
1001 /* 08-02-2021 Yuxin Zhou Modified comment(s), */
1002 /* supported maximum transmit */
1003 /* queue depth, */
1004 /* resulting in version 6.1.8 */
1005 /* */
1006 /**************************************************************************/
_nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,NX_PACKET * previous_packet_ptr)1007 static VOID _nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr)
1008 {
1009
1010 if (previous_packet_ptr)
1011 {
1012 previous_packet_ptr -> nx_packet_queue_next = packet_ptr -> nx_packet_queue_next;
1013 }
1014 else
1015 {
1016 client_ptr -> message_transmit_queue_head = packet_ptr -> nx_packet_queue_next;
1017 }
1018
1019 if (packet_ptr == client_ptr -> message_transmit_queue_tail)
1020 {
1021 client_ptr -> message_transmit_queue_tail = previous_packet_ptr;
1022 }
1023 nx_packet_release(packet_ptr);
1024
1025 #ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
1026 client_ptr -> message_transmit_queue_depth--;
1027 #endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */
1028 }
1029
1030 /**************************************************************************/
1031 /* */
1032 /* FUNCTION RELEASE */
1033 /* */
1034 /* _nxd_mqtt_release_receive_packet PORTABLE C */
1035 /* 6.1 */
1036 /* AUTHOR */
1037 /* */
1038 /* Yuxin Zhou, Microsoft Corporation */
1039 /* */
1040 /* DESCRIPTION */
1041 /* */
1042 /* This internal function releases a receive packet. */
1043 /* A receive packet is allocated to store QoS 1 and 2 messages. */
1044 /* Upon a message being properly acknowledged, the packet can */
1045 /* be released. */
1046 /* */
1047 /* */
1048 /* INPUT */
1049 /* */
1050 /* client_ptr Pointer to MQTT Client */
1051 /* packet_ptr Pointer to the MQTT message */
1052 /* packet to be removed */
1053 /* previous_packet_ptr Pointer to the previous packet*/
1054 /* or NULL if none exists */
1055 /* */
1056 /* OUTPUT */
1057 /* */
1058 /* None */
1059 /* */
1060 /* CALLS */
1061 /* */
1062 /* None */
1063 /* */
1064 /* CALLED BY */
1065 /* */
1066 /* _nxd_mqtt_thread_entry */
1067 /* _nxd_mqtt_client_event_process */
1068 /* */
1069 /* RELEASE HISTORY */
1070 /* */
1071 /* DATE NAME DESCRIPTION */
1072 /* */
1073 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1074 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1075 /* resulting in version 6.1 */
1076 /* */
1077 /**************************************************************************/
_nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,NX_PACKET * previous_packet_ptr)1078 static VOID _nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr)
1079 {
1080
1081 if (previous_packet_ptr)
1082 {
1083 previous_packet_ptr -> nx_packet_queue_next = packet_ptr -> nx_packet_queue_next;
1084 }
1085 else
1086 {
1087 client_ptr -> message_receive_queue_head = packet_ptr -> nx_packet_queue_next;
1088 }
1089
1090 if (packet_ptr == client_ptr -> message_receive_queue_tail)
1091 {
1092 client_ptr -> message_receive_queue_tail = previous_packet_ptr;
1093 }
1094
1095 client_ptr -> message_receive_queue_depth--;
1096 }
1097
1098 /**************************************************************************/
1099 /* */
1100 /* FUNCTION RELEASE */
1101 /* */
1102 /* _nxd_mqtt_process_connack PORTABLE C */
1103 /* 6.1 */
1104 /* AUTHOR */
1105 /* */
1106 /* Yuxin Zhou, Microsoft Corporation */
1107 /* */
1108 /* DESCRIPTION */
1109 /* */
1110 /* This internal function processes a CONNACK message from the broker. */
1111 /* */
1112 /* */
1113 /* INPUT */
1114 /* */
1115 /* client_ptr Pointer to MQTT Client */
1116 /* packet_ptr Pointer to the packet */
1117 /* wait_option Timeout value */
1118 /* */
1119 /* OUTPUT */
1120 /* */
1121 /* status */
1122 /* */
1123 /* CALLS */
1124 /* */
1125 /* [nxd_mqtt_connect_notify] User supplied connect */
1126 /* callback function */
1127 /* tx_mutex_get */
1128 /* tx_mutex_put */
1129 /* _nxd_mqtt_client_retransmit_message */
1130 /* _nxd_mqtt_client_connection_end */
1131 /* */
1132 /* */
1133 /* CALLED BY */
1134 /* */
1135 /* _nxd_mqtt_packet_receive_process */
1136 /* */
1137 /* RELEASE HISTORY */
1138 /* */
1139 /* DATE NAME DESCRIPTION */
1140 /* */
1141 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1142 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1143 /* resulting in version 6.1 */
1144 /* */
1145 /**************************************************************************/
_nxd_mqtt_process_connack(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,ULONG wait_option)1146 static UINT _nxd_mqtt_process_connack(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, ULONG wait_option)
1147 {
1148
1149 UINT ret = NXD_MQTT_COMMUNICATION_FAILURE;
1150 MQTT_PACKET_CONNACK *connack_packet_ptr = (MQTT_PACKET_CONNACK *)(packet_ptr -> nx_packet_prepend_ptr);
1151
1152
1153 /* Check the length. */
1154 if ((packet_ptr -> nx_packet_length != sizeof(MQTT_PACKET_CONNACK)) ||
1155 (connack_packet_ptr -> mqtt_connack_packet_header >> 4 != MQTT_CONTROL_PACKET_TYPE_CONNACK))
1156 {
1157 /* Invalid packet length. Free the packet and process error. */
1158 ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1159 }
1160 else
1161 {
1162
1163 /* Check remaining length. */
1164 if (connack_packet_ptr -> mqtt_connack_packet_remaining_length != 2)
1165 {
1166 ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1167 }
1168 /* Follow MQTT-3.2.2-1 rule. */
1169 else if ((client_ptr -> nxd_mqtt_clean_session) && (connack_packet_ptr -> mqtt_connack_packet_ack_flags & MQTT_CONNACK_CONNECT_FLAGS_SP))
1170 {
1171
1172 /* Client requested clean session, and server responded with Session Present. This is a violation. */
1173 ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1174 }
1175 else if (connack_packet_ptr -> mqtt_connack_packet_return_code > MQTT_CONNACK_CONNECT_RETURN_CODE_NOT_AUTHORIZED)
1176 {
1177 ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1178 }
1179 else if (connack_packet_ptr -> mqtt_connack_packet_return_code > 0)
1180 {
1181
1182 /* Pass the server return code to the application. */
1183 ret = (UINT)(NXD_MQTT_ERROR_CONNECT_RETURN_CODE + connack_packet_ptr -> mqtt_connack_packet_return_code);
1184 }
1185 else
1186 {
1187 ret = NXD_MQTT_SUCCESS;
1188
1189 /* Obtain mutex before we modify client control block. */
1190 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
1191
1192 client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_CONNECTED;
1193
1194 /* Initialize the packet identification field. */
1195 client_ptr -> nxd_mqtt_client_packet_identifier = NXD_MQTT_INITIAL_PACKET_ID_VALUE;
1196
1197 /* Prevent packet identifier from being zero. MQTT-2.3.1-1 */
1198 if(client_ptr -> nxd_mqtt_client_packet_identifier == 0)
1199 client_ptr -> nxd_mqtt_client_packet_identifier = 1;
1200
1201 /* Release mutex */
1202 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1203 }
1204 }
1205
1206 /* Check callback function. */
1207 if (client_ptr -> nxd_mqtt_connect_notify)
1208 {
1209 client_ptr -> nxd_mqtt_connect_notify(client_ptr, ret, client_ptr -> nxd_mqtt_connect_context);
1210 }
1211
1212 if (ret == NXD_MQTT_SUCCESS)
1213 {
1214
1215 /* If client doesn't start with Clean Session, and there are un-acked PUBLISH messages,
1216 we shall re-publish these messages. */
1217 if ((client_ptr -> nxd_mqtt_clean_session != NX_TRUE) && (client_ptr -> message_transmit_queue_head))
1218 {
1219
1220 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
1221
1222 /* There are messages from the previous session that has not been acknowledged. */
1223 ret = _nxd_mqtt_client_retransmit_message(client_ptr, wait_option);
1224
1225 /* Release mutex */
1226 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1227 }
1228 }
1229 else
1230 {
1231
1232 /* End connection. */
1233 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
1234 }
1235
1236 return(ret);
1237 }
1238
1239 /**************************************************************************/
1240 /* */
1241 /* FUNCTION RELEASE */
1242 /* */
1243 /* _nxd_mqtt_process_publish_packet PORTABLE C */
1244 /* 6.1 */
1245 /* AUTHOR */
1246 /* */
1247 /* Yuxin Zhou, Microsoft Corporation */
1248 /* */
1249 /* DESCRIPTION */
1250 /* */
1251 /* This internal function processes a packet and parses topic and */
1252 /* message. */
1253 /* */
1254 /* */
1255 /* INPUT */
1256 /* */
1257 /* packet_ptr Pointer to the packet */
1258 /* topic_offset_ptr Return topic offset */
1259 /* topic_length_ptr Return topic length */
1260 /* message_offset_ptr Return message offset */
1261 /* message_length_ptr Return message length */
1262 /* */
1263 /* OUTPUT */
1264 /* */
1265 /* Status */
1266 /* */
1267 /* CALLS */
1268 /* */
1269 /* _nxd_mqtt_read_remaining_length */
1270 /* nx_packet_data_extract_offset */
1271 /* */
1272 /* */
1273 /* CALLED BY */
1274 /* */
1275 /* _nxd_mqtt_process_publish */
1276 /* */
1277 /* RELEASE HISTORY */
1278 /* */
1279 /* DATE NAME DESCRIPTION */
1280 /* */
1281 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1282 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1283 /* resulting in version 6.1 */
1284 /* */
1285 /**************************************************************************/
_nxd_mqtt_process_publish_packet(NX_PACKET * packet_ptr,ULONG * topic_offset_ptr,USHORT * topic_length_ptr,ULONG * message_offset_ptr,ULONG * message_length_ptr)1286 UINT _nxd_mqtt_process_publish_packet(NX_PACKET *packet_ptr, ULONG *topic_offset_ptr, USHORT *topic_length_ptr,
1287 ULONG *message_offset_ptr, ULONG *message_length_ptr)
1288 {
1289 UCHAR QoS;
1290 UINT remaining_length = 0;
1291 UINT topic_length;
1292 ULONG offset;
1293 UCHAR bytes[2];
1294 ULONG bytes_copied;
1295
1296
1297 QoS = (UCHAR)((*(packet_ptr -> nx_packet_prepend_ptr) & MQTT_PUBLISH_QOS_LEVEL_FIELD) >> 1);
1298
1299 if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1300 {
1301 return(NXD_MQTT_INVALID_PACKET);
1302 }
1303
1304 if (remaining_length < 2)
1305 {
1306 return(NXD_MQTT_INVALID_PACKET);
1307 }
1308
1309 /* Get topic length fields. */
1310 if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1311 (bytes_copied != sizeof(bytes)))
1312 {
1313 return(NXD_MQTT_INVALID_PACKET);
1314 }
1315
1316 topic_length = (UINT)(*(bytes) << 8) | (*(bytes + 1));
1317
1318 if (topic_length > remaining_length - 2u)
1319 {
1320 return(NXD_MQTT_INVALID_PACKET);
1321 }
1322
1323 *topic_offset_ptr = offset + 2;
1324 *topic_length_ptr = (USHORT)topic_length;
1325
1326 remaining_length = remaining_length - topic_length - 2;
1327 if ((QoS == 1) || (QoS == 2))
1328 {
1329 offset += 2 + 2 + topic_length;
1330
1331 if (remaining_length < 2)
1332 {
1333 return(NXD_MQTT_INVALID_PACKET);
1334 }
1335 remaining_length = remaining_length - 2;
1336 }
1337 else
1338 {
1339 offset += 2 + topic_length;
1340 }
1341
1342 *message_offset_ptr = offset;
1343 *message_length_ptr = (ULONG)remaining_length;
1344
1345 /* Return */
1346 return(NXD_MQTT_SUCCESS);
1347 }
1348 /**************************************************************************/
1349 /* */
1350 /* FUNCTION RELEASE */
1351 /* */
1352 /* _nxd_mqtt_process_publish PORTABLE C */
1353 /* 6.3.0 */
1354 /* AUTHOR */
1355 /* */
1356 /* Yuxin Zhou, Microsoft Corporation */
1357 /* */
1358 /* DESCRIPTION */
1359 /* */
/*    This internal function processes a publish message from the broker. */
1361 /* */
1362 /* */
1363 /* INPUT */
1364 /* */
1365 /* client_ptr Pointer to MQTT Client */
1366 /* packet_ptr Pointer to the packet */
1367 /* */
1368 /* OUTPUT */
1369 /* */
1370 /* NX_TRUE - packet is consumed */
1371 /* NX_FALSE - packet is not consumed */
1372 /* */
1373 /* CALLS */
1374 /* */
1375 /* [receive_notify] User supplied receive */
1376 /* callback function */
1377 /* _nxd_mqtt_client_packet_allocate */
1378 /* _nxd_mqtt_packet_send */
1379 /* nx_packet_release */
1380 /* nx_secure_tls_session_send */
1381 /* _nxd_mqtt_process_publish_packet */
1382 /* _nxd_mqtt_copy_transmit_packet */
1383 /* */
1384 /* */
1385 /* CALLED BY */
1386 /* */
1387 /* _nxd_mqtt_packet_receive_process */
1388 /* */
1389 /* RELEASE HISTORY */
1390 /* */
1391 /* DATE NAME DESCRIPTION */
1392 /* */
1393 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1394 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1395 /* resulting in version 6.1 */
1396 /* 10-31-2022 Bo Chen Modified comment(s), improved */
1397 /* the logic of sending packet,*/
1398 /* resulting in version 6.2.0 */
1399 /* 10-31-2023 Haiqing Zhao Modified comment(s), improved */
1400 /* internal logic, */
1401 /* resulting in version 6.3.0 */
1402 /* */
1403 /**************************************************************************/
_nxd_mqtt_process_publish(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1404 static UINT _nxd_mqtt_process_publish(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1405 {
1406 MQTT_PACKET_PUBLISH_RESPONSE *pubresp_ptr;
1407 UINT status;
1408 USHORT packet_id = 0;
1409 UCHAR QoS;
1410 UINT enqueue_message = 0;
1411 NX_PACKET *transmit_packet_ptr;
1412 UINT remaining_length = 0;
1413 UINT packet_consumed = NX_FALSE;
1414 UCHAR fixed_header;
1415 USHORT transmit_packet_id;
1416 UINT topic_length;
1417 ULONG offset;
1418 UCHAR bytes[2];
1419 ULONG bytes_copied;
1420
1421 QoS = (UCHAR)((*(packet_ptr -> nx_packet_prepend_ptr) & MQTT_PUBLISH_QOS_LEVEL_FIELD) >> 1);
1422
1423 if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1424 {
1425 return(NX_FALSE);
1426 }
1427
1428 if (remaining_length < 2)
1429 {
1430 return(NXD_MQTT_INVALID_PACKET);
1431 }
1432
1433 /* Get topic length fields. */
1434 if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1435 (bytes_copied != sizeof(bytes)))
1436 {
1437 return(NXD_MQTT_INVALID_PACKET);
1438 }
1439
1440 topic_length = (UINT)(*(bytes) << 8) | (*(bytes + 1));
1441
1442 if (topic_length > remaining_length - 2u)
1443 {
1444 return(NXD_MQTT_INVALID_PACKET);
1445 }
1446
1447 if (QoS == 0)
1448 {
1449 enqueue_message = 1;
1450 }
1451 else
1452 {
1453 /* QoS 1 or QoS 2 messages. */
1454 /* Get packet id fields. */
1455 if (nx_packet_data_extract_offset(packet_ptr, offset + 2 + topic_length, &bytes, sizeof(bytes), &bytes_copied))
1456 {
1457 return(NXD_MQTT_INVALID_PACKET);
1458 }
1459
1460 packet_id = (USHORT)(((*bytes) << 8) | (*(bytes + 1)));
1461
1462 /* Look for an existing transmit packets with the same packet id */
1463 transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1464
1465 while (transmit_packet_ptr)
1466 {
1467 fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1468 transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1469 if ((transmit_packet_id == packet_id) &&
1470 ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_PUBREC << 4)))
1471 {
1472
1473 /* Found a packet containing the packet_id */
1474 break;
1475 }
1476 transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1477 }
1478
1479 if (transmit_packet_ptr)
1480 {
1481 /* This published data is already in our system. No need to deliver this message to the application. */
1482 enqueue_message = 0;
1483 }
1484 else
1485 {
1486 enqueue_message = 1;
1487 }
1488 }
1489
1490 if (enqueue_message)
1491 {
1492 if (packet_ptr -> nx_packet_length > (offset + remaining_length))
1493 {
1494
1495 /* This packet contains multiple messages. */
1496 if (nx_packet_copy(packet_ptr, &packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_NO_WAIT))
1497 {
1498
1499 /* No packet is available. */
1500 return(NX_FALSE);
1501 }
1502 }
1503 else
1504 {
1505 packet_consumed = NX_TRUE;
1506 }
1507
1508 /* Increment the queue depth counter. */
1509 client_ptr -> message_receive_queue_depth++;
1510
1511 if (client_ptr -> message_receive_queue_head == NX_NULL)
1512 {
1513 client_ptr -> message_receive_queue_head = packet_ptr;
1514 }
1515 else
1516 {
1517 client_ptr -> message_receive_queue_tail -> nx_packet_queue_next = packet_ptr;
1518 }
1519 client_ptr -> message_receive_queue_tail = packet_ptr;
1520
1521 /* Invoke the user-defined receive notify function if it is set. */
1522 if (client_ptr -> nxd_mqtt_client_receive_notify)
1523 {
1524 (*(client_ptr -> nxd_mqtt_client_receive_notify))(client_ptr, client_ptr -> message_receive_queue_depth);
1525 }
1526 }
1527
1528 /* If the message QoS level is 0, we are done. */
1529 if (QoS == 0)
1530 {
1531 /* Return */
1532 return(packet_consumed);
1533 }
1534
1535 /* Send out proper ACKs for QoS 1 and 2 messages. */
1536 /* Allocate a new packet so we can send out a response. */
1537 status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
1538 if (status)
1539 {
1540 /* Packet allocation fails. */
1541 return(packet_consumed);
1542 }
1543
1544 /* Fill in the packet ID */
1545 pubresp_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)(packet_ptr -> nx_packet_prepend_ptr);
1546 pubresp_ptr -> mqtt_publish_response_packet_remaining_length = 2;
1547 pubresp_ptr -> mqtt_publish_response_packet_packet_identifier_msb = (UCHAR)(packet_id >> 8);
1548 pubresp_ptr -> mqtt_publish_response_packet_packet_identifier_lsb = (UCHAR)(packet_id & 0xFF);
1549
1550 if (QoS == 1)
1551 {
1552
1553 pubresp_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBACK << 4;
1554 }
1555 else
1556 {
1557 pubresp_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBREC << 4;
1558 }
1559
1560 packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + sizeof(MQTT_PACKET_PUBLISH_RESPONSE);
1561 packet_ptr -> nx_packet_length = sizeof(MQTT_PACKET_PUBLISH_RESPONSE);
1562
1563 if (QoS == 2)
1564 {
1565
1566 /* Copy packet for checking duplicate publish packet. */
1567 if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &transmit_packet_ptr,
1568 packet_id, NX_FALSE, NX_WAIT_FOREVER))
1569 {
1570
1571 /* Release the packet. */
1572 nx_packet_release(packet_ptr);
1573 return(packet_consumed);
1574 }
1575 if (client_ptr -> message_transmit_queue_head == NX_NULL)
1576 {
1577 client_ptr -> message_transmit_queue_head = transmit_packet_ptr;
1578 }
1579 else
1580 {
1581 client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = transmit_packet_ptr;
1582 }
1583 client_ptr -> message_transmit_queue_tail = transmit_packet_ptr;
1584 }
1585
1586 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1587
1588 /* Send packet to server. */
1589 status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER);
1590
1591 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
1592 if (status)
1593 {
1594
1595 /* Release the packet. */
1596 nx_packet_release(packet_ptr);
1597 }
1598 else
1599 {
1600 /* Update the timeout value. */
1601 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
1602 }
1603
1604 /* Return */
1605 return(packet_consumed);
1606 }
1607
1608 /**************************************************************************/
1609 /* */
1610 /* FUNCTION RELEASE */
1611 /* */
1612 /* _nxd_mqtt_process_publish_response PORTABLE C */
1613 /* 6.3.0 */
1614 /* AUTHOR */
1615 /* */
1616 /* Yuxin Zhou, Microsoft Corporation */
1617 /* */
1618 /* DESCRIPTION */
1619 /* */
/*    This internal function processes publish response messages.         */
1621 /* Publish Response messages are: PUBACK, PUBREC, PUBREL */
1622 /* */
1623 /* INPUT */
1624 /* */
1625 /* client_ptr Pointer to MQTT Client */
1626 /* packet_ptr Pointer to the packet */
1627 /* */
1628 /* OUTPUT */
1629 /* */
1630 /* Status */
1631 /* */
1632 /* CALLS */
1633 /* */
1634 /* [nxd_mqtt_client_receive_notify] User supplied publish */
1635 /* callback function */
1636 /* _nxd_mqtt_release_transmit_packet */
1637 /* _nxd_mqtt_packet_send */
1638 /* _nxd_mqtt_client_packet_allocate */
1639 /* */
1640 /* */
1641 /* CALLED BY */
1642 /* */
1643 /* _nxd_mqtt_packet_receive_process */
1644 /* */
1645 /* RELEASE HISTORY */
1646 /* */
1647 /* DATE NAME DESCRIPTION */
1648 /* */
1649 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1650 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
1651 /* added ack receive notify, */
1652 /* resulting in version 6.1 */
1653 /* 10-31-2022 Bo Chen Modified comment(s), improved */
1654 /* the logic of sending packet,*/
1655 /* resulting in version 6.2.0 */
1656 /* 10-31-2023 Haiqing Zhao Modified comment(s), improved */
1657 /* internal logic, */
1658 /* resulting in version 6.3.0 */
1659 /* */
1660 /**************************************************************************/
_nxd_mqtt_process_publish_response(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1661 static UINT _nxd_mqtt_process_publish_response(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1662 {
1663 MQTT_PACKET_PUBLISH_RESPONSE *response_ptr;
1664 USHORT packet_id;
1665 NX_PACKET *previous_packet_ptr;
1666 NX_PACKET *transmit_packet_ptr;
1667 NX_PACKET *response_packet;
1668 UINT ret;
1669 UCHAR fixed_header;
1670 USHORT transmit_packet_id;
1671
1672 response_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)(packet_ptr -> nx_packet_prepend_ptr);
1673
1674 /* Validate the packet. */
1675 if (response_ptr -> mqtt_publish_response_packet_remaining_length != 2)
1676 {
1677 /* Invalid remaining_length value. Return 1 so the caller can release
1678 the packet. */
1679
1680 return(1);
1681 }
1682
1683 packet_id = (USHORT)((response_ptr -> mqtt_publish_response_packet_packet_identifier_msb << 8) |
1684 (response_ptr -> mqtt_publish_response_packet_packet_identifier_lsb));
1685
1686 /* Search all the outstanding transmitted packets for a match. */
1687 previous_packet_ptr = NX_NULL;
1688 transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1689 while (transmit_packet_ptr)
1690 {
1691 fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1692 transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1693 if (transmit_packet_id == packet_id)
1694 {
1695
1696 /* Found the matching packet id */
1697 if (((response_ptr -> mqtt_publish_response_packet_header) >> 4) == MQTT_CONTROL_PACKET_TYPE_PUBACK)
1698 {
1699
1700 /* PUBACK is the response to a PUBLISH packet with QoS Level 1*/
1701 /* Therefore we verify that packet contains PUBLISH packet with QoS level 1*/
1702 if ((fixed_header & 0xF6) == ((MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4) | MQTT_PUBLISH_QOS_LEVEL_1))
1703 {
1704
1705 /* Check ack notify function. */
1706 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1707 {
1708
1709 /* Call notify function. Note: user routine should not release the packet. */
1710 client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_PUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1711 }
1712
1713 /* QoS Level1 message receives an ACK. */
1714 /* This message can be released. */
1715 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1716
1717 /* Return with value 1, so the caller will release packet_ptr */
1718 return(1);
1719 }
1720 }
1721 else if (((response_ptr -> mqtt_publish_response_packet_header) >> 4) == MQTT_CONTROL_PACKET_TYPE_PUBREL)
1722 {
1723
1724 /* QoS 2 publish Release received, part 2. */
1725 /* Therefore we verify that packet contains PUBLISH packet with QoS level 2*/
1726 if ((fixed_header & 0xF6) == (MQTT_CONTROL_PACKET_TYPE_PUBREC << 4))
1727 {
1728
1729 /* QoS Level2 message receives an ACK. */
1730 /* This message can be released. */
1731 /* Send PUBCOMP */
1732
1733 /* Allocate a packet to send the response. */
1734 ret = _nxd_mqtt_client_packet_allocate(client_ptr, &response_packet, NX_WAIT_FOREVER);
1735 if (ret)
1736 {
1737 return(1);
1738 }
1739
1740 if (4u > ((ULONG)(response_packet -> nx_packet_data_end) - (ULONG)(response_packet -> nx_packet_append_ptr)))
1741 {
1742 nx_packet_release(response_packet);
1743
1744 /* Packet buffer is too small to hold the message. */
1745 return(NX_SIZE_ERROR);
1746 }
1747
1748 response_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)response_packet -> nx_packet_prepend_ptr;
1749
1750 response_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBCOMP << 4;
1751 response_ptr -> mqtt_publish_response_packet_remaining_length = 2;
1752
1753 /* Fill in packet ID */
1754 response_packet -> nx_packet_prepend_ptr[3] = packet_ptr -> nx_packet_prepend_ptr[3];
1755 response_packet -> nx_packet_prepend_ptr[4] = packet_ptr -> nx_packet_prepend_ptr[4];
1756 response_packet -> nx_packet_append_ptr = response_packet -> nx_packet_prepend_ptr + 4;
1757 response_packet -> nx_packet_length = 4;
1758
1759 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1760
1761 /* Send packet to server. */
1762 ret = _nxd_mqtt_packet_send(client_ptr, response_packet, NX_WAIT_FOREVER);
1763
1764 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
1765
1766 /* Update the timeout value. */
1767 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
1768
1769 if (ret)
1770 {
1771 nx_packet_release(response_packet);
1772 }
1773
1774 /* Check ack notify function. */
1775 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1776 {
1777
1778 /* Call notify function. Note: user routine should not release the packet. */
1779 client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_PUBREL, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1780 }
1781
1782 /* This packet can be released. */
1783 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1784
1785 /* Return with value 1, so the caller will release packet_ptr */
1786 return(1);
1787 }
1788 }
1789 }
1790
1791 /* Move on to the next packet */
1792 previous_packet_ptr = transmit_packet_ptr;
1793 transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1794 }
1795
1796 /* nothing is found. Return 1 to release the packet.*/
1797 return(1);
1798 }
1799
1800 /**************************************************************************/
1801 /* */
1802 /* FUNCTION RELEASE */
1803 /* */
1804 /* _nxd_mqtt_process_sub_unsub_ack PORTABLE C */
1805 /* 6.1 */
1806 /* AUTHOR */
1807 /* */
1808 /* Yuxin Zhou, Microsoft Corporation */
1809 /* */
1810 /* DESCRIPTION */
1811 /* */
/*    This internal function processes an ACK message for subscribe       */
1813 /* or unsubscribe request. */
1814 /* */
1815 /* INPUT */
1816 /* */
1817 /* client_ptr Pointer to MQTT Client */
1818 /* packet_ptr Pointer to the packet */
1819 /* */
1820 /* OUTPUT */
1821 /* */
1822 /* Status */
1823 /* */
1824 /* CALLS */
1825 /* */
1826 /* [nxd_mqtt_client_receive_notify] User supplied publish */
1827 /* callback function */
1828 /* _nxd_mqtt_release_transmit_packet */
1829 /* Release the memory block */
1830 /* _nxd_mqtt_read_remaining_length Skip the remaining length */
1831 /* field */
1832 /* */
1833 /* CALLED BY */
1834 /* */
1835 /* _nxd_mqtt_packet_receive_process */
1836 /* */
1837 /* RELEASE HISTORY */
1838 /* */
1839 /* DATE NAME DESCRIPTION */
1840 /* */
1841 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1842 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
1843 /* added ack receive notify, */
1844 /* resulting in version 6.1 */
1845 /* */
1846 /**************************************************************************/
_nxd_mqtt_process_sub_unsub_ack(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1847 static UINT _nxd_mqtt_process_sub_unsub_ack(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1848 {
1849
1850 USHORT packet_id;
1851 NX_PACKET *previous_packet_ptr;
1852 NX_PACKET *transmit_packet_ptr;
1853 UCHAR response_header;
1854 UCHAR fixed_header;
1855 USHORT transmit_packet_id;
1856 UINT remaining_length;
1857 ULONG offset;
1858 UCHAR bytes[2];
1859 ULONG bytes_copied;
1860
1861
1862 response_header = *(packet_ptr -> nx_packet_prepend_ptr);
1863
1864 if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1865 {
1866
1867 /* Unable to process the sub/unsub ack. Simply return and release the packet. */
1868 return(NXD_MQTT_INVALID_PACKET);
1869 }
1870
1871 /* Get packet id fields. */
1872 if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1873 (bytes_copied != sizeof(bytes)))
1874 {
1875 return(NXD_MQTT_INVALID_PACKET);
1876 }
1877
1878 packet_id = (USHORT)(((*bytes) << 8) | (*(bytes + 1)));
1879
1880 /* Search all the outstanding transmitted packets for a match. */
1881 previous_packet_ptr = NX_NULL;
1882 transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1883 while (transmit_packet_ptr)
1884 {
1885 fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1886 transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1887 if (transmit_packet_id == packet_id)
1888 {
1889
1890 /* Found the matching packet id */
1891 if (((response_header >> 4) == MQTT_CONTROL_PACKET_TYPE_SUBACK) &&
1892 ((fixed_header >> 4) == MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE))
1893 {
1894 /* Validate the packet. */
1895 if (remaining_length != 3)
1896 {
1897 /* Invalid remaining_length value. */
1898 return(1);
1899 }
1900
1901 /* Check ack notify function. */
1902 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1903 {
1904
1905 /* Call notify function. Note: user routine should not release the packet. */
1906 client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_SUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1907 }
1908
1909 /* Release the transmit packet. */
1910 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1911
1912 return(1);
1913 }
1914 else if (((response_header >> 4) == MQTT_CONTROL_PACKET_TYPE_UNSUBACK) &&
1915 ((fixed_header >> 4) == MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE))
1916 {
1917 /* Validate the packet. */
1918 if (remaining_length != 2)
1919 {
1920 /* Invalid remaining_length value. */
1921 return(1);
1922 }
1923
1924 /* Check ack notify function. */
1925 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1926 {
1927
1928 /* Call notify function. Note: user routine should not release the packet. */
1929 client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_UNSUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1930 }
1931
1932 /* Unsubscribe succeeded. */
1933 /* Release the transmit packet. */
1934 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1935
1936 return(1);
1937 }
1938 }
1939
1940 /* Move on to the next packet */
1941 previous_packet_ptr = transmit_packet_ptr;
1942 transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1943 }
1944 return(1);
1945 }
1946
1947
1948 /**************************************************************************/
1949 /* */
1950 /* FUNCTION RELEASE */
1951 /* */
1952 /* _nxd_mqtt_process_pingresp PORTABLE C */
1953 /* 6.1 */
1954 /* AUTHOR */
1955 /* */
1956 /* Yuxin Zhou, Microsoft Corporation */
1957 /* */
1958 /* DESCRIPTION */
1959 /* */
/*    This internal function processes a PINGRESP message.                */
1961 /* */
1962 /* INPUT */
1963 /* */
1964 /* client_ptr Pointer to MQTT Client */
1965 /* packet_ptr Pointer to the packet */
1966 /* */
1967 /* OUTPUT */
1968 /* */
1969 /* Status */
1970 /* */
1971 /* CALLS */
1972 /* */
1973 /* None */
1974 /* callback function */
1975 /* */
1976 /* CALLED BY */
1977 /* */
1978 /* _nxd_mqtt_packet_receive_process */
1979 /* */
1980 /* RELEASE HISTORY */
1981 /* */
1982 /* DATE NAME DESCRIPTION */
1983 /* */
1984 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1985 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1986 /* resulting in version 6.1 */
1987 /* */
1988 /**************************************************************************/
_nxd_mqtt_process_pingresp(NXD_MQTT_CLIENT * client_ptr)1989 static VOID _nxd_mqtt_process_pingresp(NXD_MQTT_CLIENT *client_ptr)
1990 {
1991
1992
1993 /* If there is an outstanding ping, mark it as responded. */
1994 if (client_ptr -> nxd_mqtt_ping_not_responded == NX_TRUE)
1995 {
1996 client_ptr -> nxd_mqtt_ping_not_responded = NX_FALSE;
1997
1998 client_ptr -> nxd_mqtt_ping_sent_time = 0;
1999 }
2000
2001 return;
2002 }
2003
2004
2005 /**************************************************************************/
2006 /* */
2007 /* FUNCTION RELEASE */
2008 /* */
2009 /* _nxd_mqtt_process_disconnect PORTABLE C */
2010 /* 6.1 */
2011 /* AUTHOR */
2012 /* */
2013 /* Yuxin Zhou, Microsoft Corporation */
2014 /* */
2015 /* DESCRIPTION */
2016 /* */
2017 /* This internal function process a DISCONNECT message. */
2018 /* */
2019 /* INPUT */
2020 /* */
2021 /* client_ptr Pointer to MQTT Client */
2022 /* packet_ptr Pointer to the packet */
2023 /* */
2024 /* OUTPUT */
2025 /* */
2026 /* None */
2027 /* */
2028 /* CALLS */
2029 /* */
2030 /* nx_secure_tls_session_send */
2031 /* nx_tcp_socket_disconnect */
2032 /* nx_tcp_client_socket_unbind */
2033 /* _nxd_mqtt_release_transmit_packet */
2034 /* _nxd_mqtt_release_receive_packet */
2035 /* _nxd_mqtt_client_connection_end */
2036 /* */
2037 /* CALLED BY */
2038 /* */
2039 /* _nxd_mqtt_packet_receive_process */
2040 /* */
2041 /* RELEASE HISTORY */
2042 /* */
2043 /* DATE NAME DESCRIPTION */
2044 /* */
2045 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2046 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2047 /* resulting in version 6.1 */
2048 /* */
2049 /**************************************************************************/
_nxd_mqtt_process_disconnect(NXD_MQTT_CLIENT * client_ptr)2050 static VOID _nxd_mqtt_process_disconnect(NXD_MQTT_CLIENT *client_ptr)
2051 {
2052 NX_PACKET *previous = NX_NULL;
2053 NX_PACKET *current;
2054 NX_PACKET *next;
2055 UINT disconnect_callback = NX_FALSE;
2056 UINT status;
2057 UCHAR fixed_header;
2058
2059 if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
2060 {
2061 /* State changes from CONNECTED TO IDLE. Call disconnect notify callback
2062 if the function is set. */
2063 disconnect_callback = NX_TRUE;
2064 }
2065 else if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTING)
2066 {
2067
2068 /* If state isn't CONNECTED or CONNECTING, just return. */
2069 return;
2070 }
2071
2072 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2073
2074 /* End connection. */
2075 _nxd_mqtt_client_connection_end(client_ptr, NXD_MQTT_SOCKET_TIMEOUT);
2076
2077 status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
2078
2079 /* Free up sub/unsub packets on the transmit queue. */
2080 current = client_ptr -> message_transmit_queue_head;
2081
2082 while (current)
2083 {
2084 next = current -> nx_packet_queue_next;
2085 fixed_header = *(current -> nx_packet_prepend_ptr);
2086
2087 if (((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4)) ||
2088 ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE << 4)))
2089 {
2090 _nxd_mqtt_release_transmit_packet(client_ptr, current, previous);
2091 }
2092 else
2093 {
2094 previous = current;
2095 }
2096 current = next;
2097 }
2098
2099 /* If a callback notification is defined, call it now. */
2100 if ((disconnect_callback == NX_TRUE) && (client_ptr -> nxd_mqtt_disconnect_notify))
2101 {
2102 client_ptr -> nxd_mqtt_disconnect_notify(client_ptr);
2103 }
2104
2105 /* If a connect callback notification is defined and is still in connecting stage, call it now. */
2106 if ((disconnect_callback == NX_FALSE) && (client_ptr -> nxd_mqtt_connect_notify))
2107 {
2108 client_ptr -> nxd_mqtt_connect_notify(client_ptr, NXD_MQTT_CONNECT_FAILURE, client_ptr -> nxd_mqtt_connect_context);
2109 }
2110
2111 if (status == TX_SUCCESS)
2112 {
2113 /* Remove all the packets in the receive queue. */
2114 while (client_ptr -> message_receive_queue_head)
2115 {
2116 _nxd_mqtt_release_receive_packet(client_ptr, client_ptr -> message_receive_queue_head, NX_NULL);
2117 }
2118 client_ptr -> message_receive_queue_depth = 0;
2119
2120 /* Clear the MQTT_PACKET_RECEIVE_EVENT */
2121 #ifndef NXD_MQTT_CLOUD_ENABLE
2122 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, ~MQTT_PACKET_RECEIVE_EVENT, TX_AND);
2123 #else
2124 nx_cloud_module_event_clear(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PACKET_RECEIVE_EVENT);
2125 #endif /* NXD_MQTT_CLOUD_ENABLE */
2126 }
2127
2128 /* Clear flags if keep alive is enabled. */
2129 if (client_ptr -> nxd_mqtt_keepalive)
2130 {
2131 client_ptr -> nxd_mqtt_ping_not_responded = 0;
2132 client_ptr -> nxd_mqtt_ping_sent_time = 0;
2133 }
2134
2135 /* Clean up the information when disconnecting. */
2136 client_ptr -> nxd_mqtt_client_username = NX_NULL;
2137 client_ptr -> nxd_mqtt_client_password = NX_NULL;
2138 client_ptr -> nxd_mqtt_client_will_topic = NX_NULL;
2139 client_ptr -> nxd_mqtt_client_will_message = NX_NULL;
2140 client_ptr -> nxd_mqtt_client_will_qos_retain = 0;
2141
2142 /* Release current processing packet. */
2143 if (client_ptr -> nxd_mqtt_client_processing_packet)
2144 {
2145 nx_packet_release(client_ptr -> nxd_mqtt_client_processing_packet);
2146 client_ptr -> nxd_mqtt_client_processing_packet = NX_NULL;
2147 }
2148
2149 return;
2150 }
2151
2152
2153 /**************************************************************************/
2154 /* */
2155 /* FUNCTION RELEASE */
2156 /* */
2157 /* _nxd_mqtt_packet_receive_process PORTABLE C */
2158 /* 6.2.0 */
2159 /* AUTHOR */
2160 /* */
2161 /* Yuxin Zhou, Microsoft Corporation */
2162 /* */
2163 /* DESCRIPTION */
2164 /* */
/*    This internal function processes MQTT messages.                     */
/*    NOTE: MQTT Mutex is NOT obtained on entering this function.         */
/*    Therefore it shouldn't hold the mutex when it exits this function.  */
2168 /* */
2169 /* INPUT */
2170 /* */
2171 /* client_ptr Pointer to MQTT Client */
2172 /* */
2173 /* OUTPUT */
2174 /* */
2175 /* None */
2176 /* */
2177 /* CALLS */
2178 /* */
2179 /* nx_secure_tls_session_receive */
2180 /* nx_tcp_socket_receive */
2181 /* _nxd_mqtt_process_publish */
2182 /* _nxd_mqtt_process_publish_response */
2183 /* _nxd_mqtt_process_sub_unsub_ack */
2184 /* _nxd_mqtt_process_pingresp */
2185 /* _nxd_mqtt_process_disconnect */
2186 /* nx_packet_release */
2187 /* */
2188 /* CALLED BY */
2189 /* */
2190 /* _nxd_mqtt_client_event_process */
2191 /* */
2192 /* RELEASE HISTORY */
2193 /* */
2194 /* DATE NAME DESCRIPTION */
2195 /* */
2196 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2197 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2198 /* resulting in version 6.1 */
2199 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
2200 /* mqtt over websocket, */
2201 /* resulting in version 6.2.0 */
2202 /* */
2203 /**************************************************************************/
_nxd_mqtt_packet_receive_process(NXD_MQTT_CLIENT * client_ptr)2204 static VOID _nxd_mqtt_packet_receive_process(NXD_MQTT_CLIENT *client_ptr)
2205 {
2206 NX_PACKET *packet_ptr;
2207 NX_PACKET *previous_packet_ptr;
2208 UINT status;
2209 UCHAR packet_type;
2210 UINT remaining_length;
2211 UINT packet_consumed;
2212 ULONG offset;
2213 ULONG bytes_copied;
2214 ULONG packet_length;
2215
2216 for (;;)
2217 {
2218
2219 /* Release the mutex. */
2220 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2221
2222 /* Make a receive call. */
2223 status = _nxd_mqtt_packet_receive(client_ptr, &packet_ptr, NX_NO_WAIT);
2224
2225 if (status != NX_SUCCESS)
2226 {
2227 if ((status != NX_NO_PACKET) && (status != NX_CONTINUE))
2228 {
2229
2230 /* Network issue. Close the MQTT session. */
2231 #ifndef NXD_MQTT_CLOUD_ENABLE
2232 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_NETWORK_DISCONNECT_EVENT, TX_OR);
2233 #else
2234 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_NETWORK_DISCONNECT_EVENT);
2235 #endif /* NXD_MQTT_CLOUD_ENABLE */
2236 }
2237
2238 break;
2239 }
2240
2241 /* Obtain the mutex. */
2242 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
2243
2244 /* Is there a packet waiting for processing? */
2245 if (client_ptr -> nxd_mqtt_client_processing_packet)
2246 {
2247
2248 /* Yes. Link received packet to existing one. */
2249 if (client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last)
2250 {
2251 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last -> nx_packet_next = packet_ptr;
2252 }
2253 else
2254 {
2255 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_next = packet_ptr;
2256 }
2257 if (packet_ptr -> nx_packet_last)
2258 {
2259 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last = packet_ptr -> nx_packet_last;
2260 }
2261 else
2262 {
2263 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last = packet_ptr;
2264 }
2265 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_length += packet_ptr -> nx_packet_length;
2266
2267 /* Start to process existing packet. */
2268 packet_ptr = client_ptr -> nxd_mqtt_client_processing_packet;
2269 client_ptr -> nxd_mqtt_client_processing_packet = NX_NULL;
2270 }
2271
2272 /* Check notify function. */
2273 if (client_ptr -> nxd_mqtt_packet_receive_notify)
2274 {
2275
2276 /* Call notify function. Return NX_TRUE if the packet has been consumed. */
2277 if (client_ptr -> nxd_mqtt_packet_receive_notify(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_packet_receive_context) == NX_TRUE)
2278 {
2279 continue;
2280 }
2281 }
2282
2283 packet_consumed = NX_FALSE;
2284 while (packet_ptr)
2285 {
2286 /* Parse the incoming packet. */
2287 status = _nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset);
2288 if (status == NXD_MQTT_PARTIAL_PACKET)
2289 {
2290
2291 /* We only have partial MQTT message.
2292 * Put it to waiting list for more packets. */
2293 client_ptr -> nxd_mqtt_client_processing_packet = packet_ptr;
2294 packet_consumed = NX_TRUE;
2295 break;
2296 }
2297 else if (status)
2298 {
2299
2300 /* Invalid packet. */
2301 break;
2302 }
2303
2304 /* Get packet type. */
2305 if (nx_packet_data_extract_offset(packet_ptr, 0, &packet_type, 1, &bytes_copied))
2306 {
2307
2308 /* Unable to read packet type. */
2309 break;
2310 }
2311
2312 /* Right shift 4 bits to get the packet type. */
2313 packet_type = packet_type >> 4;
2314
2315 /* Process based on packet type. */
2316 switch (packet_type)
2317 {
2318 case MQTT_CONTROL_PACKET_TYPE_CONNECT:
2319 /* Client does not accept connections. Nothing needs to be done. */
2320 break;
2321 case MQTT_CONTROL_PACKET_TYPE_CONNACK:
2322 _nxd_mqtt_process_connack(client_ptr, packet_ptr, NX_NO_WAIT);
2323 break;
2324
2325 case MQTT_CONTROL_PACKET_TYPE_PUBLISH:
2326 packet_consumed = _nxd_mqtt_process_publish(client_ptr, packet_ptr);
2327 break;
2328
2329 case MQTT_CONTROL_PACKET_TYPE_PUBACK:
2330 case MQTT_CONTROL_PACKET_TYPE_PUBREL:
2331 _nxd_mqtt_process_publish_response(client_ptr, packet_ptr);
2332 break;
2333
2334 case MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE:
2335 case MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE:
2336 /* Client should not process subscribe or unsubscribe message. */
2337 break;
2338
2339 case MQTT_CONTROL_PACKET_TYPE_SUBACK:
2340 case MQTT_CONTROL_PACKET_TYPE_UNSUBACK:
2341 _nxd_mqtt_process_sub_unsub_ack(client_ptr, packet_ptr);
2342 break;
2343
2344
2345 case MQTT_CONTROL_PACKET_TYPE_PINGREQ:
2346 /* Client is not supposed to receive ping req. Ignore it. */
2347 break;
2348
2349 case MQTT_CONTROL_PACKET_TYPE_PINGRESP:
2350 _nxd_mqtt_process_pingresp(client_ptr);
2351 break;
2352
2353 case MQTT_CONTROL_PACKET_TYPE_DISCONNECT:
2354 _nxd_mqtt_process_disconnect(client_ptr);
2355 break;
2356
2357 /* Publisher sender message type for QoS 2. Not supported. */
2358 case MQTT_CONTROL_PACKET_TYPE_PUBREC:
2359 case MQTT_CONTROL_PACKET_TYPE_PUBCOMP:
2360 default:
2361 /* Unknown type. */
2362 break;
2363 }
2364
2365 if (packet_consumed)
2366 {
2367 break;
2368 }
2369
2370 /* Trim current packet. */
2371 offset += remaining_length;
2372 packet_length = packet_ptr -> nx_packet_length;
2373 if (packet_length > offset)
2374 {
2375
2376 /* Multiple MQTT message in one packet. */
2377 packet_length = packet_ptr -> nx_packet_length - offset;
2378 while ((ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr) <= offset)
2379 {
2380 offset -= (ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr);
2381
2382 /* Current packet can be released. */
2383 previous_packet_ptr = packet_ptr;
2384 packet_ptr = packet_ptr -> nx_packet_next;
2385 previous_packet_ptr -> nx_packet_next = NX_NULL;
2386 nx_packet_release(previous_packet_ptr);
2387 if (packet_ptr == NX_NULL)
2388 {
2389
2390 /* Invalid packet. */
2391 break;
2392 }
2393 }
2394
2395 if (packet_ptr)
2396 {
2397
2398 /* Adjust current packet. */
2399 packet_ptr -> nx_packet_prepend_ptr = packet_ptr -> nx_packet_prepend_ptr + offset;
2400 packet_ptr -> nx_packet_length = packet_length;
2401 }
2402 }
2403 else
2404 {
2405
2406 /* All messages in current packet is processed. */
2407 break;
2408 }
2409 }
2410
2411 if (!packet_consumed)
2412 {
2413 nx_packet_release(packet_ptr);
2414 }
2415 }
2416
2417 /* No more data in the receive queue. Return. */
2418
2419 return;
2420 }
2421
2422
2423 /**************************************************************************/
2424 /* */
2425 /* FUNCTION RELEASE */
2426 /* */
2427 /* _nxd_mqtt_tcp_establish_process PORTABLE C */
2428 /* 6.2.0 */
2429 /* AUTHOR */
2430 /* */
2431 /* Yuxin Zhou, Microsoft Corporation */
2432 /* */
2433 /* DESCRIPTION */
2434 /* */
2435 /* This function processes MQTT TCP connection establish event. */
2436 /* */
2437 /* INPUT */
2438 /* */
2439 /* client_ptr Pointer to MQTT Client */
2440 /* */
2441 /* OUTPUT */
2442 /* */
2443 /* None */
2444 /* */
2445 /* CALLS */
2446 /* */
2447 /* nx_secure_tls_session_start */
2448 /* _nxd_mqtt_client_connection_end */
2449 /* _nxd_mqtt_client_connect_packet_send */
2450 /* */
2451 /* CALLED BY */
2452 /* */
2453 /* _nxd_mqtt_client_event_process */
2454 /* */
2455 /* RELEASE HISTORY */
2456 /* */
2457 /* DATE NAME DESCRIPTION */
2458 /* */
2459 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2460 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2461 /* resulting in version 6.1 */
2462 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
2463 /* mqtt over websocket, */
2464 /* resulting in version 6.2.0 */
2465 /* */
2466 /**************************************************************************/
_nxd_mqtt_tcp_establish_process(NXD_MQTT_CLIENT * client_ptr)2467 static VOID _nxd_mqtt_tcp_establish_process(NXD_MQTT_CLIENT *client_ptr)
2468 {
2469 UINT status;
2470
2471
2472 /* TCP connection is established. */
2473
2474 /* If TLS is enabled, start TLS */
2475 #ifdef NX_SECURE_ENABLE
2476 if (client_ptr -> nxd_mqtt_client_use_tls)
2477 {
2478 status = nx_secure_tls_session_start(&(client_ptr -> nxd_mqtt_tls_session), &(client_ptr -> nxd_mqtt_client_socket), NX_NO_WAIT);
2479
2480 if (status != NX_CONTINUE)
2481 {
2482
2483 /* End connection. */
2484 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2485
2486 /* Check callback function. */
2487 if (client_ptr -> nxd_mqtt_connect_notify)
2488 {
2489 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2490 }
2491
2492 return;
2493 }
2494
2495 /* TLS in progress. */
2496 client_ptr -> nxd_mqtt_tls_in_progress = NX_TRUE;
2497
2498 return;
2499 }
2500 #endif /* NX_SECURE_ENABLE */
2501
2502 #ifdef NXD_MQTT_OVER_WEBSOCKET
2503
2504 /* If using websocket, start websocket connection. */
2505 if (client_ptr -> nxd_mqtt_client_use_websocket)
2506 {
2507 status = nx_websocket_client_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_client_socket),
2508 client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
2509 client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
2510 (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
2511 NX_NO_WAIT);
2512
2513 if (status != NX_IN_PROGRESS)
2514 {
2515
2516 /* End connection. */
2517 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2518
2519 /* Check callback function. */
2520 if (client_ptr -> nxd_mqtt_connect_notify)
2521 {
2522 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2523 }
2524
2525 return;
2526 }
2527
2528 return;
2529 }
2530 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2531
2532 /* Start to send MQTT connect packet. */
2533 status = _nxd_mqtt_client_connect_packet_send(client_ptr, NX_NO_WAIT);
2534
2535 /* Check status. */
2536 if (status)
2537 {
2538
2539 /* End connection. */
2540 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2541
2542 /* Check callback function. */
2543 if (client_ptr -> nxd_mqtt_connect_notify)
2544 {
2545 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2546 }
2547 }
2548 }
2549
2550
2551 #ifdef NX_SECURE_ENABLE
2552 /**************************************************************************/
2553 /* */
2554 /* FUNCTION RELEASE */
2555 /* */
2556 /* _nxd_mqtt_tls_establish_process PORTABLE C */
2557 /* 6.2.0 */
2558 /* AUTHOR */
2559 /* */
2560 /* Yuxin Zhou, Microsoft Corporation */
2561 /* */
2562 /* DESCRIPTION */
2563 /* */
2564 /* This function processes TLS connection establish event. */
2565 /* */
2566 /* INPUT */
2567 /* */
2568 /* client_ptr Pointer to MQTT Client */
2569 /* */
2570 /* OUTPUT */
2571 /* */
2572 /* None */
2573 /* */
2574 /* CALLS */
2575 /* */
2576 /* _nx_secure_tls_handshake_process */
2577 /* _nxd_mqtt_client_connect_packet_send */
2578 /* _nxd_mqtt_client_connection_end */
2579 /* */
2580 /* CALLED BY */
2581 /* */
2582 /* _nxd_mqtt_client_event_process */
2583 /* */
2584 /* RELEASE HISTORY */
2585 /* */
2586 /* DATE NAME DESCRIPTION */
2587 /* */
2588 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2589 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2590 /* resulting in version 6.1 */
2591 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
2592 /* mqtt over websocket, */
2593 /* resulting in version 6.2.0 */
2594 /* */
2595 /**************************************************************************/
_nxd_mqtt_tls_establish_process(NXD_MQTT_CLIENT * client_ptr)2596 static VOID _nxd_mqtt_tls_establish_process(NXD_MQTT_CLIENT *client_ptr)
2597 {
2598 UINT status;
2599
2600
2601 /* Directly call handshake process for async mode. */
2602 status = _nx_secure_tls_handshake_process(&(client_ptr -> nxd_mqtt_tls_session), NX_NO_WAIT);
2603 if (status == NX_SUCCESS)
2604 {
2605
2606 /* TLS session established. */
2607 client_ptr -> nxd_mqtt_tls_in_progress = NX_FALSE;
2608
2609 #ifdef NXD_MQTT_OVER_WEBSOCKET
2610
2611 /* If using websocket, start websocket connection. */
2612 if (client_ptr -> nxd_mqtt_client_use_websocket)
2613 {
2614 status = nx_websocket_client_secure_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_tls_session),
2615 client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
2616 client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
2617 (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
2618 NX_NO_WAIT);
2619
2620 if (status == NX_IN_PROGRESS)
2621 {
2622 return;
2623 }
2624 }
2625 else
2626 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2627 {
2628
2629 /* Start to send MQTT connect packet. */
2630 status = _nxd_mqtt_client_connect_packet_send(client_ptr, NX_NO_WAIT);
2631 }
2632 }
2633 else if (status == NX_CONTINUE)
2634 {
2635 return;
2636 }
2637
2638 /* Check status. */
2639 if (status)
2640 {
2641
2642 /* End connection. */
2643 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2644
2645 /* Check callback function. */
2646 if (client_ptr -> nxd_mqtt_connect_notify)
2647 {
2648 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2649 }
2650 }
2651
2652 return;
2653 }
2654 #endif /* NX_SECURE_ENABLE */
2655
2656 /**************************************************************************/
2657 /* */
2658 /* FUNCTION RELEASE */
2659 /* */
2660 /* _nxd_mqtt_client_append_message PORTABLE C */
2661 /* 6.1 */
2662 /* AUTHOR */
2663 /* */
2664 /* Yuxin Zhou, Microsoft Corporation */
2665 /* */
2666 /* DESCRIPTION */
2667 /* */
2668 /* This function writes the message length and message in the outgoing */
2669 /* MQTT packet. */
2670 /* */
2671 /* INPUT */
2672 /* */
2673 /* client_ptr Pointer to MQTT Client */
2674 /* packet_ptr Outgoing MQTT packet */
2675 /* message Pointer to the message */
2676 /* length Length of the message */
2677 /* wait_option Wait option */
2678 /* */
2679 /* OUTPUT */
2680 /* */
2681 /* status */
2682 /* */
2683 /* CALLS */
2684 /* */
2685 /* nx_packet_data_append Append packet data */
2686 /* */
2687 /* CALLED BY */
2688 /* */
2689 /* _nxd_mqtt_client_sub_unsub */
2690 /* _nxd_mqtt_client_connect */
2691 /* _nxd_mqtt_client_publish */
2692 /* */
2693 /* RELEASE HISTORY */
2694 /* */
2695 /* DATE NAME DESCRIPTION */
2696 /* */
2697 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2698 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2699 /* resulting in version 6.1 */
2700 /* */
2701 /**************************************************************************/
/* Append a length-prefixed message to an outgoing MQTT packet.

   client_ptr    Pointer to MQTT Client (supplies the packet pool)
   packet_ptr    Outgoing MQTT packet
   message       Pointer to the message
   length        Length of the message
   wait_option   Wait option passed to nx_packet_data_append

   Returns the status of the first nx_packet_data_append call that fails,
   otherwise the status of the last call made.

   NOTE(review): only the low 16 bits of length are written into the prefix
   while the full length is appended; callers presumably keep length within
   0xFFFF - confirm.  */
UINT _nxd_mqtt_client_append_message(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, CHAR *message, UINT length, ULONG wait_option)
{
UCHAR length_prefix[2];
UINT  status;


    /* Two-byte length field, most significant byte first.  */
    length_prefix[0] = (UCHAR)((length >> 8) & 0xFF);
    length_prefix[1] = (UCHAR)(length & 0xFF);

    /* Append message length field. */
    status = nx_packet_data_append(packet_ptr, length_prefix, 2, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);

    /* The payload is appended only when the prefix went in and there is
       something to copy.  */
    if ((!status) && (length != 0))
    {
        status = nx_packet_data_append(packet_ptr, message, length,
                                       client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
    }

    return(status);
}
2727
2728
2729 /**************************************************************************/
2730 /* */
2731 /* FUNCTION RELEASE */
2732 /* */
2733 /* _nxd_mqtt_client_connection_end PORTABLE C */
2734 /* 6.2.0 */
2735 /* AUTHOR */
2736 /* */
2737 /* Yuxin Zhou, Microsoft Corporation */
2738 /* */
2739 /* DESCRIPTION */
2740 /* */
2741 /* This function is used to end the MQTT connection. */
2742 /* */
2743 /* INPUT */
2744 /* */
2745 /* client_ptr Pointer to MQTT Client */
2746 /* wait_option Wait option */
2747 /* */
2748 /* OUTPUT */
2749 /* */
2750 /* None */
2751 /* */
2752 /* CALLS */
2753 /* */
2754 /* nx_secure_tls_session_end End TLS session */
2755 /* nx_secure_tls_session_delete Delete TLS session */
2756 /* nx_tcp_socket_disconnect Close TCP connection */
2757 /* nx_tcp_client_socket_unbind Unbind TCP socket */
2758 /* tx_timer_delete Delete timer */
2759 /* */
2760 /* CALLED BY */
2761 /* */
2762 /* _nxd_mqtt_client_connect */
2763 /* _nxd_mqtt_process_disconnect */
2764 /* */
2765 /* RELEASE HISTORY */
2766 /* */
2767 /* DATE NAME DESCRIPTION */
2768 /* */
2769 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2770 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2771 /* resulting in version 6.1 */
2772 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
2773 /* mqtt over websocket, */
2774 /* resulting in version 6.2.0 */
2775 /* */
2776 /**************************************************************************/
_nxd_mqtt_client_connection_end(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)2777 VOID _nxd_mqtt_client_connection_end(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
2778 {
2779
2780 /* Obtain the mutex. */
2781 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
2782
2783 /* Mark the session as terminated. */
2784 client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;
2785
2786 /* Release the mutex. */
2787 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2788
2789 #ifdef NXD_MQTT_OVER_WEBSOCKET
2790 if (client_ptr -> nxd_mqtt_client_use_websocket)
2791 {
2792 nx_websocket_client_disconnect(&(client_ptr -> nxd_mqtt_client_websocket), wait_option);
2793 }
2794 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2795
2796 #ifdef NX_SECURE_ENABLE
2797 if (client_ptr -> nxd_mqtt_client_use_tls)
2798 {
2799 nx_secure_tls_session_end(&(client_ptr -> nxd_mqtt_tls_session), wait_option);
2800 nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
2801 }
2802 #endif
2803 nx_tcp_socket_disconnect(&(client_ptr -> nxd_mqtt_client_socket), wait_option);
2804 nx_tcp_client_socket_unbind(&(client_ptr -> nxd_mqtt_client_socket));
2805
2806 /* Disable timer if timer has been started. */
2807 if (client_ptr -> nxd_mqtt_keepalive)
2808 {
2809 tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
2810 }
2811 }
2812
2813
2814 static UINT _nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT *client_ptr, UCHAR header_value);
2815
2816
2817 /* MQTT internal function */
2818
2819 /**************************************************************************/
2820 /* */
2821 /* FUNCTION RELEASE */
2822 /* */
2823 /* _nxd_mqtt_periodic_timer_entry PORTABLE C */
2824 /* 6.1 */
2825 /* AUTHOR */
2826 /* */
2827 /* Yuxin Zhou, Microsoft Corporation */
2828 /* */
2829 /* DESCRIPTION */
2830 /* */
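/*    This internal function is the periodic timer entry of the MQTT      */
/*    client.  It sets MQTT_PING_TIMEOUT_EVENT when an outstanding ping   */
/*    has not been answered within the ping timeout, and otherwise sets   */
/*    MQTT_TIMEOUT_EVENT when the keepalive deadline is about to expire.  */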
2834 /* */
2835 /* */
2836 /* INPUT */
2837 /* */
/*    client                                Pointer to MQTT Client, cast  */
/*                                            to ULONG                    */
2840 /* */
2841 /* OUTPUT */
2842 /* */
2843 /* None */
2844 /* */
2845 /* CALLS */
2846 /* */
/*    tx_time_get                                                         */
/*    tx_event_flags_set                                                  */
/*    nx_cloud_module_event_set                                           */
2848 /* */
2849 /* CALLED BY */
2850 /* */
/*    MQTT client periodic timer expiration                               */
2852 /* */
2853 /* RELEASE HISTORY */
2854 /* */
2855 /* DATE NAME DESCRIPTION */
2856 /* */
2857 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2858 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2859 /* resulting in version 6.1 */
2860 /* */
2861 /**************************************************************************/
_nxd_mqtt_periodic_timer_entry(ULONG client)2862 static VOID _nxd_mqtt_periodic_timer_entry(ULONG client)
2863 {
2864 /* Check if it is time to send out a ping message. */
2865 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)client;
2866
2867 /* If an outstanding ping response has not been received, and the client exceeds the time waiting for ping response,
2868 the client shall disconnect from the server. */
2869 if (client_ptr -> nxd_mqtt_ping_not_responded)
2870 {
2871 /* If current time is greater than the ping timeout */
2872 if ((tx_time_get() - client_ptr -> nxd_mqtt_ping_sent_time) >= client_ptr -> nxd_mqtt_ping_timeout)
2873 {
2874 /* Ping timed out. Need to terminate the connection. */
2875 #ifndef NXD_MQTT_CLOUD_ENABLE
2876 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_PING_TIMEOUT_EVENT, TX_OR);
2877 #else
2878 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PING_TIMEOUT_EVENT);
2879 #endif /* NXD_MQTT_CLOUD_ENABLE */
2880
2881 return;
2882 }
2883 }
2884
2885 /* About to timeout? */
2886 if ((client_ptr -> nxd_mqtt_timeout - tx_time_get()) <= client_ptr -> nxd_mqtt_timer_value)
2887 {
2888 /* Set the flag so the MQTT thread can send the ping. */
2889 #ifndef NXD_MQTT_CLOUD_ENABLE
2890 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_TIMEOUT_EVENT, TX_OR);
2891 #else
2892 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_TIMEOUT_EVENT);
2893 #endif /* NXD_MQTT_CLOUD_ENABLE */
2894 }
2895
2896 /* If keepalive is not enabled, just return. */
2897 return;
2898 }
2899
2900
2901
2902 /**************************************************************************/
2903 /* */
2904 /* FUNCTION RELEASE */
2905 /* */
2906 /* _nxd_mqtt_client_event_process PORTABLE C */
2907 /* 6.1 */
2908 /* AUTHOR */
2909 /* */
2910 /* Yuxin Zhou, Microsoft Corporation */
2911 /* */
2912 /* DESCRIPTION */
2913 /* */
/*    This internal function processes the events raised for the MQTT     */
/*    client: keepalive timeout, TCP establish, packet receive, ping      */
/*    timeout, network disconnect and delete.                             */
2916 /* */
2917 /* INPUT */
2918 /* */
/*    mqtt_client                           Pointer to MQTT Client        */
/*    common_events                         Common events (not used)      */
/*    module_own_events                     MQTT event flags to process   */
2920 /* */
2921 /* OUTPUT */
2922 /* */
2923 /* None */
2924 /* */
2925 /* CALLS */
2926 /* */
2927 /* _nxd_mqtt_release_transmit_packet */
2928 /* _nxd_mqtt_release_receive_packet */
2929 /* _nxd_mqtt_send_simple_message */
2930 /* _nxd_mqtt_process_disconnect */
2931 /* _nxd_mqtt_packet_receive_process */
2932 /* tx_timer_delete */
2933 /* tx_event_flags_delete */
2934 /* nx_tcp_socket_delete */
2935 /* tx_timer_delete */
2936 /* tx_mutex_delete */
2937 /* */
2938 /* CALLED BY */
2939 /* */
/*    _nxd_mqtt_thread_entry                                              */
2941 /* */
2942 /* RELEASE HISTORY */
2943 /* */
2944 /* DATE NAME DESCRIPTION */
2945 /* */
2946 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2947 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
2948 /* corrected mqtt client state,*/
2949 /* resulting in version 6.1 */
2950 /* */
2951 /**************************************************************************/
/* Process the events raised for an MQTT client.

   mqtt_client          Pointer to MQTT Client
   common_events        Not used
   module_own_events    Bit mask of MQTT_*_EVENT flags to handle

   The client mutex is obtained on entry.  Events are handled in a fixed
   order: keepalive timeout, TCP establish, packet receive, ping timeout,
   network disconnect, delete.  The delete event tears the client down
   (timer, event flags, queued packets, mutex, socket).  */
static VOID _nxd_mqtt_client_event_process(VOID *mqtt_client, ULONG common_events, ULONG module_own_events)
{
NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)mqtt_client;


    /* Obtain the mutex. */
    tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);

    /* Common events are not handled here.  */
    NX_PARAMETER_NOT_USED(common_events);

    /* Keepalive timeout: send out PING only if the client is connected. */
    if ((module_own_events & MQTT_TIMEOUT_EVENT) &&
        (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED))
    {
        _nxd_mqtt_send_simple_message(client_ptr, MQTT_CONTROL_PACKET_TYPE_PINGREQ);
    }

    /* TCP connection established: start the next connection step.  */
    if (module_own_events & MQTT_TCP_ESTABLISH_EVENT)
    {
        _nxd_mqtt_tcp_establish_process(client_ptr);
    }

    /* Data arrived: it belongs either to the TLS handshake or to MQTT.  */
    if (module_own_events & MQTT_PACKET_RECEIVE_EVENT)
    {
#ifdef NX_SECURE_ENABLE

        /* TLS in progress on async mode. */
        if (client_ptr -> nxd_mqtt_tls_in_progress)
        {
            _nxd_mqtt_tls_establish_process(client_ptr);
        }
        else
#endif /* NX_SECURE_ENABLE */
        {
            _nxd_mqtt_packet_receive_process(client_ptr);
        }
    }

    /* The server/broker didn't respond to our ping request message.
       Disconnect from the server. */
    if (module_own_events & MQTT_PING_TIMEOUT_EVENT)
    {
        _nxd_mqtt_process_disconnect(client_ptr);
    }

    /* The server closed TCP socket.  We shall go through the disconnect code path. */
    if (module_own_events & MQTT_NETWORK_DISCONNECT_EVENT)
    {
        _nxd_mqtt_process_disconnect(client_ptr);
    }

    /* Without a delete request only the mutex needs to be given back.  */
    if ((module_own_events & MQTT_DELETE_EVENT) == 0)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return;
    }

    /* Stop the client and disconnect from the server. */
    if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        _nxd_mqtt_process_disconnect(client_ptr);
    }

    /* Delete the timer.  A zero ID means it is already deleted.  */
    if ((client_ptr -> nxd_mqtt_timer).tx_timer_id != 0)
    {
        tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
    }

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Delete the event flag group.  A zero ID means it is already deleted.  */
    if ((client_ptr -> nxd_mqtt_events).tx_event_flags_group_id != 0)
    {
        tx_event_flags_delete(&client_ptr -> nxd_mqtt_events);
    }
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Release all the messages on the receive queue. */
    while (client_ptr -> message_receive_queue_head)
    {
        _nxd_mqtt_release_receive_packet(client_ptr, client_ptr -> message_receive_queue_head, NX_NULL);
    }
    client_ptr -> message_receive_queue_depth = 0;

    /* Release all the messages on the transmit queue. */
    while (client_ptr -> message_transmit_queue_head)
    {
        _nxd_mqtt_release_transmit_packet(client_ptr, client_ptr -> message_transmit_queue_head, NX_NULL);
    }

    /* Release mutex */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Delete the mutex. */
    tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Deleting the socket, (the socket ID is cleared); this signals it is ok to delete this thread. */
    nx_tcp_socket_delete(&client_ptr -> nxd_mqtt_client_socket);
}
3052
3053
3054 #ifndef NXD_MQTT_CLOUD_ENABLE
3055 /**************************************************************************/
3056 /* */
3057 /* FUNCTION RELEASE */
3058 /* */
3059 /* _nxd_mqtt_thread_entry PORTABLE C */
3060 /* 6.1 */
3061 /* AUTHOR */
3062 /* */
3063 /* Yuxin Zhou, Microsoft Corporation */
3064 /* */
3065 /* DESCRIPTION */
3066 /* */
3067 /* This internal function serves as the entry point for the MQTT */
3068 /* client thread. */
3069 /* */
3070 /* INPUT */
3071 /* */
3072 /* mqtt_client Pointer to MQTT Client */
3073 /* */
3074 /* OUTPUT */
3075 /* */
3076 /* None */
3077 /* */
3078 /* CALLS */
3079 /* */
3080 /* tx_event_flags_get */
3081 /* _nxd_mqtt_client_event_process */
3082 /* */
3083 /* CALLED BY */
3084 /* */
3085 /* _nxd_mqtt_client_create */
3086 /* */
3087 /* RELEASE HISTORY */
3088 /* */
3089 /* DATE NAME DESCRIPTION */
3090 /* */
3091 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3092 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3093 /* resulting in version 6.1 */
3094 /* */
3095 /**************************************************************************/
_nxd_mqtt_thread_entry(ULONG mqtt_client)3096 static VOID _nxd_mqtt_thread_entry(ULONG mqtt_client)
3097 {
3098 NXD_MQTT_CLIENT *client_ptr;
3099 ULONG events;
3100
3101 client_ptr = (NXD_MQTT_CLIENT *)mqtt_client;
3102
3103 /* Loop to process events on the MQTT client */
3104 for (;;)
3105 {
3106
3107 tx_event_flags_get(&client_ptr -> nxd_mqtt_events, MQTT_ALL_EVENTS, TX_OR_CLEAR, &events, TX_WAIT_FOREVER);
3108
3109 /* Call the event processing routine. */
3110 _nxd_mqtt_client_event_process(client_ptr, NX_NULL, events);
3111
3112 if (events & MQTT_DELETE_EVENT)
3113 {
3114 break;
3115 }
3116 }
3117 }
3118 #endif /* NXD_MQTT_CLOUD_ENABLE */
3119
3120
3121 /**************************************************************************/
3122 /* */
3123 /* FUNCTION RELEASE */
3124 /* */
3125 /* _mqtt_client_disconnect_callback PORTABLE C */
3126 /* 6.1 */
3127 /* AUTHOR */
3128 /* */
3129 /* Yuxin Zhou, Microsoft Corporation */
3130 /* */
3131 /* DESCRIPTION */
3132 /* */
3133 /* This internal function is passed to MQTT client socket create call. */
3134 /* This callback function notifies MQTT client thread when the TCP */
3135 /* connection is lost. */
3136 /* */
3137 /* */
3138 /* INPUT */
3139 /* */
3140 /* socket_ptr Pointer to TCP socket that */
3141 /* disconnected. */
3142 /* */
3143 /* OUTPUT */
3144 /* */
3145 /* None */
3146 /* */
3147 /* CALLS */
3148 /* */
3149 /* None */
3150 /* */
3151 /* CALLED BY */
3152 /* */
3153 /* TCP socket disconnect callback */
3154 /* */
3155 /* RELEASE HISTORY */
3156 /* */
3157 /* DATE NAME DESCRIPTION */
3158 /* */
3159 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3160 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3161 /* resulting in version 6.1 */
3162 /* */
3163 /**************************************************************************/
_mqtt_client_disconnect_callback(NX_TCP_SOCKET * socket_ptr)3164 static VOID _mqtt_client_disconnect_callback(NX_TCP_SOCKET *socket_ptr)
3165 {
3166 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)(socket_ptr -> nx_tcp_socket_reserved_ptr);
3167
3168 /* Set the MQTT_NETWORK_DISCONNECT event. This event indicates
3169 that the disconnect is initiated from the network. */
3170 #ifndef NXD_MQTT_CLOUD_ENABLE
3171 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_NETWORK_DISCONNECT_EVENT, TX_OR);
3172 #else
3173 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_NETWORK_DISCONNECT_EVENT);
3174 #endif /* NXD_MQTT_CLOUD_ENABLE */
3175
3176 return;
3177 }
3178
3179
3180 /**************************************************************************/
3181 /* */
3182 /* FUNCTION RELEASE */
3183 /* */
3184 /* _nxd_mqtt_client_create PORTABLE C */
3185 /* 6.1 */
3186 /* AUTHOR */
3187 /* */
3188 /* Yuxin Zhou, Microsoft Corporation */
3189 /* */
3190 /* DESCRIPTION */
3191 /* */
3192 /* This function creates an instance for MQTT Client. It initializes */
3193 /* MQTT Client control block, creates a thread, mutex and event queue */
3194 /* for MQTT Client, and creates the TCP socket for MQTT messaging. */
3195 /* */
3196 /* */
3197 /* INPUT */
3198 /* */
3199 /* client_ptr Pointer to MQTT Client */
3200 /* client_id Client ID for this instance */
3201 /* client_id_length Length of Client ID, in bytes */
3202 /* ip_ptr Pointer to IP instance */
3203 /* pool_ptr Pointer to packet pool */
3204 /* stack_ptr Client thread's stack pointer */
3205 /* stack_size Client thread's stack size */
3206 /* mqtt_thread_priority Priority for MQTT thread */
3207 /* memory_ptr Deprecated and not used */
3208 /* memory_size Deprecated and not used */
3209 /* */
3210 /* OUTPUT */
3211 /* */
3212 /* status Completion status */
3213 /* */
3214 /* CALLS */
3215 /* */
3216 /* _nxd_mqtt_client_create_internal Create mqtt client */
3217 /* */
3218 /* CALLED BY */
3219 /* */
3220 /* Application Code */
3221 /* */
3222 /* RELEASE HISTORY */
3223 /* */
3224 /* DATE NAME DESCRIPTION */
3225 /* */
3226 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3227 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
3228 /* corrected mqtt client state,*/
3229 /* resulting in version 6.1 */
3230 /* */
3231 /**************************************************************************/
UINT _nxd_mqtt_client_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
                             CHAR *client_id, UINT client_id_length,
                             NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
                             VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority,
                             VOID *memory_ptr, ULONG memory_size)
{
UINT result;

    /* memory_ptr and memory_size are accepted for compatibility but are not used here. */
    NX_PARAMETER_NOT_USED(memory_ptr);
    NX_PARAMETER_NOT_USED(memory_size);

    /* Let the internal helper initialize the control block and create its resources. */
    result = _nxd_mqtt_client_create_internal(client_ptr, client_name, client_id, client_id_length, ip_ptr,
                                              pool_ptr, stack_ptr, stack_size, mqtt_thread_priority);
    if (result != 0)
    {

        /* Pass the helper's error code back to the caller unchanged. */
        return(result);
    }

#ifdef NXD_MQTT_CLOUD_ENABLE

    /* Create the cloud helper owned by this client. */
    if (nx_cloud_create(&(client_ptr -> nxd_mqtt_client_cloud), "Cloud Helper",
                        stack_ptr, stack_size, mqtt_thread_priority) != NX_SUCCESS)
    {

        /* Undo the internal create: the socket is the resource to delete. */
        nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));

        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Remember the cloud helper and use its mutex as the client mutex. */
    client_ptr -> nxd_mqtt_client_cloud_ptr = &(client_ptr -> nxd_mqtt_client_cloud);
    client_ptr -> nxd_mqtt_client_mutex_ptr = &(client_ptr -> nxd_mqtt_client_cloud.nx_cloud_mutex);

    /* Register the MQTT module, with its event handler, on the cloud helper. */
    if (nx_cloud_module_register(client_ptr -> nxd_mqtt_client_cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module),
                                 client_name, NX_CLOUD_MODULE_MQTT_EVENT,
                                 _nxd_mqtt_client_event_process, client_ptr) != NX_SUCCESS)
    {

        /* Tear down in the same order as before: cloud helper first, then the socket. */
        nx_cloud_delete(&(client_ptr -> nxd_mqtt_client_cloud));
        nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));

        return(NXD_MQTT_INTERNAL_ERROR);
    }

#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Creation is complete; the client starts out idle. */
    client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;

    return(NXD_MQTT_SUCCESS);
}
3305
3306
3307 /**************************************************************************/
3308 /* */
3309 /* FUNCTION RELEASE */
3310 /* */
3311 /* _nxd_mqtt_client_create_internal PORTABLE C */
3312 /* 6.1.12 */
3313 /* AUTHOR */
3314 /* */
3315 /* Yuxin Zhou, Microsoft Corporation */
3316 /* */
3317 /* DESCRIPTION */
3318 /* */
3319 /* This function creates an instance for MQTT Client. It initializes */
3320 /* MQTT Client control block, creates a thread, mutex and event queue */
3321 /* for MQTT Client, and creates the TCP socket for MQTT messaging. */
3322 /* */
3323 /* */
3324 /* INPUT */
3325 /* */
3326 /* client_ptr Pointer to MQTT Client */
3327 /* client_id Client ID for this instance */
3328 /* client_id_length Length of Client ID, in bytes */
3329 /* ip_ptr Pointer to IP instance */
3330 /* pool_ptr Pointer to packet pool */
3331 /* stack_ptr Client thread's stack pointer */
3332 /* stack_size Client thread's stack size */
3333 /* mqtt_thread_priority Priority for MQTT thread */
3334 /* */
3335 /* OUTPUT */
3336 /* */
3337 /* status Completion status */
3338 /* */
3339 /* CALLS */
3340 /* */
3341 /* tx_thread_create Create a thread */
3342 /* tx_mutex_create Create mutex */
3343 /* tx_mutex_get */
3344 /* tx_mutex_put */
/*    tx_event_flags_create                 Create event flag             */
3346 /* */
3347 /* CALLED BY */
3348 /* */
/*    _nxd_mqtt_client_create                                             */
3350 /* */
3351 /* RELEASE HISTORY */
3352 /* */
3353 /* DATE NAME DESCRIPTION */
3354 /* */
3355 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3356 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
3357 /* corrected mqtt client state,*/
3358 /* resulting in version 6.1 */
3359 /* 07-29-2022 Spencer McDonough Modified comment(s), and */
3360 /* improved internal logic, */
3361 /* resulting in version 6.1.12 */
3362 /* */
3363 /**************************************************************************/
static UINT _nxd_mqtt_client_create_internal(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
                                             CHAR *client_id, UINT client_id_length,
                                             NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
                                             VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority)
{

#ifdef NXD_MQTT_CLOUD_ENABLE

    /* No thread is created in this build, so the thread parameters go unused. */
    NX_PARAMETER_NOT_USED(stack_ptr);
    NX_PARAMETER_NOT_USED(stack_size);
    NX_PARAMETER_NOT_USED(mqtt_thread_priority);
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Start from a zeroed MQTT Client control block. */
    NXD_MQTT_SECURE_MEMSET((void *)client_ptr, 0, sizeof(NXD_MQTT_CLIENT));

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Create the protection mutex and publish it through the mutex pointer. */
    if (tx_mutex_create(&client_ptr -> nxd_mqtt_protection, client_name, TX_NO_INHERIT) != TX_SUCCESS)
    {
        return(NXD_MQTT_INTERNAL_ERROR);
    }
    client_ptr -> nxd_mqtt_client_mutex_ptr = &(client_ptr -> nxd_mqtt_protection);

    /* Create the MQTT client thread without starting it; it is resumed at
       the end of this function. */
    if (tx_thread_create(&(client_ptr -> nxd_mqtt_thread), client_name, _nxd_mqtt_thread_entry,
                         (ULONG)client_ptr, stack_ptr, stack_size, mqtt_thread_priority, mqtt_thread_priority,
                         NXD_MQTT_CLIENT_THREAD_TIME_SLICE, TX_DONT_START) != TX_SUCCESS)
    {

        /* Roll back the mutex created above. */
        tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);

        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Create the event flags group the thread waits on. */
    if (tx_event_flags_create(&(client_ptr -> nxd_mqtt_events), client_name) != TX_SUCCESS)
    {

        /* Roll back the mutex, then the thread. */
        tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);
        tx_thread_delete(&(client_ptr -> nxd_mqtt_thread));

        return(NXD_MQTT_INTERNAL_ERROR);
    }
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Record the caller-supplied identity and networking resources.
       Only the pointers are stored; nothing is copied. */
    client_ptr -> nxd_mqtt_client_name = client_name;
    client_ptr -> nxd_mqtt_client_id = client_id;
    client_ptr -> nxd_mqtt_client_id_length = client_id_length;
    client_ptr -> nxd_mqtt_client_ip_ptr = ip_ptr;
    client_ptr -> nxd_mqtt_client_packet_pool_ptr = pool_ptr;

    /* Create the TCP socket, registering the disconnect callback. */
    nx_tcp_socket_create(ip_ptr, &(client_ptr -> nxd_mqtt_client_socket), client_name,
                         NX_IP_NORMAL, NX_DONT_FRAGMENT, 0x80, NXD_MQTT_CLIENT_SOCKET_WINDOW_SIZE,
                         NX_NULL, _mqtt_client_disconnect_callback);

    /* Store the client in the socket so the disconnect callback can find it. */
    client_ptr -> nxd_mqtt_client_socket.nx_tcp_socket_reserved_ptr = (VOID *)client_ptr;

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Everything is in place; start the MQTT thread. */
    tx_thread_resume(&(client_ptr -> nxd_mqtt_thread));
#endif /* NXD_MQTT_CLOUD_ENABLE */

    return(NXD_MQTT_SUCCESS);
}
3447
3448
3449 /**************************************************************************/
3450 /* */
3451 /* FUNCTION RELEASE */
3452 /* */
3453 /* _nxd_mqtt_client_login_set PORTABLE C */
3454 /* 6.1 */
3455 /* AUTHOR */
3456 /* */
3457 /* Yuxin Zhou, Microsoft Corporation */
3458 /* */
3459 /* DESCRIPTION */
3460 /* */
3461 /* This function sets optional MQTT username and password. Note */
3462 /* if the broker requires username and password, this information */
3463 /* must be set prior to calling nxd_mqtt_client_connect or */
3464 /* nxd_mqtt_client_secure_connect. */
3465 /* */
3466 /* */
3467 /* INPUT */
3468 /* */
3469 /* client_ptr Pointer to MQTT Client */
3470 /* username User name, or NULL if user */
3471 /* name is not required */
3472 /* username_length Length of the user name, or */
3473 /* 0 if user name is NULL */
3474 /* password Password string, or NULL if */
3475 /* password is not required */
3476 /* password_length Length of the password, or */
3477 /* 0 if password is NULL */
3478 /* */
3479 /* OUTPUT */
3480 /* */
3481 /* status Completion status */
3482 /* */
3483 /* CALLS */
3484 /* */
3485 /* tx_mutex_get */
3486 /* */
3487 /* CALLED BY */
3488 /* */
3489 /* Application Code */
3490 /* */
3491 /* RELEASE HISTORY */
3492 /* */
3493 /* DATE NAME DESCRIPTION */
3494 /* */
3495 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3496 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3497 /* resulting in version 6.1 */
3498 /* */
3499 /**************************************************************************/
UINT _nxd_mqtt_client_login_set(NXD_MQTT_CLIENT *client_ptr,
                                CHAR *username, UINT username_length, CHAR *password, UINT password_length)
{

    /* Take the client mutex, waiting as long as necessary. */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Store the credentials. Only the pointers are kept (no copy is made),
       and each length is narrowed to USHORT. */
    client_ptr -> nxd_mqtt_client_password = password;
    client_ptr -> nxd_mqtt_client_password_length = (USHORT)password_length;
    client_ptr -> nxd_mqtt_client_username = username;
    client_ptr -> nxd_mqtt_client_username_length = (USHORT)username_length;

    /* Done with the control block. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    return(NX_SUCCESS);
}
3521
3522 /**************************************************************************/
3523 /* */
3524 /* FUNCTION RELEASE */
3525 /* */
3526 /* _nxd_mqtt_client_will_message_set PORTABLE C */
3527 /* 6.1 */
3528 /* AUTHOR */
3529 /* */
3530 /* Yuxin Zhou, Microsoft Corporation */
3531 /* */
3532 /* DESCRIPTION */
3533 /* */
3534 /* This function sets optional MQTT will topic and will message. */
3535 /* Note that if will message is needed, application must set will */
3536 /* message prior to calling nxd_mqtt_client_connect or */
3537 /* nxd_mqtt_client_secure_connect. */
3538 /* */
3539 /* */
3540 /* INPUT */
3541 /* */
3542 /* client_ptr Pointer to MQTT Client */
3543 /* will_topic Will topic string. */
3544 /* will_topic_length Length of the will topic. */
3545 /* will_message Will message string. */
3546 /* will_message_length Length of the will message. */
3547 /* will_retain_flag Whether or not the will */
3548 /* message is to be retained */
3549 /* when it is published. */
3550 /* Valid values are NX_TRUE */
3551 /* NX_FALSE */
3552 /* will_QoS QoS level to be used when */
3553 /* publishing will message. */
/*                                            Valid values are 0 and 1;   */
/*                                            QoS 2 is not supported.     */
3555 /* */
3556 /* OUTPUT */
3557 /* */
3558 /* status Completion status */
3559 /* */
3560 /* CALLS */
3561 /* */
3562 /* tx_mutex_get */
3563 /* */
3564 /* CALLED BY */
3565 /* */
3566 /* Application Code */
3567 /* */
3568 /* RELEASE HISTORY */
3569 /* */
3570 /* DATE NAME DESCRIPTION */
3571 /* */
3572 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3573 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3574 /* resulting in version 6.1 */
3575 /* */
3576 /**************************************************************************/
UINT _nxd_mqtt_client_will_message_set(NXD_MQTT_CLIENT *client_ptr,
                                       const UCHAR *will_topic, UINT will_topic_length, const UCHAR *will_message,
                                       UINT will_message_length, UINT will_retain_flag, UINT will_QoS)
{
UINT  status;
UCHAR qos_retain;

    /* Stores the will topic, will message, retain flag and QoS in the client
       control block. Only the topic and message pointers are stored; the
       data is not copied.

       Returns NXD_MQTT_QOS2_NOT_SUPPORTED when will_QoS is 2,
       NXD_MQTT_MUTEX_FAILURE when the client mutex cannot be obtained,
       and NX_SUCCESS otherwise. */

    /* QoS 2 is rejected before any state is touched. */
    if (will_QoS == 2)
    {
        return(NXD_MQTT_QOS2_NOT_SUPPORTED);
    }

    /* Obtain the mutex. */
    status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

    if (status != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    client_ptr -> nxd_mqtt_client_will_topic = will_topic;
    client_ptr -> nxd_mqtt_client_will_topic_length = will_topic_length;
    client_ptr -> nxd_mqtt_client_will_message = will_message;
    client_ptr -> nxd_mqtt_client_will_message_length = will_message_length;

    /* Build the combined QoS/retain byte from scratch: the QoS value in the
       low bits, 0x80 when retain is requested. The stored value must not be
       OR-ed into the new one, otherwise a retain flag or QoS bit set by an
       earlier call could never be cleared by a later call. */
    qos_retain = (UCHAR)will_QoS;
    if (will_retain_flag == NX_TRUE)
    {
        qos_retain = (UCHAR)(qos_retain | 0x80);
    }
    client_ptr -> nxd_mqtt_client_will_qos_retain = qos_retain;

    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    return(NX_SUCCESS);
}
3611
3612
3613 /**************************************************************************/
3614 /* */
3615 /* FUNCTION RELEASE */
3616 /* */
3617 /* _nxde_mqtt_client_will_message_set PORTABLE C */
3618 /* 6.1 */
3619 /* AUTHOR */
3620 /* */
3621 /* Yuxin Zhou, Microsoft Corporation */
3622 /* */
3623 /* DESCRIPTION */
3624 /* */
3625 /* This function checks for errors in the */
3626 /* nxd_mqtt_client_will_message_set call. */
3627 /* */
3628 /* */
3629 /* INPUT */
3630 /* */
3631 /* client_ptr Pointer to MQTT Client */
3632 /* will_topic Will topic string. */
3633 /* will_topic_length Length of the will topic. */
3634 /* will_message Will message string. */
3635 /* will_message_length Length of the will message. */
3636 /* will_retain_flag Whether or not the will */
3637 /* message is to be retained */
3638 /* when it is published. */
3639 /* Valid values are NX_TRUE */
3640 /* NX_FALSE */
3641 /* will_QoS QoS level to be used when */
3642 /* publishing will message. */
3643 /* Valid values are 0, 1, 2 */
3644 /* */
3645 /* OUTPUT */
3646 /* */
3647 /* status Completion status */
3648 /* */
3649 /* CALLS */
3650 /* */
3651 /* _nxd_mqtt_client_will_message_set */
3652 /* */
3653 /* CALLED BY */
3654 /* */
3655 /* Application Code */
3656 /* */
3657 /* RELEASE HISTORY */
3658 /* */
3659 /* DATE NAME DESCRIPTION */
3660 /* */
3661 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3662 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3663 /* resulting in version 6.1 */
3664 /* */
3665 /**************************************************************************/
UINT _nxde_mqtt_client_will_message_set(NXD_MQTT_CLIENT *client_ptr,
                                        const UCHAR *will_topic, UINT will_topic_length, const UCHAR *will_message,
                                        UINT will_message_length, UINT will_retain_flag, UINT will_QoS)
{

    /* Error-checking front end for _nxd_mqtt_client_will_message_set.
       Returns NXD_MQTT_INVALID_PARAMETER for any rejected argument,
       otherwise the result of the underlying call. */

    /* A client control block is required. */
    if (client_ptr == NX_NULL)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* The will topic is mandatory: it can be neither NULL nor empty. */
    if ((will_topic == NX_NULL) || (will_topic_length == 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* A NULL will message is accepted only with a zero length. A NULL
       pointer paired with a non-zero length is inconsistent, and is
       rejected the same way _nxde_mqtt_client_login_set rejects it. */
    if ((will_message == NX_NULL) && (will_message_length != 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* The retain flag must be exactly NX_TRUE or NX_FALSE. */
    if ((will_retain_flag != NX_TRUE) && (will_retain_flag != NX_FALSE))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* QoS values above 2 do not exist. QoS 2 itself is passed through and
       rejected by the underlying call with its own error code. */
    if (will_QoS > 2)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    return(_nxd_mqtt_client_will_message_set(client_ptr, will_topic, will_topic_length, will_message,
                                             will_message_length, will_retain_flag, will_QoS));
}
3695
3696 /**************************************************************************/
3697 /* */
3698 /* FUNCTION RELEASE */
3699 /* */
3700 /* _nxde_mqtt_client_login_set PORTABLE C */
3701 /* 6.1 */
3702 /* AUTHOR */
3703 /* */
3704 /* Yuxin Zhou, Microsoft Corporation */
3705 /* */
3706 /* DESCRIPTION */
3707 /* */
3708 /* This function checks for errors in the nxd_mqtt_client_login call. */
3709 /* */
3710 /* */
3711 /* INPUT */
3712 /* */
3713 /* client_ptr Pointer to MQTT Client */
3714 /* username User name, or NULL if user */
3715 /* name is not required */
3716 /* username_length Length of the user name, or */
3717 /* 0 if user name is NULL */
3718 /* password Password string, or NULL if */
3719 /* password is not required */
3720 /* password_length Length of the password, or */
3721 /* 0 if password is NULL */
3722 /* */
3723 /* OUTPUT */
3724 /* */
3725 /* status Completion status */
3726 /* */
3727 /* CALLS */
3728 /* */
3729 /* _nxd_mqtt_client_login_set */
3730 /* */
3731 /* CALLED BY */
3732 /* */
3733 /* Application Code */
3734 /* */
3735 /* RELEASE HISTORY */
3736 /* */
3737 /* DATE NAME DESCRIPTION */
3738 /* */
3739 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3740 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3741 /* resulting in version 6.1 */
3742 /* */
3743 /**************************************************************************/
UINT _nxde_mqtt_client_login_set(NXD_MQTT_CLIENT *client_ptr,
                                 CHAR *username, UINT username_length, CHAR *password, UINT password_length)
{

    /* Error-checking front end for _nxd_mqtt_client_login_set.
       Returns NXD_MQTT_INVALID_PARAMETER for any rejected argument,
       otherwise the result of the underlying call. */

    /* A client control block is required. */
    if (client_ptr == NX_NULL)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Username and username length don't match,
       or password and password length don't match. */
    if (((username == NX_NULL) && (username_length != 0)) ||
        ((password == NX_NULL) && (password_length != 0)))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* _nxd_mqtt_client_login_set narrows both lengths to USHORT. Reject
       lengths that do not fit in 16 bits instead of letting them be
       silently truncated to a different length. */
    if ((username_length > 0xFFFF) || (password_length > 0xFFFF))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    return(_nxd_mqtt_client_login_set(client_ptr, username, username_length, password, password_length));
}
3762
3763
3764 /**************************************************************************/
3765 /* */
3766 /* FUNCTION RELEASE */
3767 /* */
3768 /* _nxd_mqtt_client_retransmit_message PORTABLE C */
3769 /* 6.2.0 */
3770 /* AUTHOR */
3771 /* */
3772 /* Yuxin Zhou, Microsoft Corporation */
3773 /* */
3774 /* DESCRIPTION */
3775 /* */
3776 /* This function retransmit QoS1 messages upon reconnection, if the */
3777 /* connection is not set CLEAN_SESSION. */
3778 /* */
3779 /* */
3780 /* INPUT */
3781 /* */
3782 /* client_ptr Pointer to MQTT Client */
3783 /* wait_option Timeout value */
3784 /* */
3785 /* OUTPUT */
3786 /* */
3787 /* status Completion status */
3788 /* */
3789 /* CALLS */
3790 /* */
3791 /* tx_mutex_get */
3792 /* tx_mutex_put */
3793 /* _nxd_mqtt_packet_send */
3794 /* nx_packet_release */
3795 /* tx_time_get */
3796 /* nx_packet_copy */
3797 /* */
3798 /* CALLED BY */
3799 /* */
3800 /* Application Code */
3801 /* */
3802 /* RELEASE HISTORY */
3803 /* */
3804 /* DATE NAME DESCRIPTION */
3805 /* */
3806 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3807 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3808 /* resulting in version 6.1 */
3809 /* 10-31-2022 Bo Chen Modified comment(s), improved */
3810 /* the logic of sending packet,*/
3811 /* resulting in version 6.2.0 */
3812 /* */
3813 /**************************************************************************/
_nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)3814 static UINT _nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
3815 {
3816 NX_PACKET *transmit_packet_ptr;
3817 NX_PACKET *packet_ptr;
3818 UINT status = NXD_MQTT_SUCCESS;
3819 UINT mutex_status;
3820 UCHAR fixed_header;
3821
3822 transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
3823
3824 while (transmit_packet_ptr)
3825 {
3826 fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
3827
3828 if ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4))
3829 {
3830
3831 /* Retransmit publish packet only. */
3832 /* Obtain a NetX Packet. */
3833 status = nx_packet_copy(transmit_packet_ptr, &packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
3834
3835 if (status != NXD_MQTT_SUCCESS)
3836 {
3837 return(NXD_MQTT_PACKET_POOL_FAILURE);
3838 }
3839
3840 /* Release the mutex. */
3841 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3842
3843 /* Send packet to server. */
3844 status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);
3845
3846 if (status)
3847 {
3848 /* Release the packet. */
3849 nx_packet_release(packet_ptr);
3850
3851 status = NXD_MQTT_COMMUNICATION_FAILURE;
3852 }
3853 /* Obtain the mutex. */
3854 mutex_status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, wait_option);
3855
3856 if (mutex_status != TX_SUCCESS)
3857 {
3858 return(NXD_MQTT_MUTEX_FAILURE);
3859 }
3860 if (status)
3861 {
3862 return(status);
3863 }
3864 }
3865 transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
3866 }
3867
3868 /* Update the timeout value. */
3869 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
3870
3871 return(status);
3872 }
3873
3874 /**************************************************************************/
3875 /* */
3876 /* FUNCTION RELEASE */
3877 /* */
3878 /* _nxd_mqtt_client_connect PORTABLE C */
3879 /* 6.3.0 */
3880 /* AUTHOR */
3881 /* */
3882 /* Yuxin Zhou, Microsoft Corporation */
3883 /* */
3884 /* DESCRIPTION */
3885 /* */
3886 /* This function makes an initial connection to the MQTT server. */
3887 /* */
3888 /* */
3889 /* INPUT */
3890 /* */
3891 /* client_ptr Pointer to MQTT Client */
3892 /* server_ip Server IP address structure */
3893 /* server_port Server port number, in host */
3894 /* byte order */
3895 /* keepalive Keepalive flag */
3896 /* clean_session Clean session flag */
3897 /* wait_option Timeout value */
3898 /* */
3899 /* */
3900 /* OUTPUT */
3901 /* */
3902 /* status Completion status */
3903 /* */
3904 /* CALLS */
3905 /* */
3906 /* tx_mutex_get */
3907 /* nx_tcp_socket_create Create TCP socket */
3908 /* nx_tcp_socket_receive_notify */
3909 /* nx_tcp_client_socket_bind */
3910 /* nxd_tcp_client_socket_connect */
3911 /* nx_tcp_client_socket_unbind */
3912 /* tx_mutex_put */
3913 /* nx_secure_tls_session_start */
3914 /* nx_secure_tls_session_send */
3915 /* nx_secure_tls_session_receive */
3916 /* _nxd_mqtt_client_set_fixed_header */
3917 /* _nxd_mqtt_client_append_message */
3918 /* nx_tcp_socket_send */
3919 /* nx_packet_release */
3920 /* nx_tcp_socket_receive */
3921 /* tx_event_flag_set */
3922 /* _nxd_mqtt_release_transmit_packet */
3923 /* tx_timer_create */
3924 /* nx_secure_tls_session_receive */
3925 /* _nxd_mqtt_client_connection_end */
3926 /* */
3927 /* CALLED BY */
3928 /* */
3929 /* Application Code */
3930 /* */
3931 /* RELEASE HISTORY */
3932 /* */
3933 /* DATE NAME DESCRIPTION */
3934 /* */
3935 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3936 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
3937 /* fixed return value when it */
3938 /* is set in CONNACK, corrected*/
3939 /* mqtt client state, */
3940 /* resulting in version 6.1 */
3941 /* 08-02-2021 Yuxin Zhou Modified comment(s), and */
3942 /* corrected the logic for */
3943 /* non-blocking mode, */
3944 /* resulting in version 6.1.8 */
3945 /* 07-29-2022 Spencer McDonough Modified comment(s), and */
3946 /* improved internal logic, */
3947 /* resulting in version 6.1.12 */
3948 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
3949 /* mqtt over websocket, */
3950 /* resulting in version 6.2.0 */
3951 /* 10-31-2023 Haiqing Zhao Modified comment(s), */
3952 /* resulting in version 6.3.0 */
3953 /* */
3954 /**************************************************************************/
_nxd_mqtt_client_connect(NXD_MQTT_CLIENT * client_ptr,NXD_ADDRESS * server_ip,UINT server_port,UINT keepalive,UINT clean_session,ULONG wait_option)3955 UINT _nxd_mqtt_client_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
3956 UINT keepalive, UINT clean_session, ULONG wait_option)
3957 {
3958 NX_PACKET *packet_ptr;
3959 UINT status;
3960 TX_THREAD *thread_ptr;
3961 UINT new_priority;
3962 UINT old_priority;
3963
3964
3965 /* Obtain the mutex. */
3966 status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
3967
3968 if (status != TX_SUCCESS)
3969 {
3970 #ifdef NX_SECURE_ENABLE
3971 if (client_ptr -> nxd_mqtt_client_use_tls)
3972 {
3973 nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
3974 }
3975 #endif /* NX_SECURE_ENABLE */
3976
3977 return(NXD_MQTT_MUTEX_FAILURE);
3978 }
3979
3980 /* Do nothing if the client is already connected. */
3981 if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
3982 {
3983 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3984 return(NXD_MQTT_ALREADY_CONNECTED);
3985 }
3986
3987 /* Check if client is connecting. */
3988 if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTING)
3989 {
3990 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3991 return(NXD_MQTT_CONNECTING);
3992 }
3993
3994 /* Client state must be in IDLE. */
3995 if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_IDLE)
3996 {
3997 #ifdef NX_SECURE_ENABLE
3998 if (client_ptr -> nxd_mqtt_client_use_tls)
3999 {
4000 nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
4001 }
4002 #endif /* NX_SECURE_ENABLE */
4003 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4004 return(NXD_MQTT_INVALID_STATE);
4005 }
4006
4007 #if defined(NX_SECURE_ENABLE) && defined(NXD_MQTT_REQUIRE_TLS)
4008 if (!client_ptr -> nxd_mqtt_client_use_tls)
4009 {
4010
4011 /* NXD_MQTT_REQUIRE_TLS is defined but the application does not use TLS.
4012 This is security violation. Return with failure code. */
4013 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4014 return(NXD_MQTT_CONNECT_FAILURE);
4015 }
4016 #endif /* NX_SECURE_ENABLE && NXD_MQTT_REQUIRE_TLS*/
4017
4018 /* Record the keepalive value, converted to TX timer ticks. */
4019 client_ptr -> nxd_mqtt_keepalive = keepalive * NX_IP_PERIODIC_RATE;
4020 if (keepalive)
4021 {
4022 client_ptr -> nxd_mqtt_timer_value = NXD_MQTT_KEEPALIVE_TIMER_RATE;
4023 client_ptr -> nxd_mqtt_ping_timeout = NXD_MQTT_PING_TIMEOUT_DELAY;
4024
4025 /* Create timer */
4026 tx_timer_create(&(client_ptr -> nxd_mqtt_timer), "MQTT Timer", _nxd_mqtt_periodic_timer_entry, (ULONG)client_ptr,
4027 client_ptr -> nxd_mqtt_timer_value, client_ptr -> nxd_mqtt_timer_value, TX_AUTO_ACTIVATE);
4028 }
4029 else
4030 {
4031 client_ptr -> nxd_mqtt_timer_value = 0;
4032 client_ptr -> nxd_mqtt_ping_timeout = 0;
4033 }
4034
4035 /* Record the clean session flag. */
4036 client_ptr -> nxd_mqtt_clean_session = clean_session;
4037
4038 /* Set TCP connection establish notify for non-blocking mode. */
4039 if (wait_option == 0)
4040 {
4041 nx_tcp_socket_establish_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_tcp_establish_notify);
4042
4043 /* Set the receive callback. */
4044 nx_tcp_socket_receive_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_receive_callback);
4045
4046 #ifdef NXD_MQTT_OVER_WEBSOCKET
4047 if (client_ptr -> nxd_mqtt_client_use_websocket)
4048 {
4049
4050 /* Set the websocket connection callback. */
4051 nx_websocket_client_connection_status_callback_set(&client_ptr -> nxd_mqtt_client_websocket, client_ptr, _nxd_mqtt_client_websocket_connection_status_callback);
4052 }
4053 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4054 }
4055 else
4056 {
4057
4058 /* Clean receive callback. */
4059 client_ptr -> nxd_mqtt_client_socket.nx_tcp_receive_callback = NX_NULL;
4060
4061 #ifdef NXD_MQTT_OVER_WEBSOCKET
4062 if (client_ptr -> nxd_mqtt_client_use_websocket)
4063 {
4064
4065 /* Clean the websocket connection callback. */
4066 nx_websocket_client_connection_status_callback_set(&client_ptr -> nxd_mqtt_client_websocket, NX_NULL, NX_NULL);
4067 }
4068 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4069 }
4070
4071 /* Release mutex */
4072 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4073
4074 /* First attempt to bind the client socket. */
4075 nx_tcp_client_socket_bind(&(client_ptr -> nxd_mqtt_client_socket), NX_ANY_PORT, wait_option);
4076
4077 /* Obtain the mutex. */
4078 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
4079
4080 /* Make state as NXD_MQTT_CLIENT_STATE_CONNECTING. */
4081 client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_CONNECTING;
4082
4083 /* Release mutex. */
4084 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4085
4086 /* Connect to the MQTT server */
4087 status = nxd_tcp_client_socket_connect(&(client_ptr -> nxd_mqtt_client_socket), server_ip, server_port, wait_option);
4088 if ((status != NX_SUCCESS) && (status != NX_IN_PROGRESS))
4089 {
4090
4091 /* Obtain the mutex. */
4092 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
4093
4094 /* Make state as NXD_MQTT_CLIENT_STATE_IDLE. */
4095 client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;
4096
4097 /* Release mutex. */
4098 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4099
4100 #ifdef NX_SECURE_ENABLE
4101 if (client_ptr -> nxd_mqtt_client_use_tls)
4102 {
4103 nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
4104 }
4105 #endif /* NX_SECURE_ENABLE */
4106 nx_tcp_client_socket_unbind(&(client_ptr -> nxd_mqtt_client_socket));
4107 tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
4108 return(NXD_MQTT_CONNECT_FAILURE);
4109 }
4110
4111 /* Just return for non-blocking mode. */
4112 if (wait_option == 0)
4113 {
4114 return(NX_IN_PROGRESS);
4115 }
4116
4117 /* Increase priority to the same of internal thread to avoid out of order packet process. */
4118 #ifndef NXD_MQTT_CLOUD_ENABLE
4119 thread_ptr = &(client_ptr -> nxd_mqtt_thread);
4120 #else
4121 thread_ptr = &(client_ptr -> nxd_mqtt_client_cloud_ptr -> nx_cloud_thread);
4122 #endif /* NXD_MQTT_CLOUD_ENABLE */
4123 tx_thread_info_get(thread_ptr, NX_NULL, NX_NULL, NX_NULL,
4124 &new_priority, NX_NULL, NX_NULL, NX_NULL, NX_NULL);
4125 tx_thread_priority_change(tx_thread_identify(), new_priority, &old_priority);
4126
4127 /* If TLS is enabled, start TLS */
4128 #ifdef NX_SECURE_ENABLE
4129 if (client_ptr -> nxd_mqtt_client_use_tls)
4130 {
4131
4132 status = nx_secure_tls_session_start(&(client_ptr -> nxd_mqtt_tls_session), &(client_ptr -> nxd_mqtt_client_socket), wait_option);
4133
4134 if (status != NX_SUCCESS)
4135 {
4136
4137 /* Revert thread priority. */
4138 tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4139
4140 /* End connection. */
4141 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4142
4143 return(NXD_MQTT_CONNECT_FAILURE);
4144 }
4145 }
4146 #endif /* NX_SECURE_ENABLE */
4147
4148 #ifdef NXD_MQTT_OVER_WEBSOCKET
4149 if (client_ptr -> nxd_mqtt_client_use_websocket)
4150 {
4151 #ifdef NX_SECURE_ENABLE
4152 if (client_ptr -> nxd_mqtt_client_use_tls)
4153 {
4154 status = nx_websocket_client_secure_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_tls_session),
4155 client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
4156 client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
4157 (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
4158 wait_option);
4159 }
4160 else
4161 #endif /* NX_SECURE_ENABLE */
4162 {
4163 status = nx_websocket_client_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_client_socket),
4164 client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
4165 client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
4166 (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
4167 wait_option);
4168 }
4169
4170 if (status != NX_SUCCESS)
4171 {
4172
4173 /* Revert thread priority. */
4174 tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4175
4176 /* End connection. */
4177 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4178
4179 return(NXD_MQTT_CONNECT_FAILURE);
4180 }
4181 }
4182
4183 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4184
4185 /* Start to send connect packet. */
4186 status = _nxd_mqtt_client_connect_packet_send(client_ptr, wait_option);
4187
4188 if (status != NX_SUCCESS)
4189 {
4190
4191 /* Revert thread priority. */
4192 tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4193
4194 /* End connection. */
4195 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4196 return(NXD_MQTT_CONNECT_FAILURE);
4197 }
4198
4199 /* Call a receive. */
4200 status = _nxd_mqtt_packet_receive(client_ptr, &packet_ptr, wait_option);
4201
4202 /* Revert thread priority. */
4203 tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4204
4205 /* Check status. */
4206 if (status)
4207 {
4208
4209 /* End connection. */
4210 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4211 return(NXD_MQTT_COMMUNICATION_FAILURE);
4212 }
4213
4214 /* Process CONNACK message. */
4215 status = _nxd_mqtt_process_connack(client_ptr, packet_ptr, wait_option);
4216
4217 /* Release the packet. */
4218 nx_packet_release(packet_ptr);
4219
4220 /* Check status. */
4221 if (status == NX_SUCCESS)
4222 {
4223
4224 /* Set the receive callback. */
4225 nx_tcp_socket_receive_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_receive_callback);
4226 }
4227
4228 return(status);
4229 }
4230
4231 /**************************************************************************/
4232 /* */
4233 /* FUNCTION RELEASE */
4234 /* */
4235 /* _nxd_mqtt_client_connect_packet_send PORTABLE C */
4236 /* 6.3.0 */
4237 /* AUTHOR */
4238 /* */
4239 /* Yuxin Zhou, Microsoft Corporation */
4240 /* */
4241 /* DESCRIPTION */
4242 /* */
4243 /* This function sends CONNECT packet to MQTT server. */
4244 /* */
4245 /* */
4246 /* INPUT */
4247 /* */
4248 /* client_ptr Pointer to MQTT Client */
4249 /* (keepalive and clean-session settings are read from client_ptr; */
4250 /* they are not arguments of this function) */
4251 /* wait_option Timeout value */
4252 /* */
4253 /* */
4254 /* OUTPUT */
4255 /* */
4256 /* status Completion status */
4257 /* */
4258 /* CALLS */
4259 /* */
4260 /* nx_packet_release */
4261 /* _nxd_mqtt_client_packet_allocate */
4262 /* _nxd_mqtt_release_transmit_packet */
4263 /* _nxd_mqtt_client_connection_end */
4264 /* _nxd_mqtt_client_set_fixed_header */
4265 /* _nxd_mqtt_client_append_message */
4266 /* _nxd_mqtt_packet_send */
4267 /* */
4268 /* CALLED BY */
4269 /* */
4270 /* _nxd_mqtt_client_connect */
4271 /* _nxd_mqtt_tcp_establish_process */
4272 /* _nxd_mqtt_tls_establish_process */
4273 /* */
4274 /* RELEASE HISTORY */
4275 /* */
4276 /* DATE NAME DESCRIPTION */
4277 /* */
4278 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4279 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4280 /* resulting in version 6.1 */
4281 /* 07-29-2022 Spencer McDonough Modified comment(s), */
4282 /* improved internal logic, */
4283 /* resulting in version 6.1.12 */
4284 /* 10-31-2022 Bo Chen Modified comment(s), improved */
4285 /* the logic of sending packet,*/
4286 /* resulting in version 6.2.0 */
4287 /* 10-31-2023 Haiqing Zhao Modified comment(s), improved */
4288 /* internal logic, */
4289 /* resulting in version 6.3.0 */
4290 /* */
4291 /**************************************************************************/
_nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)4292 UINT _nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
4293 {
4294 NX_PACKET *packet_ptr;
4295 UINT status;
4296 UINT length = 0;
4297 UCHAR connection_flags = 0;
4298 UINT ret = NXD_MQTT_SUCCESS;
4299 UCHAR temp_data[4];
4300 UINT keepalive = (client_ptr -> nxd_mqtt_keepalive/NX_IP_PERIODIC_RATE);
4301
4302
4303 /* Construct connect flags by taking the connect flag user supplies, or'ing the username and
4304 password bits, if they are supplied. */
4305 if (client_ptr -> nxd_mqtt_client_username)
4306 {
4307 connection_flags |= MQTT_CONNECT_FLAGS_USERNAME;
4308
4309 /* Add the password flag only if username is supplied. */
4310 if (client_ptr -> nxd_mqtt_client_password)
4311 {
4312 connection_flags |= MQTT_CONNECT_FLAGS_PASSWORD;
4313 }
4314 }
4315
4316 if (client_ptr -> nxd_mqtt_client_will_topic)
4317 {
4318 connection_flags = connection_flags | MQTT_CONNECT_FLAGS_WILL_FLAG;
4319
4320
4321 if (client_ptr -> nxd_mqtt_client_will_qos_retain & 0x80)
4322 {
4323 connection_flags = connection_flags | MQTT_CONNECT_FLAGS_WILL_RETAIN;
4324 }
4325
4326 connection_flags = (UCHAR)(connection_flags | ((client_ptr -> nxd_mqtt_client_will_qos_retain & 0x3) << 3));
4327 }
4328
4329 if (client_ptr -> nxd_mqtt_clean_session == NX_TRUE)
4330 {
4331 connection_flags = connection_flags | MQTT_CONNECT_FLAGS_CLEAN_SESSION;
4332
4333 /* Clear any transmit blocks from the previous session. */
4334 while (client_ptr -> message_transmit_queue_head)
4335 {
4336 _nxd_mqtt_release_transmit_packet(client_ptr, client_ptr -> message_transmit_queue_head, NX_NULL);
4337 }
4338 }
4339
4340 /* Set the length of the packet. */
4341 length = 10;
4342
4343 /* Add the size of the client Identifier. */
4344 length += (client_ptr -> nxd_mqtt_client_id_length + 2);
4345
4346 /* Add the will topic, if present. */
4347 if (connection_flags & MQTT_CONNECT_FLAGS_WILL_FLAG)
4348 {
4349 length += (client_ptr -> nxd_mqtt_client_will_topic_length + 2);
4350 length += (client_ptr -> nxd_mqtt_client_will_message_length + 2);
4351 }
4352 if (connection_flags & MQTT_CONNECT_FLAGS_USERNAME)
4353 {
4354 length += (UINT)(client_ptr -> nxd_mqtt_client_username_length + 2);
4355 }
4356 if (connection_flags & MQTT_CONNECT_FLAGS_PASSWORD)
4357 {
4358 length += (UINT)(client_ptr -> nxd_mqtt_client_password_length + 2);
4359 }
4360
4361 /* Check for invalid length. */
4362 if (length > (127 * 127 * 127 * 127))
4363 {
4364 return(NXD_MQTT_INTERNAL_ERROR);
4365 }
4366
4367 status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, wait_option);
4368
4369 if (status)
4370 {
4371 return(status);
4372 }
4373
4374 /* Construct MQTT CONNECT message. */
4375 temp_data[0] = ((MQTT_CONTROL_PACKET_TYPE_CONNECT << 4) & 0xF0);
4376
4377 /* Fill in fixed header. */
4378 ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, temp_data[0], length, wait_option);
4379
4380 if (ret)
4381 {
4382 /* Release the packet. */
4383 nx_packet_release(packet_ptr);
4384 return(NXD_MQTT_PACKET_POOL_FAILURE);
4385 }
4386
4387 /* Fill in protocol name. */
4388 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, "MQTT", 4, wait_option);
4389
4390 if (ret)
4391 {
4392 /* Release the packet. */
4393 nx_packet_release(packet_ptr);
4394 return(NXD_MQTT_PACKET_POOL_FAILURE);
4395 }
4396
4397 /* Fill in protocol level, */
4398 temp_data[0] = MQTT_PROTOCOL_LEVEL;
4399
4400 /* Fill in byte 8: connect flags */
4401 temp_data[1] = connection_flags;
4402
4403 /* Fill in byte 9 and 10: keep alive */
4404 temp_data[2] = (keepalive >> 8) & 0xFF;
4405 temp_data[3] = (keepalive & 0xFF);
4406
4407 ret = nx_packet_data_append(packet_ptr, temp_data, 4, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
4408
4409 if (ret)
4410 {
4411
4412 /* Release the packet. */
4413 nx_packet_release(packet_ptr);
4414
4415 return(NXD_MQTT_PACKET_POOL_FAILURE);
4416 }
4417
4418 /* Fill in payload area, in the order of: client identifier, will topic, will message,
4419 user name, and password. */
4420 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_id,
4421 client_ptr -> nxd_mqtt_client_id_length, wait_option);
4422
4423 /* Next fill will topic and will message if the will flag is set. */
4424 if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_WILL_FLAG))
4425 {
4426 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, (CHAR *)client_ptr -> nxd_mqtt_client_will_topic,
4427 client_ptr -> nxd_mqtt_client_will_topic_length, wait_option);
4428
4429 if (!ret)
4430 {
4431 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, (CHAR *)client_ptr -> nxd_mqtt_client_will_message,
4432 client_ptr -> nxd_mqtt_client_will_message_length, wait_option);
4433 }
4434 }
4435
4436 /* Fill username if username flag is set */
4437 if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_USERNAME))
4438 {
4439 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_username,
4440 client_ptr -> nxd_mqtt_client_username_length, wait_option);
4441 }
4442
4443 /* Fill password if password flag is set */
4444 if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_PASSWORD))
4445 {
4446 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_password,
4447 client_ptr -> nxd_mqtt_client_password_length, wait_option);
4448 }
4449
4450 if (ret)
4451 {
4452
4453 /* Release the packet. */
4454 nx_packet_release(packet_ptr);
4455
4456 return(NXD_MQTT_PACKET_POOL_FAILURE);
4457 }
4458
4459 /* Ready to send the connect message to the server. */
4460 status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);
4461
4462 if (status)
4463 {
4464
4465 /* Release the packet. */
4466 nx_packet_release(packet_ptr);
4467 }
4468
4469 /* Update the timeout value. */
4470 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
4471
4472 return(status);
4473 }
4474
4475 /**************************************************************************/
4476 /* */
4477 /* FUNCTION RELEASE */
4478 /* */
4479 /* _nxd_mqtt_client_secure_connect PORTABLE C */
4480 /* 6.1 */
4481 /* AUTHOR */
4482 /* */
4483 /* Yuxin Zhou, Microsoft Corporation */
4484 /* */
4485 /* DESCRIPTION */
4486 /* */
4487 /* This function makes an initial secure (TLS) connection to */
4488 /* the MQTT server. */
4489 /* */
4490 /* */
4491 /* INPUT */
4492 /* */
4493 /* client_ptr Pointer to MQTT Client */
4494 /* server_ip Server IP address structure */
4495 /* server_port Server port number, in host */
4496 /* byte order */
4497 /* tls_setup User-supplied callback */
4498 /* function to set up TLS */
4499 /* parameters. */
4500 /* keepalive Keepalive value, forwarded */
4501 /* to _nxd_mqtt_client_connect */
4502 /* clean_session Clean session flag, */
4503 /* forwarded to */
4504 /* _nxd_mqtt_client_connect */
4505 /* wait_option Timeout value */
4506 /* */
4507 /* Note: user name and password are not arguments of this function. */
4508 /* The CONNECT packet reads them from the client control block. */
4509 /* */
4510 /* */
4511 /* */
4512 /* OUTPUT */
4513 /* */
4514 /* status Completion status */
4515 /* */
4516 /* CALLS */
4517 /* */
4518 /* _nxd_mqtt_client_connect Actual MQTT Client connect */
4519 /* call */
4520 /* */
4521 /* CALLED BY */
4522 /* */
4523 /* Application Code */
4524 /* */
4525 /* RELEASE HISTORY */
4526 /* */
4527 /* DATE NAME DESCRIPTION */
4528 /* */
4529 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4530 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4531 /* resulting in version 6.1 */
4532 /* */
4533 /**************************************************************************/
4534 #ifdef NX_SECURE_ENABLE
/* Run the application's TLS setup callback, mark the client as using TLS and
   then perform the ordinary MQTT connect.

   client_ptr     MQTT client to connect.
   server_ip      Server address, forwarded to _nxd_mqtt_client_connect.
   server_port    Server port, forwarded to _nxd_mqtt_client_connect.
   tls_setup      Callback that receives the client, its TLS session and its two
                  certificate structures; a non-zero return aborts the connect.
   keepalive      Forwarded to _nxd_mqtt_client_connect.
   clean_session  Forwarded to _nxd_mqtt_client_connect.
   wait_option    Forwarded to _nxd_mqtt_client_connect.

   Returns the callback's status when it is non-zero, otherwise the status of
   _nxd_mqtt_client_connect. */
UINT _nxd_mqtt_client_secure_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                                     UINT (*tls_setup)(NXD_MQTT_CLIENT *client_ptr, NX_SECURE_TLS_SESSION *,
                                                       NX_SECURE_X509_CERT *, NX_SECURE_X509_CERT *),
                                     UINT keepalive, UINT clean_session, ULONG wait_option)
{
    UINT setup_status;

    /* Let the application configure the TLS session and certificates. */
    setup_status = tls_setup(client_ptr,
                             &(client_ptr -> nxd_mqtt_tls_session),
                             &(client_ptr -> nxd_mqtt_tls_certificate),
                             &(client_ptr -> nxd_mqtt_tls_trusted_certificate));

    if (setup_status != 0)
    {

        /* Setup failed: the TLS flag is left untouched and no connect is attempted. */
        return(setup_status);
    }

    /* From here on the connect path treats this client as a TLS client. */
    client_ptr -> nxd_mqtt_client_use_tls = 1;

    return(_nxd_mqtt_client_connect(client_ptr, server_ip, server_port, keepalive, clean_session, wait_option));
}
4559
4560 #endif /* NX_SECURE_ENABLE */
4561
4562
4563 /**************************************************************************/
4564 /* */
4565 /* FUNCTION RELEASE */
4566 /* */
4567 /* _nxd_mqtt_client_delete PORTABLE C */
4568 /* 6.2.0 */
4569 /* AUTHOR */
4570 /* */
4571 /* Yuxin Zhou, Microsoft Corporation */
4572 /* */
4573 /* DESCRIPTION */
4574 /* */
4575 /* This function deletes a previously created MQTT client instance. */
4576 /* If the NXD_MQTT_SOCKET_TIMEOUT is set to NX_WAIT_FOREVER, this may */
4577 /* suspend infinitely. This is because the MQTT Client must */
4578 /* disconnect with the server, and if the network link is disabled or */
4579 /* the server is not responding, this will block this function from */
4580 /* completing. */
4581 /* */
4582 /* INPUT */
4583 /* */
4584 /* client_ptr Pointer to MQTT Client */
4585 /* */
4586 /* OUTPUT */
4587 /* */
4588 /* status Completion status */
4589 /* */
4590 /* CALLS */
4591 /* */
4592 /* tx_event_flags_set */
4593 /* */
4594 /* CALLED BY */
4595 /* */
4596 /* Application Code */
4597 /* */
4598 /* RELEASE HISTORY */
4599 /* */
4600 /* DATE NAME DESCRIPTION */
4601 /* */
4602 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4603 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4604 /* resulting in version 6.1 */
4605 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
4606 /* mqtt over websocket, */
4607 /* resulting in version 6.2.0 */
4608 /* */
4609 /**************************************************************************/
_nxd_mqtt_client_delete(NXD_MQTT_CLIENT * client_ptr)4610 UINT _nxd_mqtt_client_delete(NXD_MQTT_CLIENT *client_ptr)
4611 {
4612
4613
4614 /* Set the event flag for DELETE. Next time when the MQTT client thread
4615 wakes up, it will perform the deletion process. */
4616 #ifndef NXD_MQTT_CLOUD_ENABLE
4617 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_DELETE_EVENT, TX_OR);
4618 #else
4619 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_DELETE_EVENT);
4620 #endif /* NXD_MQTT_CLOUD_ENABLE */
4621
4622 /* Check if the MQTT client thread has completed. */
4623 while((client_ptr -> nxd_mqtt_client_socket).nx_tcp_socket_id != 0)
4624 {
4625 tx_thread_sleep(NX_IP_PERIODIC_RATE);
4626 }
4627
4628 #ifndef NXD_MQTT_CLOUD_ENABLE
4629 /* Now we can delete the Client instance. */
4630 tx_thread_delete(&(client_ptr -> nxd_mqtt_thread));
4631 #else
4632 /* Deregister mqtt module from cloud helper. */
4633 nx_cloud_module_deregister(client_ptr -> nxd_mqtt_client_cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module));
4634
4635 /* Delete own created cloud. */
4636 if (client_ptr -> nxd_mqtt_client_cloud.nx_cloud_id == NX_CLOUD_ID)
4637 nx_cloud_delete(&(client_ptr -> nxd_mqtt_client_cloud));
4638 #endif /* NXD_MQTT_CLOUD_ENABLE */
4639
4640 #ifdef NXD_MQTT_OVER_WEBSOCKET
4641 if (client_ptr -> nxd_mqtt_client_use_websocket)
4642 {
4643 nx_websocket_client_delete(&client_ptr -> nxd_mqtt_client_websocket);
4644 client_ptr -> nxd_mqtt_client_use_websocket = NX_FALSE;
4645 }
4646 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4647
4648 return(NXD_MQTT_SUCCESS);
4649 }
4650
4651 /**************************************************************************/
4652 /* */
4653 /* FUNCTION RELEASE */
4654 /* */
4655 /* _nxd_mqtt_client_publish_packet_send PORTABLE C */
4656 /* 6.2.0 */
4657 /* AUTHOR */
4658 /* */
4659 /* Yuxin Zhou, Microsoft Corporation */
4660 /* */
4661 /* DESCRIPTION */
4662 /* */
4663 /* This function sends a publish packet to the connected broker. */
4664 /* */
4665 /* */
4666 /* INPUT */
4667 /* */
4668 /* client_ptr Pointer to MQTT Client */
4669 /* packet_ptr Pointer to publish packet */
4670 /* packet_id Current packet ID */
4671 /* QoS Quality of service */
4672 /* wait_option Suspension option */
4673 /* */
4674 /* OUTPUT */
4675 /* */
4676 /* status Completion status */
4677 /* */
4678 /* CALLS */
4679 /* */
4680 /* tx_mutex_get */
4681 /* tx_mutex_put */
4682 /* nx_packet_release */
4683 /* _nxd_mqtt_copy_transmit_packet */
4684 /* _nxd_mqtt_packet_send */
4685 /* */
4686 /* CALLED BY */
4687 /* */
4688 /* _nxd_mqtt_client_publish */
4689 /* */
4690 /* RELEASE HISTORY */
4691 /* */
4692 /* DATE NAME DESCRIPTION */
4693 /* */
4694 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4695 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4696 /* resulting in version 6.1 */
4697 /* 08-02-2021 Yuxin Zhou Modified comment(s), */
4698 /* supported maximum transmit */
4699 /* queue depth, */
4700 /* resulting in version 6.1.8 */
4701 /* 10-31-2022 Bo Chen Modified comment(s), improved */
4702 /* the logic of sending packet,*/
4703 /* resulting in version 6.2.0 */
4704 /* */
4705 /**************************************************************************/
/* Send an already-built PUBLISH packet.  For a non-zero QoS a copy made by
   _nxd_mqtt_copy_transmit_packet is appended to the client's transmit queue
   first, so it is available for possible retransmission.

   client_ptr   MQTT client.
   packet_ptr   The PUBLISH packet to send.  This function never releases it.
   packet_id    Packet identifier handed to _nxd_mqtt_copy_transmit_packet.
   QoS          Quality of service; zero means no copy is queued.
   wait_option  Suspension option for the copy and the send.

   Returns NXD_MQTT_SUCCESS, NXD_MQTT_PACKET_POOL_FAILURE when the copy cannot
   be made, NXD_MQTT_MUTEX_FAILURE when the client mutex cannot be obtained, or
   NXD_MQTT_COMMUNICATION_FAILURE when _nxd_mqtt_packet_send reports an error. */
UINT _nxd_mqtt_client_publish_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr,
                                          USHORT packet_id, UINT QoS, ULONG wait_option)
{
    NX_PACKET *queued_copy = NX_NULL;
    UINT       status;

    if (QoS != 0)
    {

        /* Keep a private copy for possible retransmission. */
        if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &queued_copy,
                                           packet_id, NX_TRUE, wait_option))
        {
            return(NXD_MQTT_PACKET_POOL_FAILURE);
        }
    }

    /* The queue and the timeout are shared state; take the client mutex. */
    status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

    if (status != TX_SUCCESS)
    {
        if (QoS != 0)
        {

            /* The copy never reached the queue, so drop it again. */
            nx_packet_release(queued_copy);

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
            /* Take the discarded copy back out of the depth count (presumably
               counted by _nxd_mqtt_copy_transmit_packet - confirm there). */
            client_ptr -> message_transmit_queue_depth--;
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */
        }

        return(NXD_MQTT_MUTEX_FAILURE);
    }

    if (QoS != 0)
    {

        /* Append the copy at the tail of the transmit queue. */
        if (client_ptr -> message_transmit_queue_head == NX_NULL)
        {
            client_ptr -> message_transmit_queue_head = queued_copy;
        }
        else
        {
            client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = queued_copy;
        }

        client_ptr -> message_transmit_queue_tail = queued_copy;
    }

    /* Restart the keepalive deadline, then let go of the mutex before sending. */
    client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;

    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    /* Send the publish message to the server. */
    status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);

    return(status ? NXD_MQTT_COMMUNICATION_FAILURE : NXD_MQTT_SUCCESS);
}
4778
4779 /**************************************************************************/
4780 /* */
4781 /* FUNCTION RELEASE */
4782 /* */
4783 /* _nxd_mqtt_client_publish PORTABLE C */
4784 /* 6.3.0 */
4785 /* AUTHOR */
4786 /* */
4787 /* Yuxin Zhou, Microsoft Corporation */
4788 /* */
4789 /* DESCRIPTION */
4790 /* */
4791 /* This function publishes a message to the connected broker. */
4792 /* */
4793 /* */
4794 /* INPUT */
4795 /* */
4796 /* client_ptr Pointer to MQTT Client */
4797 /* topic_name Name of the topic */
4798 /* topic_name_length Length of the topic name */
4799 /* message Message string */
4800 /* message_length Length of the message, */
4801 /* in bytes */
4802 /* retain The retain flag, whether */
4803 /* or not the broker should */
4804 /* store this message */
4805 /* QoS Expected QoS level */
4806 /* wait_option Suspension option */
4807 /* */
4808 /* OUTPUT */
4809 /* */
4810 /* status Completion status */
4811 /* */
4812 /* CALLS */
4813 /* */
4814 /* tx_mutex_get */
4815 /* _nxd_mqtt_client_packet_allocate */
4816 /* _nxd_mqtt_client_set_fixed_header */
4817 /* _nxd_mqtt_client_append_message */
4818 /* tx_mutex_put */
4819 /* nx_packet_release */
4820 /* _nxd_mqtt_client_publish_packet_send */
4821 /* */
4822 /* CALLED BY */
4823 /* */
4824 /* Application Code */
4825 /* */
4826 /* RELEASE HISTORY */
4827 /* */
4828 /* DATE NAME DESCRIPTION */
4829 /* */
4830 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4831 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4832 /* resulting in version 6.1 */
4833 /* 07-29-2022 Spencer McDonough Modified comment(s), */
4834 /* improved internal logic, */
4835 /* resulting in version 6.1.12 */
4836 /* 10-31-2023 Haiqing Zhao Modified comment(s), improved */
4837 /* internal logic, */
4838 /* resulting in version 6.3.0 */
4839 /* */
4840 /**************************************************************************/
/* Build a PUBLISH packet for the given topic and message and send it through
   _nxd_mqtt_client_publish_packet_send.

   client_ptr         MQTT client; must be in the CONNECTED state.
   topic_name         Topic to publish to.
   topic_name_length  Length of topic_name in bytes.
   message            Message payload; may be NX_NULL for an empty payload.
   message_length     Length of message in bytes; zero also means no payload.
   retain             Non-zero sets the retain flag in the fixed header.
   QoS                0 or 1.  QoS 2 is rejected.
   wait_option        Suspension option for packet allocation, append and send.

   Returns NXD_MQTT_SUCCESS, NXD_MQTT_QOS2_NOT_SUPPORTED, NXD_MQTT_NOT_CONNECTED,
   NXD_MQTT_PACKET_POOL_FAILURE (no packet available), NXD_MQTT_INTERNAL_ERROR
   (a header or payload write failed), NXD_MQTT_MUTEX_FAILURE, or the status of
   _nxd_mqtt_client_publish_packet_send.

   Every failure after the packet has been allocated releases it before
   returning, including the mutex failure in the packet-identifier step. */
UINT _nxd_mqtt_client_publish(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length,
                              CHAR *message, UINT message_length, UINT retain, UINT QoS, ULONG wait_option)
{

    NX_PACKET *packet_ptr;
    UINT       status;
    UINT       length;
    UCHAR      flags;
    USHORT     packet_id = 0;
    UINT       ret;

    if (QoS == 2)
    {
        return(NXD_MQTT_QOS2_NOT_SUPPORTED);
    }

    /* Publishing is only possible while the client is connected. */
    if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        return(NXD_MQTT_NOT_CONNECTED);
    }

    status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, wait_option);

    if (status != NXD_MQTT_SUCCESS)
    {
        return(NXD_MQTT_PACKET_POOL_FAILURE);
    }

    /* Control byte: packet type in the high nibble, QoS in bits 1-2, retain flag. */
    flags = (UCHAR)((MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4) | (QoS << 1));

    if (retain)
    {
        flags = flags | MQTT_PUBLISH_RETAIN;
    }

    /* Remaining length: topic name with its 2 extra bytes ... */
    length = topic_name_length + 2;

    /* ... plus the 2-byte packet identifier for QoS 1 or QoS 2 ... */
    if ((QoS == 1) || (QoS == 2))
    {
        length += 2;
    }

    /* ... plus the message, when there is one. */
    if ((message != NX_NULL) && (message_length != 0))
    {
        length += message_length;
    }

    /* Write out the control header and remaining length field. */
    ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, flags, length, wait_option);

    if (ret)
    {

        /* Release the packet. */
        nx_packet_release(packet_ptr);

        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Write out the topic. */
    ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, topic_name, topic_name_length, wait_option);

    if (ret)
    {

        /* Release the packet. */
        nx_packet_release(packet_ptr);

        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Append Packet Identifier for QoS level 1 or 2 (MQTT 3.3.2.2). */
    if ((QoS == 1) || (QoS == 2))
    {
        UCHAR identifier[2];

        /* The identifier counter is shared client state; take the mutex. */
        status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

        if (status != TX_SUCCESS)
        {

            /* The packet is still owned by this function; return it to the
               pool instead of leaking it. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_MUTEX_FAILURE);
        }

        packet_id = (USHORT)client_ptr -> nxd_mqtt_client_packet_identifier;
        identifier[0] = (UCHAR)(client_ptr -> nxd_mqtt_client_packet_identifier >> 8);
        identifier[1] = (client_ptr -> nxd_mqtt_client_packet_identifier & 0xFF);

        /* Advance the identifier for the next message, wrapping at 16 bits. */
        client_ptr -> nxd_mqtt_client_packet_identifier = (client_ptr -> nxd_mqtt_client_packet_identifier + 1) & 0xFFFF;

        /* Prevent packet identifier from being zero (MQTT-2.3.1-1). */
        if (client_ptr -> nxd_mqtt_client_packet_identifier == 0)
        {
            client_ptr -> nxd_mqtt_client_packet_identifier = 1;
        }

        /* Release the mutex. */
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

        ret = nx_packet_data_append(packet_ptr, identifier, 2,
                                    client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);

        if (ret)
        {

            /* Release the packet. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_INTERNAL_ERROR);
        }
    }

    /* Append message. */
    if ((message != NX_NULL) && (message_length != 0))
    {

        /* nx_packet_data_append copies the user-supplied message into the packet
           and chains additional packets when more storage is needed. */
        ret = nx_packet_data_append(packet_ptr, message, message_length,
                                    client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);

        if (ret)
        {

            /* Unable to obtain a new packet to store the message. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_INTERNAL_ERROR);
        }
    }

    /* Send publish packet. */
    ret = _nxd_mqtt_client_publish_packet_send(client_ptr, packet_ptr, packet_id, QoS, wait_option);

    if (ret)
    {

        /* The packet was not sent; release it. */
        nx_packet_release(packet_ptr);
    }

    return(ret);
}
4993
4994 /**************************************************************************/
4995 /* */
4996 /* FUNCTION RELEASE */
4997 /* */
4998 /* _nxd_mqtt_client_subscribe PORTABLE C */
4999 /* 6.1.2 */
5000 /* AUTHOR */
5001 /* */
5002 /* Yuxin Zhou, Microsoft Corporation */
5003 /* */
5004 /* DESCRIPTION */
5005 /* */
5006 /* This function sends a subscribe message to the broker. */
5007 /* */
5008 /* */
5009 /* INPUT */
5010 /* */
5011 /* client_ptr Pointer to MQTT Client */
5012 /* topic_name Pointer to the topic string */
5013 /* to subscribe to */
5014 /* topic_name_length Length of the topic string */
5015 /* in bytes */
5016 /* QoS Expected QoS level */
5017 /* */
5018 /* OUTPUT */
5019 /* */
5020 /* status Completion status */
5021 /* */
5022 /* CALLS */
5023 /* */
5024 /* _nxd_mqtt_client_sub_unsub The actual routine that */
5025 /* performs the sub/unsub */
5026 /* action. */
5027 /* */
5028 /* CALLED BY */
5029 /* */
5030 /* Application Code */
5031 /* */
5032 /* RELEASE HISTORY */
5033 /* */
5034 /* DATE NAME DESCRIPTION */
5035 /* */
5036 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5037 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5038 /* resulting in version 6.1 */
5039 /* 11-09-2020 Yuxin Zhou Modified comment(s), and */
5040 /* added packet id parameter, */
5041 /* resulting in version 6.1.2 */
5042 /* */
5043 /**************************************************************************/
/* Subscribe to a topic by delegating to _nxd_mqtt_client_sub_unsub.

   client_ptr         MQTT client.
   topic_name         Topic string to subscribe to.
   topic_name_length  Length of topic_name in bytes.
   QoS                Requested QoS; 2 is rejected.

   Returns NXD_MQTT_QOS2_NOT_SUPPORTED for QoS 2, otherwise the status of
   _nxd_mqtt_client_sub_unsub. */
UINT _nxd_mqtt_client_subscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length, UINT QoS)
{

    if (QoS != 2)
    {

        /* Control byte: SUBSCRIBE type in the high nibble, low nibble 0x02.
           NX_NULL is passed for the fifth argument (presumably the optional
           packet-ID output - confirm against _nxd_mqtt_client_sub_unsub). */
        return(_nxd_mqtt_client_sub_unsub(client_ptr, (MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4) | 0x02,
                                          topic_name, topic_name_length, NX_NULL, QoS));
    }

    return(NXD_MQTT_QOS2_NOT_SUPPORTED);
}
5055
5056
5057
5058
5059 /**************************************************************************/
5060 /* */
5061 /* FUNCTION RELEASE */
5062 /* */
5063 /* _nxd_mqtt_client_unsubscribe PORTABLE C */
5064 /* 6.1.2 */
5065 /* AUTHOR */
5066 /* */
5067 /* Yuxin Zhou, Microsoft Corporation */
5068 /* */
5069 /* DESCRIPTION */
5070 /* */
5071 /* This function unsubscribes a topic from the broker. */
5072 /* */
5073 /* */
5074 /* INPUT */
5075 /* */
5076 /* client_ptr Pointer to MQTT Client */
5077 /* topic_name Pointer to the topic string */
5078 /* to unsubscribe from */
5079 /* topic_name_length Length of the topic string */
5080 /* in bytes */
5081 /* */
5082 /* OUTPUT */
5083 /* */
5084 /* status Completion status */
5085 /* */
5086 /* CALLS */
5087 /* */
5088 /* _nxd_mqtt_client_sub_unsub */
5089 /* */
5090 /* CALLED BY */
5091 /* */
5092 /* Application Code */
5093 /* */
5094 /* RELEASE HISTORY */
5095 /* */
5096 /* DATE NAME DESCRIPTION */
5097 /* */
5098 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5099 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5100 /* resulting in version 6.1 */
5101 /* 11-09-2020 Yuxin Zhou Modified comment(s), and */
5102 /* added packet id parameter, */
5103 /* resulting in version 6.1.2 */
5104 /* */
5105 /**************************************************************************/
UINT _nxd_mqtt_client_unsubscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length)
{

UINT status;

    /* Unsubscribing goes through the same routine as subscribing; only the
       packet type in the upper nibble of the header byte differs.  No packet
       id pointer is supplied and the QoS argument is passed as 0. */
    status = _nxd_mqtt_client_sub_unsub(client_ptr, (MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE << 4) | 0x02,
                                        topic_name, topic_name_length, NX_NULL, 0);

    return(status);
}
5111
5112 /**************************************************************************/
5113 /* */
5114 /* FUNCTION RELEASE */
5115 /* */
5116 /* _nxd_mqtt_send_simple_message PORTABLE C */
5117 /* 6.3.0 */
5118 /* AUTHOR */
5119 /* */
5120 /* Yuxin Zhou, Microsoft Corporation */
5121 /* */
5122 /* DESCRIPTION */
5123 /* */
5124 /* This internal function handles the transmission of PINGREQ or */
5125 /* DISCONNECT message. */
5126 /* */
5127 /* */
5128 /* INPUT */
5129 /* */
5130 /* client_ptr Pointer to MQTT Client */
5131 /* header_value Value to be programmed into */
5132 /* MQTT header. */
5133 /* */
5134 /* OUTPUT */
5135 /* */
5136 /* status Completion status */
5137 /* */
5138 /* CALLS */
5139 /* */
5140 /* tx_mutex_get */
5141 /* _nxd_mqtt_client_packet_allocate */
5142 /* tx_mutex_put */
5143 /* _nxd_mqtt_packet_send */
5144 /* nx_packet_release */
5145 /* */
5146 /* CALLED BY */
5147 /* */
5148 /* _nxd_mqtt_client_ping */
5149 /* _nxd_mqtt_client_disconnect */
5150 /* */
5151 /* RELEASE HISTORY */
5152 /* */
5153 /* DATE NAME DESCRIPTION */
5154 /* */
5155 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5156 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5157 /* resulting in version 6.1 */
5158 /* 07-29-2022 Spencer McDonough Modified comment(s), */
5159 /* improved internal logic, */
5160 /* resulting in version 6.1.12 */
5161 /* 10-31-2022 Bo Chen Modified comment(s), improved */
5162 /* the logic of sending packet,*/
5163 /* resulting in version 6.2.0 */
5164 /* 10-31-2023 Haiqing Zhao Modified comment(s), improved */
5165 /* internal logic, */
5166 /* resulting in version 6.3.0 */
5167 /* */
5168 /**************************************************************************/
_nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT * client_ptr,UCHAR header_value)5169 static UINT _nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT *client_ptr, UCHAR header_value)
5170 {
5171
5172 NX_PACKET *packet_ptr;
5173 UINT status;
5174 UINT status_mutex;
5175 UCHAR *byte;
5176
5177 status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
5178 if (status)
5179 {
5180 return(NXD_MQTT_INTERNAL_ERROR);
5181 }
5182
5183 if (2u > ((ULONG)(packet_ptr -> nx_packet_data_end) - (ULONG)(packet_ptr -> nx_packet_append_ptr)))
5184 {
5185 nx_packet_release(packet_ptr);
5186
5187 /* Packet buffer is too small to hold the message. */
5188 return(NX_SIZE_ERROR);
5189 }
5190
5191 byte = packet_ptr -> nx_packet_prepend_ptr;
5192
5193 *byte = (UCHAR)(header_value << 4);
5194 *(byte + 1) = 0;
5195
5196 packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + 2;
5197 packet_ptr -> nx_packet_length = 2;
5198
5199
5200 /* Release MQTT protection before making NetX/TLS calls. */
5201 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5202
5203 /* Send packet to server. */
5204 status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER);
5205
5206 status_mutex = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
5207 if (status)
5208 {
5209
5210 /* Release the packet. */
5211 nx_packet_release(packet_ptr);
5212
5213 status = NXD_MQTT_COMMUNICATION_FAILURE;
5214 }
5215 if (status_mutex)
5216 {
5217 return(NXD_MQTT_MUTEX_FAILURE);
5218 }
5219
5220 if (header_value == MQTT_CONTROL_PACKET_TYPE_PINGREQ)
5221 {
5222 /* Do not update the ping sent time if the outstanding ping has not been responded yet */
5223 if (client_ptr -> nxd_mqtt_ping_not_responded != 1)
5224 {
5225 /* Record the time we send out the PINGREG */
5226 client_ptr -> nxd_mqtt_ping_sent_time = tx_time_get();
5227 client_ptr -> nxd_mqtt_ping_not_responded = 1;
5228 }
5229 }
5230
5231 /* Update the timeout value. */
5232 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
5233
5234 return(status);
5235 }
5236
5237
5238 /**************************************************************************/
5239 /* */
5240 /* FUNCTION RELEASE */
5241 /* */
5242 /* _nxd_mqtt_client_disconnect PORTABLE C */
5243 /* 6.1 */
5244 /* AUTHOR */
5245 /* */
5246 /* Yuxin Zhou, Microsoft Corporation */
5247 /* */
5248 /* DESCRIPTION */
5249 /* */
5250 /* This function disconnects the MQTT client from a server. */
5251 /* */
5252 /* */
5253 /* INPUT */
5254 /* */
5255 /* client_ptr Pointer to MQTT Client */
5256 /* */
5257 /* OUTPUT */
5258 /* */
5259 /* status Completion status */
5260 /* */
5261 /* CALLS */
5262 /* */
5263 /* _nxd_mqtt_send_simple_message */
5264 /* _nxd_mqtt_process_disconnect */
5265 /* */
5266 /* CALLED BY */
5267 /* */
5268 /* Application Code */
5269 /* */
5270 /* RELEASE HISTORY */
5271 /* */
5272 /* DATE NAME DESCRIPTION */
5273 /* */
5274 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5275 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5276 /* resulting in version 6.1 */
5277 /* */
5278 /**************************************************************************/
_nxd_mqtt_client_disconnect(NXD_MQTT_CLIENT * client_ptr)5279 UINT _nxd_mqtt_client_disconnect(NXD_MQTT_CLIENT *client_ptr)
5280 {
5281 UINT status;
5282
5283 /* Obtain the mutex. */
5284 status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
5285 if (status != TX_SUCCESS)
5286 {
5287 /* Disable timer if timer has been started. */
5288 if (client_ptr -> nxd_mqtt_keepalive)
5289 {
5290 tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
5291 }
5292
5293 return(NXD_MQTT_MUTEX_FAILURE);
5294 }
5295
5296 /* Let the server know we are ending the MQTT session. */
5297 _nxd_mqtt_send_simple_message(client_ptr, MQTT_CONTROL_PACKET_TYPE_DISCONNECT);
5298
5299 /* Call the disconnect routine to disconnect the socket,
5300 release transmit packets, release received packets,
5301 and delete the client timer. */
5302 _nxd_mqtt_process_disconnect(client_ptr);
5303
5304 /* Release the mutex. */
5305 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5306
5307 return(status);
5308 }
5309
5310 /**************************************************************************/
5311 /* */
5312 /* FUNCTION RELEASE */
5313 /* */
5314 /* _nxd_mqtt_client_receive_notify_set PORTABLE C */
5315 /* 6.1 */
5316 /* AUTHOR */
5317 /* */
5318 /* Yuxin Zhou, Microsoft Corporation */
5319 /* */
5320 /* DESCRIPTION */
5321 /* */
5322 /* This function installs the MQTT client publish notify callback */
5323 /* function. */
5324 /* */
5325 /* */
5326 /* INPUT */
5327 /* */
5328 /* client_ptr Pointer to MQTT Client */
5329 /* mqtt_client_receive_notify User-supplied callback */
5330 /* function, which is invoked */
5331 /* upon receiving a publish */
5332 /* message. */
5333 /* */
5334 /* OUTPUT */
5335 /* */
5336 /* status Completion status */
5337 /* */
5338 /* CALLS */
5339 /* */
5340 /* tx_mutex_get */
5341 /* tx_mutex_put */
5342 /* */
5343 /* CALLED BY */
5344 /* */
5345 /* Application Code */
5346 /* */
5347 /* RELEASE HISTORY */
5348 /* */
5349 /* DATE NAME DESCRIPTION */
5350 /* */
5351 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5352 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5353 /* resulting in version 6.1 */
5354 /* */
5355 /**************************************************************************/
_nxd_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* receive_notify)(NXD_MQTT_CLIENT * client_ptr,UINT message_count))5356 UINT _nxd_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT *client_ptr,
5357 VOID (*receive_notify)(NXD_MQTT_CLIENT *client_ptr, UINT message_count))
5358 {
5359
5360 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
5361
5362 client_ptr -> nxd_mqtt_client_receive_notify = receive_notify;
5363
5364 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5365
5366 return(NXD_MQTT_SUCCESS);
5367 }
5368
5369 /**************************************************************************/
5370 /* */
5371 /* FUNCTION RELEASE */
5372 /* */
5373 /* _nxd_mqtt_client_message_get PORTABLE C */
5374 /* 6.1 */
5375 /* AUTHOR */
5376 /* */
5377 /* Yuxin Zhou, Microsoft Corporation */
5378 /* */
5379 /* DESCRIPTION */
5380 /* */
5381 /* This function retrieves a published MQTT message. */
5382 /* */
5383 /* */
5384 /* INPUT */
5385 /* */
5386 /* client_ptr Pointer to MQTT Client */
5387 /* topic_buffer Pointer to the topic buffer */
5388 /* where topic is copied to */
5389 /* topic_buffer_size Size of the topic buffer. */
5390 /* actual_topic_length Number of bytes copied into */
5391 /* topic_buffer */
5392 /* message_buffer Pointer to the buffer where */
5393 /* message is copied to */
5394 /* message_buffer_size Size of the message_buffer */
5395 /* actual_message_length Number of bytes copied into */
5396 /* the message buffer. */
5397 /* */
5398 /* OUTPUT */
5399 /* */
5400 /* status Completion status */
5401 /* */
5402 /* CALLS */
5403 /* */
5404 /* _nxd_mqtt_client_disconnect Actual MQTT Client disconnect */
5405 /* call */
5406 /* _nxd_mqtt_read_remaining_length Skip the remaining length */
5407 /* field */
5408 /* tx_mutex_get */
5409 /* tx_mutex_put */
5410 /* */
5411 /* CALLED BY */
5412 /* */
5413 /* Application Code */
5414 /* */
5415 /* RELEASE HISTORY */
5416 /* */
5417 /* DATE NAME DESCRIPTION */
5418 /* */
5419 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5420 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
5421 /* fixed uninitialized value, */
5422 /* resulting in version 6.1 */
5423 /* */
5424 /**************************************************************************/
UINT _nxd_mqtt_client_message_get(NXD_MQTT_CLIENT *client_ptr, UCHAR *topic_buffer, UINT topic_buffer_size, UINT *actual_topic_length,
                                  UCHAR *message_buffer, UINT message_buffer_size, UINT *actual_message_length)
{

UINT       status;
NX_PACKET *packet_ptr;
ULONG      topic_offset;
USHORT     topic_length;
ULONG      message_offset;
ULONG      message_length;
ULONG      topic_bytes_copied;
ULONG      message_bytes_copied;

    /* Copy the oldest usable message on the receive queue into the caller's
       buffers.

       Returns:
         NXD_MQTT_SUCCESS                   a message was copied and removed from the queue
         NXD_MQTT_INSUFFICIENT_BUFFER_SPACE a buffer is too small; the message stays queued
         NXD_MQTT_NO_MESSAGE                the queue holds no usable message
         NXD_MQTT_MUTEX_FAILURE             the client mutex could not be obtained

       actual_topic_length and actual_message_length are written only on
       success, and either may be NX_NULL if the caller does not need it. */

    /* The receive queue may only be walked while holding the client mutex. */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    while (client_ptr -> message_receive_queue_depth)
    {
        packet_ptr = client_ptr -> message_receive_queue_head;

        /* Locate the topic and the message inside the queued packet. */
        status = _nxd_mqtt_process_publish_packet(packet_ptr, &topic_offset, &topic_length, &message_offset, &message_length);

        if ((status == NXD_MQTT_SUCCESS) &&
            ((topic_buffer_size < topic_length) || (message_buffer_size < message_length)))
        {

            /* Leave the packet at the head of the queue so the caller can
               retry with larger buffers. */
            tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
            return(NXD_MQTT_INSUFFICIENT_BUFFER_SPACE);
        }

        /* Remove the packet from the queue; it is consumed either way. */
        client_ptr -> message_receive_queue_head = packet_ptr -> nx_packet_queue_next;
        if (client_ptr -> message_receive_queue_tail == packet_ptr)
        {
            client_ptr -> message_receive_queue_tail = NX_NULL;
        }
        client_ptr -> message_receive_queue_depth--;

        if (status == NXD_MQTT_SUCCESS)
        {

            /* Extract into ULONG locals: the extract service reports its byte
               count through a ULONG pointer, so writing through a cast of the
               caller's UINT pointer would be a type-punned store.  The counts
               start at zero in case an extract call fails. */
            topic_bytes_copied = 0;
            message_bytes_copied = 0;
            nx_packet_data_extract_offset(packet_ptr, topic_offset, topic_buffer,
                                          topic_length, &topic_bytes_copied);
            nx_packet_data_extract_offset(packet_ptr, message_offset, message_buffer,
                                          message_length, &message_bytes_copied);
            nx_packet_release(packet_ptr);

            /* Report the copied lengths to callers that asked for them. */
            if (actual_topic_length != NX_NULL)
            {
                *actual_topic_length = (UINT)topic_bytes_copied;
            }
            if (actual_message_length != NX_NULL)
            {
                *actual_message_length = (UINT)message_bytes_copied;
            }

            tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
            return(NXD_MQTT_SUCCESS);
        }

        /* The packet could not be parsed as a publish message: drop it and
           try the next one. */
        nx_packet_release(packet_ptr);
    }

    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
    return(NXD_MQTT_NO_MESSAGE);
}
5478
5479 /**************************************************************************/
5480 /* */
5481 /* FUNCTION RELEASE */
5482 /* */
5483 /* _nxde_mqtt_client_create PORTABLE C */
5484 /* 6.1 */
5485 /* AUTHOR */
5486 /* */
5487 /* Yuxin Zhou, Microsoft Corporation */
5488 /* */
5489 /* DESCRIPTION */
5490 /* */
5491 /* This function checks for errors in nxd_mqtt_client_create call. */
5492 /* */
5493 /* */
5494 /* INPUT */
5495 /* */
5496 /* client_ptr Pointer to MQTT Client */
5497 /* client_name Name string used by the */
5498 /* client */
5499 /* client_id Client ID used by the client */
5500 /* client_id_length Length of Client ID, in bytes */
5501 /* ip_ptr Pointer to IP instance */
5502 /* pool_ptr Pointer to packet pool */
5503 /* stack_ptr Client thread's stack pointer */
5504 /* stack_size Client thread's stack size */
5505 /* mqtt_thread_priority Priority for MQTT thread */
5506 /* memory_ptr Deprecated and not used */
5507 /* memory_size Deprecated and not used */
5508 /* */
5509 /* OUTPUT */
5510 /* */
5511 /* status Completion status */
5512 /* */
5513 /* CALLS */
5514 /* */
5515 /* _nxd_mqtt_client_create Actual client create call */
5516 /* */
5517 /* CALLED BY */
5518 /* */
5519 /* Application Code */
5520 /* */
5521 /* RELEASE HISTORY */
5522 /* */
5523 /* DATE NAME DESCRIPTION */
5524 /* */
5525 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5526 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5527 /* resulting in version 6.1 */
5528 /* */
5529 /**************************************************************************/
UINT _nxde_mqtt_client_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name, CHAR *client_id, UINT client_id_length,
                              NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
                              VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority,
                              VOID *memory_ptr, ULONG memory_size)
{

    /* Error-checking front end for _nxd_mqtt_client_create.  Every rejected
       argument yields NX_PTR_ERROR. */

    /* A client control block is required. */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The IP instance must exist and carry the expected ID value. */
    if ((ip_ptr == NX_NULL) || (ip_ptr -> nx_ip_id != NX_IP_ID))
    {
        return(NX_PTR_ERROR);
    }

    /* The client thread needs a non-empty stack. */
    if ((stack_ptr == NX_NULL) || (stack_size == 0))
    {
        return(NX_PTR_ERROR);
    }

    /* A packet pool is required. */
    if (pool_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* All checks passed; memory_ptr and memory_size are forwarded as given. */
    return(_nxd_mqtt_client_create(client_ptr, client_name, client_id, client_id_length, ip_ptr,
                                   pool_ptr, stack_ptr, stack_size, mqtt_thread_priority,
                                   memory_ptr, memory_size));
}
5548
5549
5550 /**************************************************************************/
5551 /* */
5552 /* FUNCTION RELEASE */
5553 /* */
5554 /* _nxde_mqtt_client_connect PORTABLE C */
5555 /* 6.1 */
5556 /* AUTHOR */
5557 /* */
5558 /* Yuxin Zhou, Microsoft Corporation */
5559 /* */
5560 /* DESCRIPTION */
5561 /* */
5562 /* This function checks for errors in MQTT client connect call. */
5563 /* */
5564 /* */
5565 /* INPUT */
5566 /* */
5567 /* client_ptr Pointer to MQTT Client */
5568 /* server_ip Server IP address structure */
5569 /* server_port Server port number, in host */
5570 /* byte order */
5571 /* keepalive The MQTT keepalive timer */
5572 /* clean_session Clean session flag */
5573 /* wait_option Suspension option */
5574 /* */
5575 /* */
5576 /* OUTPUT */
5577 /* */
5578 /* status Completion status */
5579 /* */
5580 /* CALLS */
5581 /* */
5582 /* _nxd_mqtt_client_connect Actual MQTT Client connect */
5583 /* call */
5584 /* */
5585 /* CALLED BY */
5586 /* */
5587 /* Application Code */
5588 /* */
5589 /* RELEASE HISTORY */
5590 /* */
5591 /* DATE NAME DESCRIPTION */
5592 /* */
5593 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5594 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
5595 /* corrected mqtt client state,*/
5596 /* resulting in version 6.1 */
5597 /* */
5598 /**************************************************************************/
UINT _nxde_mqtt_client_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                               UINT keepalive, UINT clean_session, ULONG wait_option)
{

    /* Error-checking front end for _nxd_mqtt_client_connect. */

    /* Both the client and the server address must be supplied. */
    if ((client_ptr == NX_NULL) || (server_ip == NX_NULL))
    {
        return(NX_PTR_ERROR);
    }

    /* Only IP version values 4 and 6 are accepted. */
    switch (server_ip -> nxd_ip_version)
    {
    case 4:
    case 6:
        break;

    default:
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Port 0 is not a usable server port. */
    if (server_port == 0)
    {
        return(NX_INVALID_PORT);
    }

    /* Arguments look sane; perform the actual connect. */
    return(_nxd_mqtt_client_connect(client_ptr, server_ip, server_port, keepalive, clean_session, wait_option));
}
5630
5631 /**************************************************************************/
5632 /* */
5633 /* FUNCTION RELEASE */
5634 /* */
5635 /* _nxde_mqtt_client_secure_connect PORTABLE C */
5636 /* 6.1 */
5637 /* AUTHOR */
5638 /* */
5639 /* Yuxin Zhou, Microsoft Corporation */
5640 /* */
5641 /* DESCRIPTION */
5642 /* */
5643 /* This function checks for errors in MQTT client TLS secure connect. */
5644 /* */
5645 /* */
5646 /* INPUT */
5647 /* */
5648 /* client_ptr Pointer to MQTT Client */
5649 /* server_ip Server IP address structure */
5650 /* server_port Server port number, in host */
5651 /* byte order */
5652 /* tls_setup User-supplied callback */
5653 /* function to set up TLS */
5654 /* parameters. */
5655 /* keepalive The MQTT keepalive timer */
5656 /* wait_option Suspension option */
5657 /* */
5658 /* */
5659 /* OUTPUT */
5660 /* */
5661 /* status Completion status */
5662 /* */
5663 /* CALLS */
5664 /* */
5665 /* _nxd_mqtt_client_secure_connect Actual MQTT Client secure */
5666 /* connect call */
5667 /* */
5668 /* CALLED BY */
5669 /* */
5670 /* Application Code */
5671 /* */
5672 /* RELEASE HISTORY */
5673 /* */
5674 /* DATE NAME DESCRIPTION */
5675 /* */
5676 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5677 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
5678 /* corrected mqtt client state,*/
5679 /* resulting in version 6.1 */
5680 /* */
5681 /**************************************************************************/
5682 #ifdef NX_SECURE_ENABLE
5683
UINT _nxde_mqtt_client_secure_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                                      UINT (*tls_setup)(NXD_MQTT_CLIENT *client_ptr, NX_SECURE_TLS_SESSION *,
                                                        NX_SECURE_X509_CERT *, NX_SECURE_X509_CERT *),
                                      UINT keepalive, UINT clean_session, ULONG wait_option)
{

UINT status;

    /* Error-checking front end for _nxd_mqtt_client_secure_connect.

       Returns NX_PTR_ERROR for a missing client or server address,
       NXD_MQTT_INVALID_PARAMETER for an IP version other than 4 or 6,
       NX_INVALID_PORT for port 0, and otherwise the status of the actual
       secure connect call.  tls_setup is forwarded without being checked. */

    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    if (server_ip == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Test for IP version flag, exactly as the non-TLS connect checker does
       for the same kind of address argument. */
    if ((server_ip -> nxd_ip_version != 4) && (server_ip -> nxd_ip_version != 6))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    if (server_port == 0)
    {
        return(NX_INVALID_PORT);
    }

    status = _nxd_mqtt_client_secure_connect(client_ptr, server_ip, server_port, tls_setup,
                                             keepalive, clean_session, wait_option);

    return(status);
}
5712 #endif /* NX_SECURE_ENABLE */
5713
5714
5715 /**************************************************************************/
5716 /* */
5717 /* FUNCTION RELEASE */
5718 /* */
5719 /* _nxde_mqtt_client_delete PORTABLE C */
5720 /* 6.1 */
5721 /* AUTHOR */
5722 /* */
5723 /* Yuxin Zhou, Microsoft Corporation */
5724 /* */
5725 /* DESCRIPTION */
5726 /* */
5727 /* This function checks for errors in MQTT client delete call. */
5728 /* */
5729 /* */
5730 /* INPUT */
5731 /* */
5732 /* client_ptr Pointer to MQTT Client */
5733 /* */
5734 /* OUTPUT */
5735 /* */
5736 /* status Completion status */
5737 /* */
5738 /* CALLS */
5739 /* */
5740 /* _nxd_mqtt_client_delete Actual MQTT Client delete */
5741 /* call. */
5742 /* */
5743 /* CALLED BY */
5744 /* */
5745 /* Application Code */
5746 /* */
5747 /* RELEASE HISTORY */
5748 /* */
5749 /* DATE NAME DESCRIPTION */
5750 /* */
5751 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5752 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5753 /* resulting in version 6.1 */
5754 /* */
5755 /**************************************************************************/
_nxde_mqtt_client_delete(NXD_MQTT_CLIENT * client_ptr)5756 UINT _nxde_mqtt_client_delete(NXD_MQTT_CLIENT *client_ptr)
5757 {
5758
5759 UINT status;
5760
5761 if (client_ptr == NX_NULL)
5762 {
5763 return(NX_PTR_ERROR);
5764 }
5765
5766 status = _nxd_mqtt_client_delete(client_ptr);
5767
5768 return(status);
5769 }
5770
5771
5772
5773 /**************************************************************************/
5774 /* */
5775 /* FUNCTION RELEASE */
5776 /* */
5777 /* _nxde_mqtt_client_publish PORTABLE C */
5778 /* 6.1 */
5779 /* AUTHOR */
5780 /* */
5781 /* Yuxin Zhou, Microsoft Corporation */
5782 /* */
5783 /* DESCRIPTION */
5784 /* */
5785 /* This function performs error checking to the publish service. */
5786 /* */
5787 /* */
5788 /* INPUT */
5789 /* */
5790 /* client_ptr Pointer to MQTT Client */
5791 /* topic_name Name of the topic */
5792 /* topic_name_length Length of the topic name */
5793 /* message Message string */
5794 /* message_length Length of the message, */
5795 /* in bytes */
5796 /* retain The retain flag, whether */
5797 /* or not the broker should */
5798 /* store this message */
5799 /* QoS Expected QoS level */
5800 /* wait_option Suspension option */
5801 /* */
5802 /* OUTPUT */
5803 /* */
5804 /* status Completion status */
5805 /* */
5806 /* CALLS */
5807 /* */
5808 /* */
5809 /* CALLED BY */
5810 /* */
5811 /* Application Code */
5812 /* */
5813 /* RELEASE HISTORY */
5814 /* */
5815 /* DATE NAME DESCRIPTION */
5816 /* */
5817 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5818 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5819 /* resulting in version 6.1 */
5820 /* */
5821 /**************************************************************************/
UINT _nxde_mqtt_client_publish(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length,
                               CHAR *message, UINT message_length, UINT retain, UINT QoS, ULONG wait_option)
{

    /* Error-checking front end for _nxd_mqtt_client_publish.

       Returns NX_PTR_ERROR for a missing client, NXD_MQTT_INVALID_PARAMETER
       for a bad topic, an inconsistent message pointer/length pair or an
       out-of-range QoS, and otherwise the status of the actual publish. */

    /* Validate client_ptr */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Validate topic_name */
    if ((topic_name == NX_NULL) || (topic_name_length == 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Validate message length.  The pointer and the length must agree: a
       message with no bytes, or bytes with no message, is rejected.  A NULL
       message with zero length (an empty payload) is accepted. */
    if (((message != NX_NULL) && (message_length == 0)) ||
        ((message == NX_NULL) && (message_length != 0)))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Validate QoS value.  Only 0, 1 and 2 are defined QoS levels; 3 is a
       reserved encoding and must not be placed in a PUBLISH header. */
    if (QoS > 2)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    return(_nxd_mqtt_client_publish(client_ptr, topic_name, topic_name_length, message, message_length, retain, QoS, wait_option));
}
5851
5852
5853 /**************************************************************************/
5854 /* */
5855 /* FUNCTION RELEASE */
5856 /* */
5857 /* _nxde_mqtt_client_subscribe PORTABLE C */
5858 /* 6.1 */
5859 /* AUTHOR */
5860 /* */
5861 /* Yuxin Zhou, Microsoft Corporation */
5862 /* */
5863 /* DESCRIPTION */
5864 /* */
5865 /* This function performs error checking to the subscribe service. */
5866 /* */
5867 /* */
5868 /* INPUT */
5869 /* */
5870 /* client_ptr Pointer to MQTT Client */
5871 /* topic_name Pointer to the topic string */
5872 /* to subscribe to */
5873 /* topic_name_length Length of the topic string */
5874 /* in bytes */
5875 /* QoS Expected QoS level */
5876 /* */
5877 /* OUTPUT */
5878 /* */
5879 /* status Completion status */
5884 /* */
5885 /* CALLS */
5886 /* */
5887 /* _nxd_mqtt_client_subscribe */
5888 /* */
5889 /* CALLED BY */
5890 /* */
5891 /* Application Code */
5892 /* */
5893 /* RELEASE HISTORY */
5894 /* */
5895 /* DATE NAME DESCRIPTION */
5896 /* */
5897 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5898 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5899 /* resulting in version 6.1 */
5900 /* */
5901 /**************************************************************************/
UINT _nxde_mqtt_client_subscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length, UINT QoS)
{

    /* Error-checking front end for _nxd_mqtt_client_subscribe. */

    /* A client control block is required. */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The topic must be a non-empty string and the QoS may not exceed 2;
       either problem is reported as an invalid parameter. */
    if ((topic_name == NX_NULL) || (topic_name_length == 0) || (QoS > 2))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    return(_nxd_mqtt_client_subscribe(client_ptr, topic_name, topic_name_length, QoS));
}
5926
5927
5928
5929
5930 /**************************************************************************/
5931 /* */
5932 /* FUNCTION RELEASE */
5933 /* */
5934 /* _nxde_mqtt_client_unsubscribe PORTABLE C */
5935 /* 6.1 */
5936 /* AUTHOR */
5937 /* */
5938 /* Yuxin Zhou, Microsoft Corporation */
5939 /* */
5940 /* DESCRIPTION */
5941 /* */
5942 /* This function performs error checking to the unsubscribe service. */
5943 /* */
5944 /* */
5945 /* INPUT */
5946 /* */
5947 /* client_ptr Pointer to MQTT Client */
5948 /* topic_name Pointer to the topic string */
5949 /* to unsubscribe */
5950 /* topic_name_length Length of the topic string */
5951 /* in bytes */
5952 /* */
5953 /* OUTPUT */
5954 /* */
5955 /* status Completion status */
5956 /* */
5957 /* CALLS */
5958 /* */
5959 /* */
5960 /* CALLED BY */
5961 /* */
5962 /* Application Code */
5963 /* */
5964 /* RELEASE HISTORY */
5965 /* */
5966 /* DATE NAME DESCRIPTION */
5967 /* */
5968 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5969 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5970 /* resulting in version 6.1 */
5971 /* */
5972 /**************************************************************************/
UINT _nxde_mqtt_client_unsubscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length)
{

UINT status;

    /* Error-checking front end for _nxd_mqtt_client_unsubscribe. */
    if (client_ptr == NX_NULL)
    {

        /* No client control block was supplied. */
        status = NX_PTR_ERROR;
    }
    else if ((topic_name == NX_NULL) || (topic_name_length == 0))
    {

        /* The topic must be a non-empty string. */
        status = NXD_MQTT_INVALID_PARAMETER;
    }
    else
    {
        status = _nxd_mqtt_client_unsubscribe(client_ptr, topic_name, topic_name_length);
    }

    return(status);
}
5989
5990
5991 /**************************************************************************/
5992 /* */
5993 /* FUNCTION RELEASE */
5994 /* */
5995 /* _nxde_mqtt_client_disconnect PORTABLE C */
5996 /* 6.1 */
5997 /* AUTHOR */
5998 /* */
5999 /* Yuxin Zhou, Microsoft Corporation */
6000 /* */
6001 /* DESCRIPTION */
6002 /* */
6003 /* This function checks for errors in MQTT client disconnect call. */
6004 /* */
6005 /* */
6006 /* INPUT */
6007 /* */
6008 /* client_ptr Pointer to MQTT Client */
6009 /* */
6010 /* OUTPUT */
6011 /* */
6012 /* status Completion status */
6013 /* */
6014 /* CALLS */
6015 /* */
6016 /* _nxd_mqtt_client_disconnect Actual MQTT Client disconnect */
6017 /* call */
6018 /* */
6019 /* CALLED BY */
6020 /* */
6021 /* Application Code */
6022 /* */
6023 /* RELEASE HISTORY */
6024 /* */
6025 /* DATE NAME DESCRIPTION */
6026 /* */
6027 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6028 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6029 /* resulting in version 6.1 */
6030 /* */
6031 /**************************************************************************/
_nxde_mqtt_client_disconnect(NXD_MQTT_CLIENT * client_ptr)6032 UINT _nxde_mqtt_client_disconnect(NXD_MQTT_CLIENT *client_ptr)
6033 {
6034 /* Validate client_ptr */
6035 if (client_ptr == NX_NULL)
6036 {
6037 return(NX_PTR_ERROR);
6038 }
6039
6040 return(_nxd_mqtt_client_disconnect(client_ptr));
6041 }
6042
6043
6044 /**************************************************************************/
6045 /* */
6046 /* FUNCTION RELEASE */
6047 /* */
6048 /* _nxde_mqtt_client_message_get PORTABLE C */
6049 /* 6.1 */
6050 /* AUTHOR */
6051 /* */
6052 /* Yuxin Zhou, Microsoft Corporation */
6053 /* */
6054 /* DESCRIPTION */
6055 /* */
6056 /* This function checks for errors in MQTT client message get call. */
6057 /* */
6058 /* */
6059 /* INPUT */
6060 /* */
6061 /* client_ptr Pointer to MQTT Client */
6062 /* topic_buffer Pointer to the topic buffer */
6063 /* where topic is copied to */
6064 /* topic_buffer_size Size of the topic buffer. */
6065 /* actual_topic_length Number of bytes copied into */
6066 /* topic_buffer */
6067 /* message_buffer Pointer to the buffer where */
6068 /* message is copied to */
6069 /* message_buffer_size Size of the message_buffer */
6070 /* actual_message_length Number of bytes copied into */
6071 /* the message buffer. */
6072 /* */
6073 /* OUTPUT */
6074 /* */
6075 /* status Completion status */
6076 /* */
6077 /* CALLS */
6078 /* */
6079 /* _nxd_mqtt_client_message_get */
6080 /* */
6081 /* */
6082 /* CALLED BY */
6083 /* */
6084 /* Application Code */
6085 /* */
6086 /* RELEASE HISTORY */
6087 /* */
6088 /* DATE NAME DESCRIPTION */
6089 /* */
6090 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6091 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6092 /* resulting in version 6.1 */
6093 /* */
6094 /**************************************************************************/
UINT _nxde_mqtt_client_message_get(NXD_MQTT_CLIENT *client_ptr, UCHAR *topic_buffer, UINT topic_buffer_size, UINT *actual_topic_length,
                                   UCHAR *message_buffer, UINT message_buffer_size, UINT *actual_message_length)
{

UINT discarded_topic_length;
UINT discarded_message_length;

    /* Error-checking front end for _nxd_mqtt_client_message_get.

       Returns NX_PTR_ERROR for a missing client, NXD_MQTT_INVALID_PARAMETER
       when either destination buffer is missing, and otherwise the status of
       the actual message get call. */

    /* Validate client_ptr */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Both destination buffers are required. */
    if ((message_buffer == NX_NULL) || (topic_buffer == NX_NULL))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* The two length outputs may be NX_NULL if the caller does not care
       about them.  They are forwarded to the actual call, which reports the
       copied lengths through them, so substitute scratch storage rather than
       passing a NULL pointer down. */
    if (actual_topic_length == NX_NULL)
    {
        actual_topic_length = &discarded_topic_length;
    }
    if (actual_message_length == NX_NULL)
    {
        actual_message_length = &discarded_message_length;
    }

    return(_nxd_mqtt_client_message_get(client_ptr, topic_buffer, topic_buffer_size, actual_topic_length,
                                        message_buffer, message_buffer_size, actual_message_length));
}
6117
6118
6119 /**************************************************************************/
6120 /* */
6121 /* FUNCTION RELEASE */
6122 /* */
6123 /* _nxde_mqtt_client_receive_notify_set PORTABLE C */
6124 /* 6.1 */
6125 /* AUTHOR */
6126 /* */
6127 /* Yuxin Zhou, Microsoft Corporation */
6128 /* */
6129 /* DESCRIPTION */
6130 /* */
/* This function checks for errors in the MQTT client receive notify set call. */
6132 /* */
6133 /* */
6134 /* INPUT */
6135 /* */
6136 /* client_ptr Pointer to MQTT Client */
6137 /* receive_notify User-supplied callback */
6138 /* function, which is invoked */
6139 /* upon receiving a publish */
6140 /* message. */
6141 /* */
6142 /* OUTPUT */
6143 /* */
6144 /* status Completion status */
6145 /* */
6146 /* CALLS */
6147 /* */
6148 /* _nxd_mqtt_client_receive_notify_set */
6149 /* */
6150 /* */
6151 /* CALLED BY */
6152 /* */
6153 /* Application Code */
6154 /* */
6155 /* RELEASE HISTORY */
6156 /* */
6157 /* DATE NAME DESCRIPTION */
6158 /* */
6159 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6160 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6161 /* resulting in version 6.1 */
6162 /* */
6163 /**************************************************************************/
_nxde_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* receive_notify)(NXD_MQTT_CLIENT * client_ptr,UINT message_count))6164 UINT _nxde_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT *client_ptr,
6165 VOID (*receive_notify)(NXD_MQTT_CLIENT *client_ptr, UINT message_count))
6166 {
6167 /* Validate client_ptr */
6168 if (client_ptr == NX_NULL)
6169 {
6170 return(NX_PTR_ERROR);
6171 }
6172
6173 if (receive_notify == NX_NULL)
6174 {
6175 return(NX_PTR_ERROR);
6176 }
6177
6178 return(_nxd_mqtt_client_receive_notify_set(client_ptr, receive_notify));
6179 }
6180
6181
6182
6183 /**************************************************************************/
6184 /* */
6185 /* FUNCTION RELEASE */
6186 /* */
6187 /* _nxd_mqtt_client_disconnect_notify_set PORTABLE C */
6188 /* 6.1 */
6189 /* AUTHOR */
6190 /* */
6191 /* Yuxin Zhou, Microsoft Corporation */
6192 /* */
6193 /* DESCRIPTION */
6194 /* */
6195 /* This function sets the notify function for the disconnect event. */
6196 /* */
6197 /* INPUT */
6198 /* */
6199 /* client_ptr Pointer to MQTT Client */
6200 /* disconnect_notify The notify function to be */
6201 /* used when the client is */
6202 /* disconnected from the */
6203 /* server. */
6204 /* */
6205 /* OUTPUT */
6206 /* */
6207 /* status Completion status */
6208 /* */
6209 /* CALLS */
6210 /* */
6211 /* None */
6212 /* */
6213 /* CALLED BY */
6214 /* */
6215 /* Application Code */
6216 /* */
6217 /* RELEASE HISTORY */
6218 /* */
6219 /* DATE NAME DESCRIPTION */
6220 /* */
6221 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6222 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6223 /* resulting in version 6.1 */
6224 /* */
6225 /**************************************************************************/
_nxd_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* disconnect_notify)(NXD_MQTT_CLIENT *))6226 UINT _nxd_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT *client_ptr, VOID (*disconnect_notify)(NXD_MQTT_CLIENT *))
6227 {
6228
6229 client_ptr -> nxd_mqtt_disconnect_notify = disconnect_notify;
6230
6231 return(NXD_MQTT_SUCCESS);
6232 }
6233
6234
6235 /**************************************************************************/
6236 /* */
6237 /* FUNCTION RELEASE */
6238 /* */
6239 /* _nxde_mqtt_client_disconnect_notify_set PORTABLE C */
6240 /* 6.1 */
6241 /* AUTHOR */
6242 /* */
6243 /* Yuxin Zhou, Microsoft Corporation */
6244 /* */
6245 /* DESCRIPTION */
6246 /* */
6247 /* This function checks for errors in setting MQTT client disconnect */
6248 /* callback function. */
6249 /* */
6250 /* INPUT */
6251 /* */
6252 /* client_ptr Pointer to MQTT Client */
/* disconnect_notify The callback function to be */
6254 /* used when an on-going */
6255 /* connection is disconnected. */
6256 /* */
6257 /* OUTPUT */
6258 /* */
6259 /* status Completion status */
6260 /* */
6261 /* CALLS */
6262 /* */
6263 /* _nxd_mqtt_client_disconnect_notify_set */
6264 /* */
6265 /* CALLED BY */
6266 /* */
6267 /* Application Code */
6268 /* */
6269 /* RELEASE HISTORY */
6270 /* */
6271 /* DATE NAME DESCRIPTION */
6272 /* */
6273 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6274 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6275 /* resulting in version 6.1 */
6276 /* */
6277 /**************************************************************************/
_nxde_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* disconnect_notify)(NXD_MQTT_CLIENT *))6278 UINT _nxde_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT *client_ptr, VOID (*disconnect_notify)(NXD_MQTT_CLIENT *))
6279 {
6280
6281 /* Validate client_ptr */
6282 if (client_ptr == NX_NULL)
6283 {
6284 return(NX_PTR_ERROR);
6285 }
6286 return(_nxd_mqtt_client_disconnect_notify_set(client_ptr, disconnect_notify));
6287 }
6288
6289
6290 #ifdef NXD_MQTT_CLOUD_ENABLE
6291 /**************************************************************************/
6292 /* */
6293 /* FUNCTION RELEASE */
6294 /* */
6295 /* _nxd_mqtt_client_cloud_create PORTABLE C */
6296 /* 6.1 */
6297 /* AUTHOR */
6298 /* */
6299 /* Yuxin Zhou, Microsoft Corporation */
6300 /* */
6301 /* DESCRIPTION */
6302 /* */
6303 /* This function creates mqtt client running on cloud helper thread. */
6304 /* */
6305 /* */
6306 /* INPUT */
6307 /* */
6308 /* client_ptr Pointer to MQTT Client */
/* client_name Name string used by the */
6310 /* client */
6311 /* client_id Client ID used by the client */
6312 /* client_id_length Length of Client ID, in bytes */
6313 /* ip_ptr Pointer to IP instance */
6314 /* pool_ptr Pointer to packet pool */
6315 /* cloud_ptr Pointer to Cloud instance */
6316 /* */
6317 /* OUTPUT */
6318 /* */
6319 /* status Completion status */
6320 /* */
6321 /* CALLS */
6322 /* */
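/* _nxd_mqtt_client_create_internal Internal client create call */
/* nx_cloud_module_register Register MQTT module on cloud */
/* nx_tcp_socket_delete Delete socket if register fails */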
6324 /* */
6325 /* CALLED BY */
6326 /* */
6327 /* Application Code */
6328 /* */
6329 /* RELEASE HISTORY */
6330 /* */
6331 /* DATE NAME DESCRIPTION */
6332 /* */
6333 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6334 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
6335 /* corrected mqtt client state,*/
6336 /* resulting in version 6.1 */
6337 /* */
6338 /**************************************************************************/
UINT _nxd_mqtt_client_cloud_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name, CHAR *client_id, UINT client_id_length,
                                   NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr, NX_CLOUD *cloud_ptr)
{

UINT    result;


    /* The client control block must be supplied. */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The IP instance must be supplied and carry the expected IP ID. */
    if ((ip_ptr == NX_NULL) || (ip_ptr -> nx_ip_id != NX_IP_ID))
    {
        return(NX_PTR_ERROR);
    }

    /* The packet pool must be supplied. */
    if (pool_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The cloud instance must be supplied and carry the expected cloud ID. */
    if ((cloud_ptr == NX_NULL) || (cloud_ptr -> nx_cloud_id != NX_CLOUD_ID))
    {
        return(NX_PTR_ERROR);
    }

    /* Build the client through the shared internal create routine. No stack
       pointer, stack size or thread priority is passed (NX_NULL, 0, 0).  */
    result = _nxd_mqtt_client_create_internal(client_ptr, client_name, client_id, client_id_length, ip_ptr,
                                              pool_ptr, NX_NULL, 0, 0);

    /* Any failure from the internal create is returned to the caller as is. */
    if (result != 0)
    {
        return(result);
    }

    /* Remember the cloud instance, and use the cloud's mutex as the client mutex. */
    client_ptr -> nxd_mqtt_client_cloud_ptr = cloud_ptr;
    client_ptr -> nxd_mqtt_client_mutex_ptr = &(cloud_ptr -> nx_cloud_mutex);

    /* Register the MQTT module with the cloud helper, so that MQTT events are
       dispatched to _nxd_mqtt_client_event_process with this client as argument.  */
    result = nx_cloud_module_register(cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module), client_name,
                                      NX_CLOUD_MODULE_MQTT_EVENT, _nxd_mqtt_client_event_process, client_ptr);

    /* Registration succeeded: the client is now idle and ready for use. */
    if (result == NX_SUCCESS)
    {
        client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;

        return(NXD_MQTT_SUCCESS);
    }

    /* Registration failed: release the socket set up by the internal create
       routine and report an internal error.  */
    nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));

    return(NXD_MQTT_INTERNAL_ERROR);
}
6390 #endif /* NXD_MQTT_CLOUD_ENABLE */
6391
6392 #ifdef NXD_MQTT_OVER_WEBSOCKET
6393 /**************************************************************************/
6394 /* */
6395 /* FUNCTION RELEASE */
6396 /* */
6397 /* _nxd_mqtt_client_websocket_connection_status_callback */
6398 /* PORTABLE C */
6399 /* 6.2.0 */
6400 /* AUTHOR */
6401 /* */
6402 /* Yuxin Zhou, Microsoft Corporation */
6403 /* */
6404 /* DESCRIPTION */
6405 /* */
6406 /* This function is the websocket connection status callback. */
6407 /* */
6408 /* INPUT */
6409 /* */
6410 /* websocket_client_ptr Pointer to websocket client */
6411 /* context Pointer to MQTT client */
6412 /* status Websocket connection status */
6413 /* */
6414 /* OUTPUT */
6415 /* */
6416 /* None */
6417 /* */
6418 /* CALLS */
6419 /* */
6420 /* _nxd_mqtt_client_connect_packet_send */
6421 /* _nxd_mqtt_client_connection_end */
6422 /* */
6423 /* CALLED BY */
6424 /* */
6425 /* _nxd_mqtt_client_event_process */
6426 /* */
6427 /* RELEASE HISTORY */
6428 /* */
6429 /* DATE NAME DESCRIPTION */
6430 /* */
6431 /* 10-31-2022 Yuxin Zhou Initial Version 6.2.0 */
6432 /* */
6433 /**************************************************************************/
VOID _nxd_mqtt_client_websocket_connection_status_callback(NX_WEBSOCKET_CLIENT *websocket_client_ptr, VOID *context, UINT status)
{

NXD_MQTT_CLIENT *mqtt_ptr;


    /* The websocket client itself is not needed here. */
    NX_PARAMETER_NOT_USED(websocket_client_ptr);

    /* The callback context is the owning MQTT client. */
    mqtt_ptr = (NXD_MQTT_CLIENT *)context;

    /* When the websocket connection succeeded, send the MQTT connect packet
       without waiting; its result replaces the incoming status.  */
    if (status == NX_SUCCESS)
    {
        status = _nxd_mqtt_client_connect_packet_send(mqtt_ptr, NX_NO_WAIT);
    }

    /* Nothing more to do when no error has been recorded. */
    if (status == 0)
    {
        return;
    }

    /* Either the websocket connection or the connect packet send failed:
       end the connection.  */
    _nxd_mqtt_client_connection_end(mqtt_ptr, NX_NO_WAIT);

    /* Report the failure through the connect notify callback, if one is set. */
    if (mqtt_ptr -> nxd_mqtt_connect_notify != NX_NULL)
    {
        mqtt_ptr -> nxd_mqtt_connect_notify(mqtt_ptr, status, mqtt_ptr -> nxd_mqtt_connect_context);
    }
}
6462
6463 /**************************************************************************/
6464 /* */
6465 /* FUNCTION RELEASE */
6466 /* */
6467 /* _nxd_mqtt_client_websocket_set PORTABLE C */
6468 /* 6.2.0 */
6469 /* AUTHOR */
6470 /* */
6471 /* Yuxin Zhou, Microsoft Corporation */
6472 /* */
6473 /* DESCRIPTION */
6474 /* */
6475 /* This function sets the websocket. */
6476 /* */
6477 /* INPUT */
6478 /* */
6479 /* client_ptr Pointer to MQTT Client */
6480 /* host Host used by the client */
6481 /* host_length Length of host, in bytes */
6482 /* uri_path URI path used by the client */
6483 /* uri_path_length Length of uri path, in bytes */
6484 /* */
6485 /* OUTPUT */
6486 /* */
6487 /* status Completion status */
6488 /* */
6489 /* CALLS */
6490 /* */
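/* tx_mutex_get Obtain the client mutex */
/* nx_websocket_client_create Create the WebSocket client */
/* tx_mutex_put Release the client mutex */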
6492 /* */
6493 /* CALLED BY */
6494 /* */
6495 /* Application Code */
6496 /* */
6497 /* RELEASE HISTORY */
6498 /* */
6499 /* DATE NAME DESCRIPTION */
6500 /* */
6501 /* 10-31-2022 Yuxin Zhou Initial Version 6.2.0 */
6502 /* */
6503 /**************************************************************************/
UINT _nxd_mqtt_client_websocket_set(NXD_MQTT_CLIENT *client_ptr, UCHAR *host, UINT host_length, UCHAR *uri_path, UINT uri_path_length)
{

UINT    status;


    /* Take the client mutex, waiting as long as necessary. */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Store the host and URI path. Only the pointers and lengths are kept;
       the strings themselves are not copied.  */
    client_ptr -> nxd_mqtt_client_websocket_host = host;
    client_ptr -> nxd_mqtt_client_websocket_host_length = host_length;
    client_ptr -> nxd_mqtt_client_websocket_uri_path = uri_path;
    client_ptr -> nxd_mqtt_client_websocket_uri_path_length = uri_path_length;

    /* Create the WebSocket client on the MQTT client's IP instance and packet pool. */
    status = nx_websocket_client_create(&client_ptr -> nxd_mqtt_client_websocket, (UCHAR *)"",
                                        client_ptr -> nxd_mqtt_client_ip_ptr,
                                        client_ptr -> nxd_mqtt_client_packet_pool_ptr);

    /* Enable websocket use only when the create succeeded; on failure the
       create status is returned to the caller unchanged.  */
    if (status == 0)
    {
        client_ptr -> nxd_mqtt_client_use_websocket = NX_TRUE;
        status = NXD_MQTT_SUCCESS;
    }

    /* Release the mutex on both the success and the failure path. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    return(status);
}
6539
6540
6541 /**************************************************************************/
6542 /* */
6543 /* FUNCTION RELEASE */
6544 /* */
6545 /* _nxde_mqtt_client_websocket_set PORTABLE C */
6546 /* 6.2.0 */
6547 /* AUTHOR */
6548 /* */
6549 /* Yuxin Zhou, Microsoft Corporation */
6550 /* */
6551 /* DESCRIPTION */
6552 /* */
6553 /* This function checks for errors in setting MQTT client websocket. */
6554 /* */
6555 /* INPUT */
6556 /* */
6557 /* client_ptr Pointer to MQTT Client */
6558 /* host Host used by the client */
6559 /* host_length Length of host, in bytes */
6560 /* uri_path URI path used by the client */
6561 /* uri_path_length Length of uri path, in bytes */
6562 /* */
6563 /* OUTPUT */
6564 /* */
6565 /* status Completion status */
6566 /* */
6567 /* CALLS */
6568 /* */
6569 /* _nxd_mqtt_client_websocket_set */
6570 /* */
6571 /* CALLED BY */
6572 /* */
6573 /* Application Code */
6574 /* */
6575 /* RELEASE HISTORY */
6576 /* */
6577 /* DATE NAME DESCRIPTION */
6578 /* */
6579 /* 10-31-2022 Yuxin Zhou Initial Version 6.2.0 */
6580 /* */
6581 /**************************************************************************/
UINT _nxde_mqtt_client_websocket_set(NXD_MQTT_CLIENT *client_ptr, UCHAR *host, UINT host_length, UCHAR *uri_path, UINT uri_path_length)
{

    /* The client control block must be supplied. */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The host must be a non-NULL pointer with a non-zero length. */
    if ((host == NX_NULL) || (host_length == 0))
    {
        return(NX_PTR_ERROR);
    }

    /* The URI path must be a non-NULL pointer with a non-zero length. */
    if ((uri_path == NX_NULL) || (uri_path_length == 0))
    {
        return(NX_PTR_ERROR);
    }

    /* Arguments accepted, call the actual websocket set service. */
    return(_nxd_mqtt_client_websocket_set(client_ptr, host, host_length, uri_path, uri_path_length));
}
6594 #endif /* NXD_MQTT_OVER_WEBSOCKET */
6595