1 /**************************************************************************/
2 /* */
3 /* Copyright (c) Microsoft Corporation. All rights reserved. */
4 /* */
5 /* This software is licensed under the Microsoft Software License */
6 /* Terms for Microsoft Azure RTOS. Full text of the license can be */
7 /* found in the LICENSE file at https://aka.ms/AzureRTOS_EULA */
8 /* and in the root directory of this software. */
9 /* */
10 /**************************************************************************/
11
12
13 /**************************************************************************/
14 /**************************************************************************/
15 /** */
16 /** NetX Component */
17 /** */
18 /** MQTT (MQTT) */
19 /** */
20 /**************************************************************************/
21 /**************************************************************************/
22
23 #define NXD_MQTT_CLIENT_SOURCE_CODE
24
25
26 /* Force error checking to be disabled in this module */
27
28 #ifndef NX_DISABLE_ERROR_CHECKING
29 #define NX_DISABLE_ERROR_CHECKING
30 #endif
31
32
33 /* Include necessary system files. */
34
35 #include "tx_api.h"
36 #include "nx_api.h"
37 #include "nx_ip.h"
38 #include "nxd_mqtt_client.h"
39
/* Event flags used by the MQTT client.  Every event owns a distinct bit so that
   several events can be pending at the same time; MQTT_ALL_EVENTS is the mask
   that covers all of them.  */

#define MQTT_ALL_EVENTS               ((ULONG)0xFFFFFFFF)
#define MQTT_TIMEOUT_EVENT            ((ULONG)1 << 0)
#define MQTT_PACKET_RECEIVE_EVENT     ((ULONG)1 << 1)    /* Set by the TCP receive callback.    */
#define MQTT_START_EVENT              ((ULONG)1 << 2)
#define MQTT_DELETE_EVENT             ((ULONG)1 << 3)
#define MQTT_PING_TIMEOUT_EVENT       ((ULONG)1 << 4)
#define MQTT_NETWORK_DISCONNECT_EVENT ((ULONG)1 << 5)
#define MQTT_TCP_ESTABLISH_EVENT      ((ULONG)1 << 6)    /* Set by the TCP establish callback.  */
50
51 static UINT _nxd_mqtt_client_create_internal(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
52 CHAR *client_id, UINT client_id_length,
53 NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
54 VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority);
55 static UINT _nxd_mqtt_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UINT wait_option);
56 static UINT _nxd_mqtt_packet_receive(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, UINT wait_option);
57 static UINT _nxd_mqtt_copy_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET **new_packet_ptr,
58 USHORT packet_id, UCHAR set_duplicate_flag, UINT wait_option);
59 static VOID _nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr);
60 static VOID _nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr);
61 static UINT _nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option);
62 static UINT _nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option);
63
64 /**************************************************************************/
65 /* */
66 /* FUNCTION RELEASE */
67 /* */
68 /* _nxd_mqtt_client_set_fixed_header PORTABLE C */
69 /* 6.1 */
70 /* AUTHOR */
71 /* */
72 /* Yuxin Zhou, Microsoft Corporation */
73 /* */
74 /* DESCRIPTION */
75 /* */
/*    This function writes the fixed header field in the outgoing       */
77 /* MQTT packet. */
78 /* */
79 /* This function follows the logic outlined in 2.2 in MQTT */
80 /* specification. */
81 /* */
82 /* INPUT */
83 /* */
84 /* client_ptr Pointer to MQTT Client */
85 /* packet_ptr Outgoing MQTT packet */
86 /* control_header Control byte */
87 /* length Remaining length in bytes */
88 /* wait_option Wait option */
89 /* */
90 /* OUTPUT */
91 /* */
92 /* status */
93 /* */
94 /* CALLS */
95 /* */
96 /* nx_packet_data_append Append packet data */
97 /* */
98 /* CALLED BY */
99 /* */
100 /* _nxd_mqtt_client_sub_unsub */
101 /* _nxd_mqtt_client_connect */
102 /* _nxd_mqtt_client_publish */
103 /* */
104 /* RELEASE HISTORY */
105 /* */
106 /* DATE NAME DESCRIPTION */
107 /* */
108 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
109 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
110 /* resulting in version 6.1 */
111 /* */
112 /**************************************************************************/
UINT _nxd_mqtt_client_set_fixed_header(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UCHAR control_header, UINT length, UINT wait_option)
{
UCHAR fixed_header[5];
UINT  used = 0;
UINT  ret;

    /* fixed_header[] holds the control byte plus at most four remaining-length
       bytes.  Four bytes of 7 bits each can encode values up to 0x0FFFFFFF; a
       larger length would need a fifth length byte and the loop below would
       write past the end of fixed_header[].  Reject such a length up front
       instead of corrupting the stack.  */
    if (length > 0x0FFFFFFF)
    {
        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* First byte: packet type and flags, exactly as supplied by the caller.  */
    fixed_header[used] = control_header;
    used++;

    /* Encode the remaining length, least significant 7-bit group first.  Every
       byte except the last one has its top bit set to signal "more follows".  */
    do
    {
    UCHAR digit = (UCHAR)(length & 0x7F);

        length = length >> 7;

        if (length != 0)
        {
            digit = (UCHAR)(digit | 0x80);
        }

        fixed_header[used] = digit;
        used++;
    } while (length != 0);

    /* Append the control byte and the encoded length to the outgoing packet and
       hand the append status back to the caller.  */
    ret = nx_packet_data_append(packet_ptr, fixed_header, used,
                                client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);

    return(ret);
}
143
144
145
146 /**************************************************************************/
147 /* */
148 /* FUNCTION RELEASE */
149 /* */
150 /* _nxd_mqtt_read_remaining_length PORTABLE C */
151 /* 6.1 */
152 /* AUTHOR */
153 /* */
154 /* Yuxin Zhou, Microsoft Corporation */
155 /* */
156 /* DESCRIPTION */
157 /* */
/*    This function parses the remaining length field in the incoming   */
159 /* MQTT packet. */
160 /* */
161 /* This function follows the logic outlined in 2.2.3 in MQTT */
162 /* specification */
163 /* */
164 /* INPUT */
165 /* */
166 /* packet_ptr Incoming MQTT packet. */
167 /* remaining_length remaining length in bytes, */
168 /* this is the return value. */
169 /* offset Pointer to offset of the */
170 /* remaining data */
171 /* */
172 /* OUTPUT */
173 /* */
174 /* status */
175 /* */
176 /* CALLS */
177 /* */
178 /* None */
179 /* */
180 /* CALLED BY */
181 /* */
182 /* _nxd_mqtt_process_publish */
183 /* _nxd_mqtt_client_message_get */
184 /* _nxd_mqtt_process_sub_unsub_ack */
185 /* */
186 /* RELEASE HISTORY */
187 /* */
188 /* DATE NAME DESCRIPTION */
189 /* */
190 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
191 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
192 /* resulting in version 6.1 */
193 /* */
194 /**************************************************************************/
UINT _nxd_mqtt_read_remaining_length(NX_PACKET *packet_ptr, UINT *remaining_length, ULONG *offset_ptr)
{
UCHAR encoded[4] = {0};
ULONG available;
UINT  decoded = 0;
UINT  shift = 0;
UINT  consumed = 0;
UCHAR current;

    /* Fetch up to four bytes that follow the control byte at offset 0.  */
    if (nx_packet_data_extract_offset(packet_ptr, 1, &encoded, sizeof(encoded), &available))
    {

        /* Nothing could be extracted: treat the packet as incomplete.  */
        return(NXD_MQTT_PARTIAL_PACKET);
    }

    /* Accumulate 7 bits per byte, least significant group first, until a byte
       without the continuation bit (0x80) is found.  */
    for (;;)
    {
        if (consumed >= available)
        {

            /* Ran out of extracted bytes.  After four bytes that all carried the
               continuation bit the field is invalid; otherwise more data is
               simply still to come.  */
            return((consumed == 4) ? NXD_MQTT_INTERNAL_ERROR : NXD_MQTT_PARTIAL_PACKET);
        }

        current = encoded[consumed];
        consumed++;

        decoded += ((UINT)(current & 0x7F)) << shift;
        shift += 7;

        if ((current & 0x80) == 0)
        {
            break;
        }
    }

    /* Control byte + length bytes + payload must all fit in what has been
       received so far; otherwise the packet is incomplete.  */
    if ((1 + consumed + decoded) > packet_ptr -> nx_packet_length)
    {
        return(NXD_MQTT_PARTIAL_PACKET);
    }

    /* Report the decoded length and where the data after the fixed header starts.  */
    *remaining_length = decoded;
    *offset_ptr = (1 + consumed);

    return(NXD_MQTT_SUCCESS);
}
242
243
244
245 /**************************************************************************/
246 /* */
247 /* FUNCTION RELEASE */
248 /* */
249 /* _nxd_mqtt_client_sub_unsub PORTABLE C */
250 /* 6.3.0 */
251 /* AUTHOR */
252 /* */
253 /* Yuxin Zhou, Microsoft Corporation */
254 /* */
255 /* DESCRIPTION */
256 /* */
257 /* This function sends a subscribe or unsubscribe message to the */
258 /* broker. */
259 /* */
260 /* */
261 /* INPUT */
262 /* */
263 /* client_ptr Pointer to MQTT Client */
264 /* op Subscribe or Unsubscribe */
265 /* topic_name Pointer to the topic string */
266 /* to subscribe to */
267 /* topic_name_length Length of the topic string */
268 /* in bytes */
269 /* packet_id_ptr Pointer to packet id that */
270 /* will be filled with */
271 /* assigned packet id for */
272 /* sub/unsub message */
273 /* QoS Expected QoS level */
274 /* */
275 /* OUTPUT */
276 /* */
277 /* status Completion status */
278 /* */
279 /* CALLS */
280 /* */
281 /* tx_mutex_get */
282 /* _nxd_mqtt_client_packet_allocate */
283 /* _nxd_mqtt_client_set_fixed_header */
284 /* _nxd_mqtt_client_append_message */
285 /* tx_mutex_put */
286 /* _nxd_mqtt_packet_send */
287 /* nx_packet_release */
288 /* _nxd_mqtt_copy_transmit_packet */
289 /* */
290 /* CALLED BY */
291 /* */
292 /* _nxd_mqtt_client_subscribe */
293 /* _nxd_mqtt_client_unsubscribe */
294 /* */
295 /* RELEASE HISTORY */
296 /* */
297 /* DATE NAME DESCRIPTION */
298 /* */
299 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
300 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
301 /* resulting in version 6.1 */
302 /* 11-09-2020 Yuxin Zhou Modified comment(s), and */
303 /* added packet id parameter, */
304 /* resulting in version 6.1.2 */
305 /* 07-29-2022 Spencer McDonough Modified comment(s), */
306 /* improved internal logic, */
307 /* resulting in version 6.1.12 */
308 /* 10-31-2022 Bo Chen Modified comment(s), improved */
309 /* the logic of sending packet,*/
310 /* resulting in version 6.2.0 */
311 /* 10-31-2023 Haiqing Zhao Modified comment(s), improved */
312 /* internal logic, */
313 /* resulting in version 6.3.0 */
314 /* */
315 /**************************************************************************/
UINT _nxd_mqtt_client_sub_unsub(NXD_MQTT_CLIENT *client_ptr, UINT op,
                                CHAR *topic_name, UINT topic_name_length,
                                USHORT *packet_id_ptr, UINT QoS)
{
NX_PACKET *message_ptr;
NX_PACKET *saved_copy_ptr;
UINT       subscribing;
UINT       remaining_length;
UINT       result;
UCHAR      scratch[2];

    /* Take the client mutex; everything up to the actual send runs under it.  */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* The request can only be issued while the client is in the connected state.  */
    if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(NXD_MQTT_NOT_CONNECTED);
    }

    /* Get a packet to build the message in.  An allocation failure is reported
       with the allocator's own status.  */
    result = _nxd_mqtt_client_packet_allocate(client_ptr, &message_ptr, NX_WAIT_FOREVER);
    if (result != 0)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(result);
    }

    /* Only a SUBSCRIBE request carries a trailing QoS byte.  */
    subscribing = (op == ((MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4) | 0x02));

    /* Remaining length: 2 bytes of packet ID, then 2 more bytes plus the topic
       string, plus the QoS byte when subscribing.  */
    remaining_length = 2 + 2 + topic_name_length;
    if (subscribing)
    {
        remaining_length++;
    }

    /* Build the message step by step.  Each step runs only if all previous
       steps succeeded; the first failure is handled once, below.  */

    /* Control header and remaining length field.  */
    result = _nxd_mqtt_client_set_fixed_header(client_ptr, message_ptr, (UCHAR)op, remaining_length, NX_WAIT_FOREVER);

    if (result == 0)
    {

        /* Packet ID, most significant byte first.  */
        scratch[0] = (UCHAR)(client_ptr -> nxd_mqtt_client_packet_identifier >> 8);
        scratch[1] = (client_ptr -> nxd_mqtt_client_packet_identifier & 0xFF);

        /* Let the caller know which packet ID this request uses.  */
        if (packet_id_ptr)
        {
            *packet_id_ptr = (USHORT)(client_ptr -> nxd_mqtt_client_packet_identifier & 0xFFFF);
        }

        result = nx_packet_data_append(message_ptr, scratch, 2, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_WAIT_FOREVER);
    }

    if (result == 0)
    {

        /* Topic name.  */
        result = _nxd_mqtt_client_append_message(client_ptr, message_ptr, topic_name, topic_name_length, NX_WAIT_FOREVER);
    }

    if ((result == 0) && subscribing)
    {

        /* Requested QoS; only the two low bits are used.  */
        scratch[0] = QoS & 0x3;

        result = nx_packet_data_append(message_ptr, scratch, 1, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_WAIT_FOREVER);
    }

    if (result == 0)
    {

        /* Keep a copy of the finished message on the transmit queue so it is
           available for retransmission.  */
        result = _nxd_mqtt_copy_transmit_packet(client_ptr, message_ptr, &saved_copy_ptr,
                                                (USHORT)(client_ptr -> nxd_mqtt_client_packet_identifier),
                                                NX_FALSE, NX_WAIT_FOREVER);
    }

    if (result != 0)
    {

        /* Some step failed: drop the mutex, give the packet back and report a
           packet pool failure, whatever the underlying status was.  */
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        nx_packet_release(message_ptr);

        return(NXD_MQTT_PACKET_POOL_FAILURE);
    }

    /* Append the saved copy to the tail of the transmit queue.  */
    if (client_ptr -> message_transmit_queue_head == NX_NULL)
    {
        client_ptr -> message_transmit_queue_head = saved_copy_ptr;
    }
    else
    {
        client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = saved_copy_ptr;
    }
    client_ptr -> message_transmit_queue_tail = saved_copy_ptr;

    /* Advance the 16-bit packet identifier, skipping zero on wrap-around
       (MQTT-2.3.1-1: the identifier must be non-zero).  */
    client_ptr -> nxd_mqtt_client_packet_identifier = (client_ptr -> nxd_mqtt_client_packet_identifier + 1) & 0xFFFF;
    if (client_ptr -> nxd_mqtt_client_packet_identifier == 0)
    {
        client_ptr -> nxd_mqtt_client_packet_identifier = 1;
    }

    /* Restart the keepalive timeout from now.  */
    client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;

    /* The send itself is done without holding the mutex.  */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    /* Send the subscribe/unsubscribe message to the server.  */
    if (_nxd_mqtt_packet_send(client_ptr, message_ptr, NX_WAIT_FOREVER))
    {

        /* The packet was not taken by the send call, so release it here.  */
        nx_packet_release(message_ptr);

        result = NXD_MQTT_COMMUNICATION_FAILURE;
    }

    return(result);
}
485
486
487 /**************************************************************************/
488 /* */
489 /* FUNCTION RELEASE */
490 /* */
491 /* _nxd_mqtt_client_packet_allocate PORTABLE C */
492 /* 6.3.0 */
493 /* AUTHOR */
494 /* */
495 /* Yuxin Zhou, Microsoft Corporation */
496 /* */
497 /* DESCRIPTION */
498 /* */
499 /* This function allocates a packet for transmitting MQTT message. */
500 /* Special care has to be taken for accommodating IPv4/IPv6 header, */
/*    and possibly TLS record if TLS is being used. On failure,         */
/*    NXD_MQTT_PACKET_POOL_FAILURE is returned; no mutex is released    */
/*    by this function, so the caller must release what it holds.       */
503 /* */
504 /* */
505 /* INPUT */
506 /* */
507 /* client_ptr Pointer to MQTT Client */
508 /* packet_ptr Allocated packet to be */
509 /* returned to the caller. */
510 /* */
511 /* OUTPUT */
512 /* */
513 /* status Completion status */
514 /* */
515 /* CALLS */
516 /* */
517 /* nx_secure_tls_packet_allocate Allocate packet for MQTT */
518 /* over TLS socket */
519 /* nx_packet_allocate Allocate a packet for MQTT */
520 /* over regular TCP socket */
521 /* tx_mutex_put Release a mutex */
522 /* */
523 /* CALLED BY */
524 /* */
525 /* _nxd_mqtt_process_publish */
526 /* _nxd_mqtt_client_connect */
527 /* _nxd_mqtt_client_publish */
528 /* _nxd_mqtt_client_subscribe */
529 /* _nxd_mqtt_client_unsubscribe */
530 /* _nxd_mqtt_client_send_simple_message */
531 /* */
532 /* RELEASE HISTORY */
533 /* */
534 /* DATE NAME DESCRIPTION */
535 /* */
536 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
537 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
538 /* resulting in version 6.1 */
539 /* 07-29-2022 Spencer McDonough Modified comment(s), and */
540 /* improved internal logic, */
541 /* resulting in version 6.1.12 */
542 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
543 /* mqtt over websocket, */
544 /* resulting in version 6.2.0 */
545 /* 10-31-2023 Haiqing Zhao Modified comment(s), improved */
546 /* internal logic, */
547 /* resulting in version 6.3.0 */
548 /* */
549 /**************************************************************************/
_nxd_mqtt_client_packet_allocate(NXD_MQTT_CLIENT * client_ptr,NX_PACKET ** packet_ptr,ULONG wait_option)550 UINT _nxd_mqtt_client_packet_allocate(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, ULONG wait_option)
551 {
552 UINT status = NXD_MQTT_SUCCESS;
553
554 #ifdef NXD_MQTT_OVER_WEBSOCKET
555 if (client_ptr -> nxd_mqtt_client_use_websocket)
556 {
557
558 /* Use WebSocket packet allocate since it is able to count for WebSocket-related header space */
559 status = nx_websocket_client_packet_allocate(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, wait_option);
560 }
561 else
562 #endif /* NXD_MQTT_OVER_WEBSOCKET */
563 #ifdef NX_SECURE_ENABLE
564 if (client_ptr -> nxd_mqtt_client_use_tls)
565 {
566 /* Use TLS packet allocate. The TLS packet allocate is able to count for
567 TLS-related header space including crypto initial vector area. */
568 status = nx_secure_tls_packet_allocate(&client_ptr -> nxd_mqtt_tls_session, client_ptr -> nxd_mqtt_client_packet_pool_ptr,
569 packet_ptr, wait_option);
570 }
571 /* Allocate a packet */
572 else
573 {
574 #endif /* NX_SECURE_ENABLE */
575 if (client_ptr -> nxd_mqtt_client_socket.nx_tcp_socket_connect_ip.nxd_ip_version == NX_IP_VERSION_V4)
576 {
577 status = nx_packet_allocate(client_ptr -> nxd_mqtt_client_packet_pool_ptr, packet_ptr, NX_IPv4_TCP_PACKET,
578 wait_option);
579 }
580 else
581 {
582 status = nx_packet_allocate(client_ptr -> nxd_mqtt_client_packet_pool_ptr, packet_ptr, NX_IPv6_TCP_PACKET,
583 wait_option);
584 }
585 #ifdef NX_SECURE_ENABLE
586 }
587 #endif /* NX_SECURE_ENABLE */
588
589 if (status != NX_SUCCESS)
590 {
591 return(NXD_MQTT_PACKET_POOL_FAILURE);
592 }
593
594 return(NXD_MQTT_SUCCESS);
595 }
596
597 /**************************************************************************/
598 /* */
599 /* FUNCTION RELEASE */
600 /* */
601 /* _nxd_mqtt_packet_send PORTABLE C */
602 /* 6.2.0 */
603 /* AUTHOR */
604 /* */
605 /* Yuxin Zhou, Microsoft Corporation */
606 /* */
607 /* DESCRIPTION */
608 /* */
609 /* This function sends out a packet. */
610 /* */
611 /* */
612 /* INPUT */
613 /* */
614 /* client_ptr Pointer to MQTT Client */
615 /* packet_ptr Pointer to packet */
616 /* wait_option Timeout value */
617 /* */
618 /* OUTPUT */
619 /* */
620 /* status Completion status */
621 /* */
622 /* CALLS */
623 /* */
624 /* nx_websocket_client_send */
625 /* nx_secure_tls_session_send */
626 /* nx_tcp_socket_send */
627 /* */
628 /* CALLED BY */
629 /* */
630 /* */
631 /* RELEASE HISTORY */
632 /* */
633 /* DATE NAME DESCRIPTION */
634 /* */
635 /* 10-31-2022 Bo Chen Initial Version 6.2.0 */
636 /* */
637 /**************************************************************************/
static UINT _nxd_mqtt_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UINT wait_option)
{

    /* Hand the packet to whichever transport the client is using and return
       that transport's status unchanged.  The transports are tried in the
       order WebSocket, TLS, plain TCP.  */

#ifdef NXD_MQTT_OVER_WEBSOCKET
    if (client_ptr -> nxd_mqtt_client_use_websocket)
    {

        /* MQTT data goes out as a single, final binary frame.  */
        return(nx_websocket_client_send(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, NX_WEBSOCKET_OPCODE_BINARY_FRAME, NX_TRUE, wait_option));
    }
#endif /* NXD_MQTT_OVER_WEBSOCKET */

#ifdef NX_SECURE_ENABLE
    if (client_ptr -> nxd_mqtt_client_use_tls)
    {
        return(nx_secure_tls_session_send(&(client_ptr -> nxd_mqtt_tls_session), packet_ptr, wait_option));
    }
#endif /* NX_SECURE_ENABLE */

    return(nx_tcp_socket_send(&client_ptr -> nxd_mqtt_client_socket, packet_ptr, wait_option));
}
663
664 /**************************************************************************/
665 /* */
666 /* FUNCTION RELEASE */
667 /* */
668 /* _nxd_mqtt_packet_receive PORTABLE C */
669 /* 6.2.0 */
670 /* AUTHOR */
671 /* */
672 /* Yuxin Zhou, Microsoft Corporation */
673 /* */
674 /* DESCRIPTION */
675 /* */
676 /* This function receives a packet. */
677 /* */
678 /* */
679 /* INPUT */
680 /* */
681 /* client_ptr Pointer to MQTT Client */
682 /* packet_ptr Pointer to packet */
683 /* wait_option Timeout value */
684 /* */
685 /* OUTPUT */
686 /* */
687 /* status Completion status */
688 /* */
689 /* CALLS */
690 /* */
691 /* nx_websocket_client_receive */
692 /* nx_secure_tls_session_receive */
693 /* nx_tcp_socket_receive */
694 /* */
695 /* CALLED BY */
696 /* */
697 /* */
698 /* RELEASE HISTORY */
699 /* */
700 /* DATE NAME DESCRIPTION */
701 /* */
702 /* 10-31-2022 Bo Chen Initial Version 6.2.0 */
703 /* */
704 /**************************************************************************/
static UINT _nxd_mqtt_packet_receive(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, UINT wait_option)
{
UINT status = NXD_MQTT_SUCCESS;
#ifdef NXD_MQTT_OVER_WEBSOCKET
UINT op_code = 0;
#endif /* NXD_MQTT_OVER_WEBSOCKET*/

    /* Receive one packet from whichever transport the client is using
       (WebSocket, TLS or plain TCP) and return that transport's status.  */

#ifdef NXD_MQTT_OVER_WEBSOCKET
    if (client_ptr -> nxd_mqtt_client_use_websocket)
    {
        status = nx_websocket_client_receive(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, &op_code, wait_option);
        if ((status == NX_SUCCESS) && (op_code != NX_WEBSOCKET_OPCODE_BINARY_FRAME))
        {

            /* Only binary frames are accepted.  The receive call itself
               succeeded, so a packet was handed to us; since an error status is
               returned, the caller has no reason to look at it.  Release it here
               so it is not leaked, and clear the caller's pointer so it does not
               dangle.
               NOTE(review): assumes no caller releases *packet_ptr itself when
               NX_INVALID_PACKET is returned - confirm against the callers.  */
            nx_packet_release(*packet_ptr);
            *packet_ptr = NX_NULL;

            return(NX_INVALID_PACKET);
        }
    }
    else
#endif /* NXD_MQTT_OVER_WEBSOCKET */

#ifdef NX_SECURE_ENABLE
    if (client_ptr -> nxd_mqtt_client_use_tls)
    {
        status = nx_secure_tls_session_receive(&(client_ptr -> nxd_mqtt_tls_session), packet_ptr, wait_option);
    }
    else
#endif /* NX_SECURE_ENABLE */
    {
        status = nx_tcp_socket_receive(&(client_ptr -> nxd_mqtt_client_socket), packet_ptr, wait_option);
    }

    return(status);
}
737
738 /**************************************************************************/
739 /* */
740 /* FUNCTION RELEASE */
741 /* */
742 /* _nxd_mqtt_tcp_establish_notify PORTABLE C */
743 /* 6.1 */
744 /* AUTHOR */
745 /* */
746 /* Yuxin Zhou, Microsoft Corporation */
747 /* */
748 /* DESCRIPTION */
749 /* */
750 /* This internal function is installed as TCP connection establish */
751 /* callback function. */
752 /* */
753 /* INPUT */
754 /* */
755 /* socket_ptr The socket that receives */
756 /* the message. */
757 /* */
758 /* OUTPUT */
759 /* */
760 /* None */
761 /* */
762 /* CALLS */
763 /* */
764 /* tx_event_flags_set */
765 /* */
766 /* CALLED BY */
767 /* */
768 /* _nxd_mqtt_thread_entry */
769 /* _nxd_mqtt_client_event_process */
770 /* */
771 /* RELEASE HISTORY */
772 /* */
773 /* DATE NAME DESCRIPTION */
774 /* */
775 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
776 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
777 /* resulting in version 6.1 */
778 /* */
779 /**************************************************************************/
_nxd_mqtt_tcp_establish_notify(NX_TCP_SOCKET * socket_ptr)780 static VOID _nxd_mqtt_tcp_establish_notify(NX_TCP_SOCKET *socket_ptr)
781 {
782 NXD_MQTT_CLIENT *client_ptr;
783
784 client_ptr = (NXD_MQTT_CLIENT *)socket_ptr -> nx_tcp_socket_reserved_ptr;
785
786 if (&(client_ptr -> nxd_mqtt_client_socket) == socket_ptr)
787 {
788
789 /* Set the event flag. */
790 #ifndef NXD_MQTT_CLOUD_ENABLE
791 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_TCP_ESTABLISH_EVENT, TX_OR);
792 #else
793 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_TCP_ESTABLISH_EVENT);
794 #endif /* NXD_MQTT_CLOUD_ENABLE */
795 }
796 }
797
798 /**************************************************************************/
799 /* */
800 /* FUNCTION RELEASE */
801 /* */
802 /* _nxd_mqtt_receive_callback PORTABLE C */
803 /* 6.1 */
804 /* AUTHOR */
805 /* */
806 /* Yuxin Zhou, Microsoft Corporation */
807 /* */
808 /* DESCRIPTION */
809 /* */
810 /* This internal function is installed as TCP receive callback */
811 /* function. On receiving a TCP message, the callback function */
812 /* sets an event flag to trigger MQTT client to process received */
813 /* message. */
814 /* */
815 /* */
816 /* INPUT */
817 /* */
818 /* socket_ptr The socket that receives */
819 /* the message. */
820 /* */
821 /* OUTPUT */
822 /* */
823 /* None */
824 /* */
825 /* CALLS */
826 /* */
827 /* tx_event_flags_set */
828 /* */
829 /* CALLED BY */
830 /* */
831 /* _nxd_mqtt_thread_entry */
832 /* _nxd_mqtt_client_event_process */
833 /* */
834 /* RELEASE HISTORY */
835 /* */
836 /* DATE NAME DESCRIPTION */
837 /* */
838 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
839 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
840 /* resulting in version 6.1 */
841 /* */
842 /**************************************************************************/
_nxd_mqtt_receive_callback(NX_TCP_SOCKET * socket_ptr)843 static VOID _nxd_mqtt_receive_callback(NX_TCP_SOCKET *socket_ptr)
844 {
845 NXD_MQTT_CLIENT *client_ptr;
846
847 client_ptr = (NXD_MQTT_CLIENT *)socket_ptr -> nx_tcp_socket_reserved_ptr;
848
849 if (&(client_ptr -> nxd_mqtt_client_socket) == socket_ptr)
850 {
851 /* Set the event flag. */
852 #ifndef NXD_MQTT_CLOUD_ENABLE
853 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_PACKET_RECEIVE_EVENT, TX_OR);
854 #else
855 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PACKET_RECEIVE_EVENT);
856 #endif /* NXD_MQTT_CLOUD_ENABLE */
857 }
858 }
859
860 /**************************************************************************/
861 /* */
862 /* FUNCTION RELEASE */
863 /* */
864 /* _nxd_mqtt_copy_transmit_packet PORTABLE C */
865 /* 6.1.8 */
866 /* AUTHOR */
867 /* */
868 /* Yuxin Zhou, Microsoft Corporation */
869 /* */
870 /* DESCRIPTION */
871 /* */
872 /* This internal function saves a transmit packet. */
873 /* A transmit packet is allocated to store QoS 1 and 2 messages. */
874 /* Upon a message being properly acknowledged, the packet will */
875 /* be released. */
876 /* */
877 /* */
878 /* INPUT */
879 /* */
880 /* client_ptr Pointer to MQTT Client */
881 /* packet_ptr Pointer to the MQTT message */
882 /* packet to be saved */
883 /* new_packet_ptr Return a copied packet */
884 /* packet_id Current packet ID */
885 /* set_duplicate_flag Set duplicate flag for fixed */
886 /* header or not */
887 /* wait_option Timeout value */
888 /* */
889 /* OUTPUT */
890 /* */
891 /* status */
892 /* */
893 /* CALLS */
894 /* */
895 /* nx_packet_copy */
896 /* */
897 /* CALLED BY */
898 /* */
899 /* _nxd_mqtt_client_sub_unsub */
900 /* _nxd_mqtt_process_publish */
901 /* */
902 /* RELEASE HISTORY */
903 /* */
904 /* DATE NAME DESCRIPTION */
905 /* */
906 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
907 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
908 /* resulting in version 6.1 */
909 /* 08-02-2021 Yuxin Zhou Modified comment(s), */
910 /* supported maximum transmit */
911 /* queue depth, */
912 /* resulting in version 6.1.8 */
913 /* */
914 /**************************************************************************/
static UINT _nxd_mqtt_copy_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET **new_packet_ptr,
                                           USHORT packet_id, UCHAR set_duplicate_flag, UINT wait_option)
{
NX_PACKET *copy_ptr;

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH

    /* Refuse to save another packet once the configured transmit queue depth
       has been reached.  */
    if (client_ptr -> message_transmit_queue_depth >= NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH)
    {
        return(NX_TX_QUEUE_DEPTH);
    }
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */

    /* Duplicate the message into a packet from the client's pool.  Any copy
       failure is reported as a packet pool failure.  */
    if (nx_packet_copy(packet_ptr, new_packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option))
    {
        return(NXD_MQTT_PACKET_POOL_FAILURE);
    }

    copy_ptr = *new_packet_ptr;

    /* Record the packet ID at the very start of the copy's data area
       (nx_packet_data_start).  */
    *((USHORT *)(copy_ptr -> nx_packet_data_start)) = packet_id;

    /* Optionally mark the saved copy as a duplicate: the flag is OR-ed into the
       first byte at the prepend pointer, i.e. the fixed header's control byte.  */
    if (set_duplicate_flag)
    {
        *(copy_ptr -> nx_packet_prepend_ptr) |= MQTT_PUBLISH_DUP_FLAG;
    }

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH

    /* One more packet is now accounted to the transmit queue.  */
    client_ptr -> message_transmit_queue_depth++;
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */

    return(NXD_MQTT_SUCCESS);
}
955
956 /**************************************************************************/
957 /* */
958 /* FUNCTION RELEASE */
959 /* */
960 /* _nxd_mqtt_release_transmit_packet PORTABLE C */
961 /* 6.1.8 */
962 /* AUTHOR */
963 /* */
964 /* Yuxin Zhou, Microsoft Corporation */
965 /* */
966 /* DESCRIPTION */
967 /* */
968 /* This internal function releases a transmit packet. */
969 /* A transmit packet is allocated to store QoS 1 and 2 messages. */
970 /* Upon a message being properly acknowledged, the packet can */
971 /* be released. */
972 /* */
973 /* */
974 /* INPUT */
975 /* */
976 /* client_ptr Pointer to MQTT Client */
977 /* packet_ptr Pointer to the MQTT message */
978 /* packet to be removed */
979 /* previous_packet_ptr Pointer to the previous packet*/
980 /* or NULL if none exists */
981 /* */
982 /* OUTPUT */
983 /* */
984 /* None */
985 /* */
986 /* CALLS */
987 /* */
988 /* None */
989 /* */
990 /* CALLED BY */
991 /* */
992 /* _nxd_mqtt_thread_entry */
993 /* _nxd_mqtt_client_event_process */
994 /* */
995 /* RELEASE HISTORY */
996 /* */
997 /* DATE NAME DESCRIPTION */
998 /* */
999 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1000 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1001 /* resulting in version 6.1 */
1002 /* 08-02-2021 Yuxin Zhou Modified comment(s), */
1003 /* supported maximum transmit */
1004 /* queue depth, */
1005 /* resulting in version 6.1.8 */
1006 /* */
1007 /**************************************************************************/
_nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,NX_PACKET * previous_packet_ptr)1008 static VOID _nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr)
1009 {
1010
1011 if (previous_packet_ptr)
1012 {
1013 previous_packet_ptr -> nx_packet_queue_next = packet_ptr -> nx_packet_queue_next;
1014 }
1015 else
1016 {
1017 client_ptr -> message_transmit_queue_head = packet_ptr -> nx_packet_queue_next;
1018 }
1019
1020 if (packet_ptr == client_ptr -> message_transmit_queue_tail)
1021 {
1022 client_ptr -> message_transmit_queue_tail = previous_packet_ptr;
1023 }
1024 nx_packet_release(packet_ptr);
1025
1026 #ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
1027 client_ptr -> message_transmit_queue_depth--;
1028 #endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */
1029 }
1030
1031 /**************************************************************************/
1032 /* */
1033 /* FUNCTION RELEASE */
1034 /* */
1035 /* _nxd_mqtt_release_receive_packet PORTABLE C */
1036 /* 6.1 */
1037 /* AUTHOR */
1038 /* */
1039 /* Yuxin Zhou, Microsoft Corporation */
1040 /* */
1041 /* DESCRIPTION */
1042 /* */
1043 /* This internal function releases a receive packet. */
1044 /* A receive packet is allocated to store QoS 1 and 2 messages. */
1045 /* Upon a message being properly acknowledged, the packet can */
1046 /* be released. */
1047 /* */
1048 /* */
1049 /* INPUT */
1050 /* */
1051 /* client_ptr Pointer to MQTT Client */
1052 /* packet_ptr Pointer to the MQTT message */
1053 /* packet to be removed */
1054 /* previous_packet_ptr Pointer to the previous packet*/
1055 /* or NULL if none exists */
1056 /* */
1057 /* OUTPUT */
1058 /* */
1059 /* None */
1060 /* */
1061 /* CALLS */
1062 /* */
1063 /* None */
1064 /* */
1065 /* CALLED BY */
1066 /* */
1067 /* _nxd_mqtt_thread_entry */
1068 /* _nxd_mqtt_client_event_process */
1069 /* */
1070 /* RELEASE HISTORY */
1071 /* */
1072 /* DATE NAME DESCRIPTION */
1073 /* */
1074 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1075 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1076 /* resulting in version 6.1 */
1077 /* */
1078 /**************************************************************************/
_nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,NX_PACKET * previous_packet_ptr)1079 static VOID _nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr)
1080 {
1081
1082 if (previous_packet_ptr)
1083 {
1084 previous_packet_ptr -> nx_packet_queue_next = packet_ptr -> nx_packet_queue_next;
1085 }
1086 else
1087 {
1088 client_ptr -> message_receive_queue_head = packet_ptr -> nx_packet_queue_next;
1089 }
1090
1091 if (packet_ptr == client_ptr -> message_receive_queue_tail)
1092 {
1093 client_ptr -> message_receive_queue_tail = previous_packet_ptr;
1094 }
1095
1096 client_ptr -> message_receive_queue_depth--;
1097 }
1098
1099 /**************************************************************************/
1100 /* */
1101 /* FUNCTION RELEASE */
1102 /* */
1103 /* _nxd_mqtt_process_connack PORTABLE C */
1104 /* 6.1 */
1105 /* AUTHOR */
1106 /* */
1107 /* Yuxin Zhou, Microsoft Corporation */
1108 /* */
1109 /* DESCRIPTION */
1110 /* */
1111 /* This internal function processes a CONNACK message from the broker. */
1112 /* */
1113 /* */
1114 /* INPUT */
1115 /* */
1116 /* client_ptr Pointer to MQTT Client */
1117 /* packet_ptr Pointer to the packet */
1118 /* wait_option Timeout value */
1119 /* */
1120 /* OUTPUT */
1121 /* */
1122 /* status */
1123 /* */
1124 /* CALLS */
1125 /* */
1126 /* [nxd_mqtt_connect_notify] User supplied connect */
1127 /* callback function */
1128 /* tx_mutex_get */
1129 /* tx_mutex_put */
1130 /* _nxd_mqtt_client_retransmit_message */
1131 /* _nxd_mqtt_client_connection_end */
1132 /* */
1133 /* */
1134 /* CALLED BY */
1135 /* */
1136 /* _nxd_mqtt_packet_receive_process */
1137 /* */
1138 /* RELEASE HISTORY */
1139 /* */
1140 /* DATE NAME DESCRIPTION */
1141 /* */
1142 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1143 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1144 /* resulting in version 6.1 */
1145 /* */
1146 /**************************************************************************/
_nxd_mqtt_process_connack(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,ULONG wait_option)1147 static UINT _nxd_mqtt_process_connack(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, ULONG wait_option)
1148 {
1149
1150 UINT ret = NXD_MQTT_COMMUNICATION_FAILURE;
1151 MQTT_PACKET_CONNACK *connack_packet_ptr = (MQTT_PACKET_CONNACK *)(packet_ptr -> nx_packet_prepend_ptr);
1152
1153
1154 /* Check the length. */
1155 if ((packet_ptr -> nx_packet_length != sizeof(MQTT_PACKET_CONNACK)) ||
1156 (connack_packet_ptr -> mqtt_connack_packet_header >> 4 != MQTT_CONTROL_PACKET_TYPE_CONNACK))
1157 {
1158 /* Invalid packet length. Free the packet and process error. */
1159 ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1160 }
1161 else
1162 {
1163
1164 /* Check remaining length. */
1165 if (connack_packet_ptr -> mqtt_connack_packet_remaining_length != 2)
1166 {
1167 ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1168 }
1169 /* Follow MQTT-3.2.2-1 rule. */
1170 else if ((client_ptr -> nxd_mqtt_clean_session) && (connack_packet_ptr -> mqtt_connack_packet_ack_flags & MQTT_CONNACK_CONNECT_FLAGS_SP))
1171 {
1172
1173 /* Client requested clean session, and server responded with Session Present. This is a violation. */
1174 ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1175 }
1176 else if (connack_packet_ptr -> mqtt_connack_packet_return_code > MQTT_CONNACK_CONNECT_RETURN_CODE_NOT_AUTHORIZED)
1177 {
1178 ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1179 }
1180 else if (connack_packet_ptr -> mqtt_connack_packet_return_code > 0)
1181 {
1182
1183 /* Pass the server return code to the application. */
1184 ret = (UINT)(NXD_MQTT_ERROR_CONNECT_RETURN_CODE + connack_packet_ptr -> mqtt_connack_packet_return_code);
1185 }
1186 else
1187 {
1188 ret = NXD_MQTT_SUCCESS;
1189
1190 /* Obtain mutex before we modify client control block. */
1191 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
1192
1193 client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_CONNECTED;
1194
1195 /* Initialize the packet identification field. */
1196 client_ptr -> nxd_mqtt_client_packet_identifier = NXD_MQTT_INITIAL_PACKET_ID_VALUE;
1197
1198 /* Prevent packet identifier from being zero. MQTT-2.3.1-1 */
1199 if(client_ptr -> nxd_mqtt_client_packet_identifier == 0)
1200 client_ptr -> nxd_mqtt_client_packet_identifier = 1;
1201
1202 /* Release mutex */
1203 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1204 }
1205 }
1206
1207 /* Check callback function. */
1208 if (client_ptr -> nxd_mqtt_connect_notify)
1209 {
1210 client_ptr -> nxd_mqtt_connect_notify(client_ptr, ret, client_ptr -> nxd_mqtt_connect_context);
1211 }
1212
1213 if (ret == NXD_MQTT_SUCCESS)
1214 {
1215
1216 /* If client doesn't start with Clean Session, and there are un-acked PUBLISH messages,
1217 we shall re-publish these messages. */
1218 if ((client_ptr -> nxd_mqtt_clean_session != NX_TRUE) && (client_ptr -> message_transmit_queue_head))
1219 {
1220
1221 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
1222
1223 /* There are messages from the previous session that has not been acknowledged. */
1224 ret = _nxd_mqtt_client_retransmit_message(client_ptr, wait_option);
1225
1226 /* Release mutex */
1227 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1228 }
1229 }
1230 else
1231 {
1232
1233 /* End connection. */
1234 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
1235 }
1236
1237 return(ret);
1238 }
1239
1240 /**************************************************************************/
1241 /* */
1242 /* FUNCTION RELEASE */
1243 /* */
1244 /* _nxd_mqtt_process_publish_packet PORTABLE C */
1245 /* 6.1 */
1246 /* AUTHOR */
1247 /* */
1248 /* Yuxin Zhou, Microsoft Corporation */
1249 /* */
1250 /* DESCRIPTION */
1251 /* */
1252 /* This internal function processes a packet and parses topic and */
1253 /* message. */
1254 /* */
1255 /* */
1256 /* INPUT */
1257 /* */
1258 /* packet_ptr Pointer to the packet */
1259 /* topic_offset_ptr Return topic offset */
1260 /* topic_length_ptr Return topic length */
1261 /* message_offset_ptr Return message offset */
1262 /* message_length_ptr Return message length */
1263 /* */
1264 /* OUTPUT */
1265 /* */
1266 /* Status */
1267 /* */
1268 /* CALLS */
1269 /* */
1270 /* _nxd_mqtt_read_remaining_length */
1271 /* nx_packet_data_extract_offset */
1272 /* */
1273 /* */
1274 /* CALLED BY */
1275 /* */
1276 /* _nxd_mqtt_process_publish */
1277 /* */
1278 /* RELEASE HISTORY */
1279 /* */
1280 /* DATE NAME DESCRIPTION */
1281 /* */
1282 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1283 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1284 /* resulting in version 6.1 */
1285 /* */
1286 /**************************************************************************/
_nxd_mqtt_process_publish_packet(NX_PACKET * packet_ptr,ULONG * topic_offset_ptr,USHORT * topic_length_ptr,ULONG * message_offset_ptr,ULONG * message_length_ptr)1287 UINT _nxd_mqtt_process_publish_packet(NX_PACKET *packet_ptr, ULONG *topic_offset_ptr, USHORT *topic_length_ptr,
1288 ULONG *message_offset_ptr, ULONG *message_length_ptr)
1289 {
1290 UCHAR QoS;
1291 UINT remaining_length = 0;
1292 UINT topic_length;
1293 ULONG offset;
1294 UCHAR bytes[2];
1295 ULONG bytes_copied;
1296
1297
1298 QoS = (UCHAR)((*(packet_ptr -> nx_packet_prepend_ptr) & MQTT_PUBLISH_QOS_LEVEL_FIELD) >> 1);
1299
1300 if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1301 {
1302 return(NXD_MQTT_INVALID_PACKET);
1303 }
1304
1305 if (remaining_length < 2)
1306 {
1307 return(NXD_MQTT_INVALID_PACKET);
1308 }
1309
1310 /* Get topic length fields. */
1311 if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1312 (bytes_copied != sizeof(bytes)))
1313 {
1314 return(NXD_MQTT_INVALID_PACKET);
1315 }
1316
1317 topic_length = (UINT)(*(bytes) << 8) | (*(bytes + 1));
1318
1319 if (topic_length > remaining_length - 2u)
1320 {
1321 return(NXD_MQTT_INVALID_PACKET);
1322 }
1323
1324 *topic_offset_ptr = offset + 2;
1325 *topic_length_ptr = (USHORT)topic_length;
1326
1327 remaining_length = remaining_length - topic_length - 2;
1328 if ((QoS == 1) || (QoS == 2))
1329 {
1330 offset += 2 + 2 + topic_length;
1331
1332 if (remaining_length < 2)
1333 {
1334 return(NXD_MQTT_INVALID_PACKET);
1335 }
1336 remaining_length = remaining_length - 2;
1337 }
1338 else
1339 {
1340 offset += 2 + topic_length;
1341 }
1342
1343 *message_offset_ptr = offset;
1344 *message_length_ptr = (ULONG)remaining_length;
1345
1346 /* Return */
1347 return(NXD_MQTT_SUCCESS);
1348 }
1349 /**************************************************************************/
1350 /* */
1351 /* FUNCTION RELEASE */
1352 /* */
1353 /* _nxd_mqtt_process_publish PORTABLE C */
1354 /* 6.3.0 */
1355 /* AUTHOR */
1356 /* */
1357 /* Yuxin Zhou, Microsoft Corporation */
1358 /* */
1359 /* DESCRIPTION */
1360 /* */
/*    This internal function processes a publish message from the broker. */
1362 /* */
1363 /* */
1364 /* INPUT */
1365 /* */
1366 /* client_ptr Pointer to MQTT Client */
1367 /* packet_ptr Pointer to the packet */
1368 /* */
1369 /* OUTPUT */
1370 /* */
1371 /* NX_TRUE - packet is consumed */
1372 /* NX_FALSE - packet is not consumed */
1373 /* */
1374 /* CALLS */
1375 /* */
1376 /* [receive_notify] User supplied receive */
1377 /* callback function */
1378 /* _nxd_mqtt_client_packet_allocate */
1379 /* _nxd_mqtt_packet_send */
1380 /* nx_packet_release */
1381 /* nx_secure_tls_session_send */
1382 /* _nxd_mqtt_process_publish_packet */
1383 /* _nxd_mqtt_copy_transmit_packet */
1384 /* */
1385 /* */
1386 /* CALLED BY */
1387 /* */
1388 /* _nxd_mqtt_packet_receive_process */
1389 /* */
1390 /* RELEASE HISTORY */
1391 /* */
1392 /* DATE NAME DESCRIPTION */
1393 /* */
1394 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1395 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1396 /* resulting in version 6.1 */
1397 /* 10-31-2022 Bo Chen Modified comment(s), improved */
1398 /* the logic of sending packet,*/
1399 /* resulting in version 6.2.0 */
1400 /* 10-31-2023 Haiqing Zhao Modified comment(s), improved */
1401 /* internal logic, */
1402 /* resulting in version 6.3.0 */
1403 /* */
1404 /**************************************************************************/
_nxd_mqtt_process_publish(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1405 static UINT _nxd_mqtt_process_publish(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1406 {
1407 MQTT_PACKET_PUBLISH_RESPONSE *pubresp_ptr;
1408 UINT status;
1409 USHORT packet_id = 0;
1410 UCHAR QoS;
1411 UINT enqueue_message = 0;
1412 NX_PACKET *transmit_packet_ptr;
1413 UINT remaining_length = 0;
1414 UINT packet_consumed = NX_FALSE;
1415 UCHAR fixed_header;
1416 USHORT transmit_packet_id;
1417 UINT topic_length;
1418 ULONG offset;
1419 UCHAR bytes[2];
1420 ULONG bytes_copied;
1421
1422 QoS = (UCHAR)((*(packet_ptr -> nx_packet_prepend_ptr) & MQTT_PUBLISH_QOS_LEVEL_FIELD) >> 1);
1423
1424 if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1425 {
1426 return(NX_FALSE);
1427 }
1428
1429 if (remaining_length < 2)
1430 {
1431 return(NXD_MQTT_INVALID_PACKET);
1432 }
1433
1434 /* Get topic length fields. */
1435 if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1436 (bytes_copied != sizeof(bytes)))
1437 {
1438 return(NXD_MQTT_INVALID_PACKET);
1439 }
1440
1441 topic_length = (UINT)(*(bytes) << 8) | (*(bytes + 1));
1442
1443 if (topic_length > remaining_length - 2u)
1444 {
1445 return(NXD_MQTT_INVALID_PACKET);
1446 }
1447
1448 if (QoS == 0)
1449 {
1450 enqueue_message = 1;
1451 }
1452 else
1453 {
1454 /* QoS 1 or QoS 2 messages. */
1455 /* Get packet id fields. */
1456 if (nx_packet_data_extract_offset(packet_ptr, offset + 2 + topic_length, &bytes, sizeof(bytes), &bytes_copied))
1457 {
1458 return(NXD_MQTT_INVALID_PACKET);
1459 }
1460
1461 packet_id = (USHORT)(((*bytes) << 8) | (*(bytes + 1)));
1462
1463 /* Look for an existing transmit packets with the same packet id */
1464 transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1465
1466 while (transmit_packet_ptr)
1467 {
1468 fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1469 transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1470 if ((transmit_packet_id == packet_id) &&
1471 ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_PUBREC << 4)))
1472 {
1473
1474 /* Found a packet containing the packet_id */
1475 break;
1476 }
1477 transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1478 }
1479
1480 if (transmit_packet_ptr)
1481 {
1482 /* This published data is already in our system. No need to deliver this message to the application. */
1483 enqueue_message = 0;
1484 }
1485 else
1486 {
1487 enqueue_message = 1;
1488 }
1489 }
1490
1491 if (enqueue_message)
1492 {
1493 if (packet_ptr -> nx_packet_length > (offset + remaining_length))
1494 {
1495
1496 /* This packet contains multiple messages. */
1497 if (nx_packet_copy(packet_ptr, &packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_NO_WAIT))
1498 {
1499
1500 /* No packet is available. */
1501 return(NX_FALSE);
1502 }
1503 }
1504 else
1505 {
1506 packet_consumed = NX_TRUE;
1507 }
1508
1509 /* Increment the queue depth counter. */
1510 client_ptr -> message_receive_queue_depth++;
1511
1512 if (client_ptr -> message_receive_queue_head == NX_NULL)
1513 {
1514 client_ptr -> message_receive_queue_head = packet_ptr;
1515 }
1516 else
1517 {
1518 client_ptr -> message_receive_queue_tail -> nx_packet_queue_next = packet_ptr;
1519 }
1520 client_ptr -> message_receive_queue_tail = packet_ptr;
1521
1522 /* Invoke the user-defined receive notify function if it is set. */
1523 if (client_ptr -> nxd_mqtt_client_receive_notify)
1524 {
1525 (*(client_ptr -> nxd_mqtt_client_receive_notify))(client_ptr, client_ptr -> message_receive_queue_depth);
1526 }
1527 }
1528
1529 /* If the message QoS level is 0, we are done. */
1530 if (QoS == 0)
1531 {
1532 /* Return */
1533 return(packet_consumed);
1534 }
1535
1536 /* Send out proper ACKs for QoS 1 and 2 messages. */
1537 /* Allocate a new packet so we can send out a response. */
1538 status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
1539 if (status)
1540 {
1541 /* Packet allocation fails. */
1542 return(packet_consumed);
1543 }
1544
1545 /* Fill in the packet ID */
1546 pubresp_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)(packet_ptr -> nx_packet_prepend_ptr);
1547 pubresp_ptr -> mqtt_publish_response_packet_remaining_length = 2;
1548 pubresp_ptr -> mqtt_publish_response_packet_packet_identifier_msb = (UCHAR)(packet_id >> 8);
1549 pubresp_ptr -> mqtt_publish_response_packet_packet_identifier_lsb = (UCHAR)(packet_id & 0xFF);
1550
1551 if (QoS == 1)
1552 {
1553
1554 pubresp_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBACK << 4;
1555 }
1556 else
1557 {
1558 pubresp_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBREC << 4;
1559 }
1560
1561 packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + sizeof(MQTT_PACKET_PUBLISH_RESPONSE);
1562 packet_ptr -> nx_packet_length = sizeof(MQTT_PACKET_PUBLISH_RESPONSE);
1563
1564 if (QoS == 2)
1565 {
1566
1567 /* Copy packet for checking duplicate publish packet. */
1568 if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &transmit_packet_ptr,
1569 packet_id, NX_FALSE, NX_WAIT_FOREVER))
1570 {
1571
1572 /* Release the packet. */
1573 nx_packet_release(packet_ptr);
1574 return(packet_consumed);
1575 }
1576 if (client_ptr -> message_transmit_queue_head == NX_NULL)
1577 {
1578 client_ptr -> message_transmit_queue_head = transmit_packet_ptr;
1579 }
1580 else
1581 {
1582 client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = transmit_packet_ptr;
1583 }
1584 client_ptr -> message_transmit_queue_tail = transmit_packet_ptr;
1585 }
1586
1587 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1588
1589 /* Send packet to server. */
1590 status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER);
1591
1592 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
1593 if (status)
1594 {
1595
1596 /* Release the packet. */
1597 nx_packet_release(packet_ptr);
1598 }
1599 else
1600 {
1601 /* Update the timeout value. */
1602 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
1603 }
1604
1605 /* Return */
1606 return(packet_consumed);
1607 }
1608
1609 /**************************************************************************/
1610 /* */
1611 /* FUNCTION RELEASE */
1612 /* */
1613 /* _nxd_mqtt_process_publish_response PORTABLE C */
1614 /* 6.3.0 */
1615 /* AUTHOR */
1616 /* */
1617 /* Yuxin Zhou, Microsoft Corporation */
1618 /* */
1619 /* DESCRIPTION */
1620 /* */
/*    This internal function processes publish response messages.         */
1622 /* Publish Response messages are: PUBACK, PUBREC, PUBREL */
1623 /* */
1624 /* INPUT */
1625 /* */
1626 /* client_ptr Pointer to MQTT Client */
1627 /* packet_ptr Pointer to the packet */
1628 /* */
1629 /* OUTPUT */
1630 /* */
1631 /* Status */
1632 /* */
1633 /* CALLS */
1634 /* */
1635 /* [nxd_mqtt_client_receive_notify] User supplied publish */
1636 /* callback function */
1637 /* _nxd_mqtt_release_transmit_packet */
1638 /* _nxd_mqtt_packet_send */
1639 /* _nxd_mqtt_client_packet_allocate */
1640 /* */
1641 /* */
1642 /* CALLED BY */
1643 /* */
1644 /* _nxd_mqtt_packet_receive_process */
1645 /* */
1646 /* RELEASE HISTORY */
1647 /* */
1648 /* DATE NAME DESCRIPTION */
1649 /* */
1650 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1651 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
1652 /* added ack receive notify, */
1653 /* resulting in version 6.1 */
1654 /* 10-31-2022 Bo Chen Modified comment(s), improved */
1655 /* the logic of sending packet,*/
1656 /* resulting in version 6.2.0 */
1657 /* 10-31-2023 Haiqing Zhao Modified comment(s), improved */
1658 /* internal logic, */
1659 /* resulting in version 6.3.0 */
1660 /* */
1661 /**************************************************************************/
_nxd_mqtt_process_publish_response(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1662 static UINT _nxd_mqtt_process_publish_response(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1663 {
1664 MQTT_PACKET_PUBLISH_RESPONSE *response_ptr;
1665 USHORT packet_id;
1666 NX_PACKET *previous_packet_ptr;
1667 NX_PACKET *transmit_packet_ptr;
1668 NX_PACKET *response_packet;
1669 UINT ret;
1670 UCHAR fixed_header;
1671 USHORT transmit_packet_id;
1672
1673 response_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)(packet_ptr -> nx_packet_prepend_ptr);
1674
1675 /* Validate the packet. */
1676 if (response_ptr -> mqtt_publish_response_packet_remaining_length != 2)
1677 {
1678 /* Invalid remaining_length value. Return 1 so the caller can release
1679 the packet. */
1680
1681 return(1);
1682 }
1683
1684 packet_id = (USHORT)((response_ptr -> mqtt_publish_response_packet_packet_identifier_msb << 8) |
1685 (response_ptr -> mqtt_publish_response_packet_packet_identifier_lsb));
1686
1687 /* Search all the outstanding transmitted packets for a match. */
1688 previous_packet_ptr = NX_NULL;
1689 transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1690 while (transmit_packet_ptr)
1691 {
1692 fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1693 transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1694 if (transmit_packet_id == packet_id)
1695 {
1696
1697 /* Found the matching packet id */
1698 if (((response_ptr -> mqtt_publish_response_packet_header) >> 4) == MQTT_CONTROL_PACKET_TYPE_PUBACK)
1699 {
1700
1701 /* PUBACK is the response to a PUBLISH packet with QoS Level 1*/
1702 /* Therefore we verify that packet contains PUBLISH packet with QoS level 1*/
1703 if ((fixed_header & 0xF6) == ((MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4) | MQTT_PUBLISH_QOS_LEVEL_1))
1704 {
1705
1706 /* Check ack notify function. */
1707 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1708 {
1709
1710 /* Call notify function. Note: user routine should not release the packet. */
1711 client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_PUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1712 }
1713
1714 /* QoS Level1 message receives an ACK. */
1715 /* This message can be released. */
1716 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1717
1718 /* Return with value 1, so the caller will release packet_ptr */
1719 return(1);
1720 }
1721 }
1722 else if (((response_ptr -> mqtt_publish_response_packet_header) >> 4) == MQTT_CONTROL_PACKET_TYPE_PUBREL)
1723 {
1724
1725 /* QoS 2 publish Release received, part 2. */
1726 /* Therefore we verify that packet contains PUBLISH packet with QoS level 2*/
1727 if ((fixed_header & 0xF6) == (MQTT_CONTROL_PACKET_TYPE_PUBREC << 4))
1728 {
1729
1730 /* QoS Level2 message receives an ACK. */
1731 /* This message can be released. */
1732 /* Send PUBCOMP */
1733
1734 /* Allocate a packet to send the response. */
1735 ret = _nxd_mqtt_client_packet_allocate(client_ptr, &response_packet, NX_WAIT_FOREVER);
1736 if (ret)
1737 {
1738 return(1);
1739 }
1740
1741 if (4u > ((ULONG)(response_packet -> nx_packet_data_end) - (ULONG)(response_packet -> nx_packet_append_ptr)))
1742 {
1743 nx_packet_release(response_packet);
1744
1745 /* Packet buffer is too small to hold the message. */
1746 return(NX_SIZE_ERROR);
1747 }
1748
1749 response_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)response_packet -> nx_packet_prepend_ptr;
1750
1751 response_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBCOMP << 4;
1752 response_ptr -> mqtt_publish_response_packet_remaining_length = 2;
1753
1754 /* Fill in packet ID */
1755 response_packet -> nx_packet_prepend_ptr[3] = packet_ptr -> nx_packet_prepend_ptr[3];
1756 response_packet -> nx_packet_prepend_ptr[4] = packet_ptr -> nx_packet_prepend_ptr[4];
1757 response_packet -> nx_packet_append_ptr = response_packet -> nx_packet_prepend_ptr + 4;
1758 response_packet -> nx_packet_length = 4;
1759
1760 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1761
1762 /* Send packet to server. */
1763 ret = _nxd_mqtt_packet_send(client_ptr, response_packet, NX_WAIT_FOREVER);
1764
1765 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
1766
1767 /* Update the timeout value. */
1768 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
1769
1770 if (ret)
1771 {
1772 nx_packet_release(response_packet);
1773 }
1774
1775 /* Check ack notify function. */
1776 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1777 {
1778
1779 /* Call notify function. Note: user routine should not release the packet. */
1780 client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_PUBREL, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1781 }
1782
1783 /* This packet can be released. */
1784 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1785
1786 /* Return with value 1, so the caller will release packet_ptr */
1787 return(1);
1788 }
1789 }
1790 }
1791
1792 /* Move on to the next packet */
1793 previous_packet_ptr = transmit_packet_ptr;
1794 transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1795 }
1796
1797 /* nothing is found. Return 1 to release the packet.*/
1798 return(1);
1799 }
1800
1801 /**************************************************************************/
1802 /* */
1803 /* FUNCTION RELEASE */
1804 /* */
1805 /* _nxd_mqtt_process_sub_unsub_ack PORTABLE C */
1806 /* 6.1 */
1807 /* AUTHOR */
1808 /* */
1809 /* Yuxin Zhou, Microsoft Corporation */
1810 /* */
1811 /* DESCRIPTION */
1812 /* */
1813 /* This internal function process an ACK message for subscribe */
1814 /* or unsubscribe request. */
1815 /* */
1816 /* INPUT */
1817 /* */
1818 /* client_ptr Pointer to MQTT Client */
1819 /* packet_ptr Pointer to the packet */
1820 /* */
1821 /* OUTPUT */
1822 /* */
1823 /* Status */
1824 /* */
1825 /* CALLS */
1826 /* */
1827 /* [nxd_mqtt_client_receive_notify] User supplied publish */
1828 /* callback function */
1829 /* _nxd_mqtt_release_transmit_packet */
1830 /* Release the memory block */
1831 /* _nxd_mqtt_read_remaining_length Skip the remaining length */
1832 /* field */
1833 /* */
1834 /* CALLED BY */
1835 /* */
1836 /* _nxd_mqtt_packet_receive_process */
1837 /* */
1838 /* RELEASE HISTORY */
1839 /* */
1840 /* DATE NAME DESCRIPTION */
1841 /* */
1842 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1843 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
1844 /* added ack receive notify, */
1845 /* resulting in version 6.1 */
1846 /* */
1847 /**************************************************************************/
_nxd_mqtt_process_sub_unsub_ack(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1848 static UINT _nxd_mqtt_process_sub_unsub_ack(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1849 {
1850
1851 USHORT packet_id;
1852 NX_PACKET *previous_packet_ptr;
1853 NX_PACKET *transmit_packet_ptr;
1854 UCHAR response_header;
1855 UCHAR fixed_header;
1856 USHORT transmit_packet_id;
1857 UINT remaining_length;
1858 ULONG offset;
1859 UCHAR bytes[2];
1860 ULONG bytes_copied;
1861
1862
1863 response_header = *(packet_ptr -> nx_packet_prepend_ptr);
1864
1865 if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1866 {
1867
1868 /* Unable to process the sub/unsub ack. Simply return and release the packet. */
1869 return(NXD_MQTT_INVALID_PACKET);
1870 }
1871
1872 /* Get packet id fields. */
1873 if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1874 (bytes_copied != sizeof(bytes)))
1875 {
1876 return(NXD_MQTT_INVALID_PACKET);
1877 }
1878
1879 packet_id = (USHORT)(((*bytes) << 8) | (*(bytes + 1)));
1880
1881 /* Search all the outstanding transmitted packets for a match. */
1882 previous_packet_ptr = NX_NULL;
1883 transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1884 while (transmit_packet_ptr)
1885 {
1886 fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1887 transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1888 if (transmit_packet_id == packet_id)
1889 {
1890
1891 /* Found the matching packet id */
1892 if (((response_header >> 4) == MQTT_CONTROL_PACKET_TYPE_SUBACK) &&
1893 ((fixed_header >> 4) == MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE))
1894 {
1895 /* Validate the packet. */
1896 if (remaining_length != 3)
1897 {
1898 /* Invalid remaining_length value. */
1899 return(1);
1900 }
1901
1902 /* Check ack notify function. */
1903 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1904 {
1905
1906 /* Call notify function. Note: user routine should not release the packet. */
1907 client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_SUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1908 }
1909
1910 /* Release the transmit packet. */
1911 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1912
1913 return(1);
1914 }
1915 else if (((response_header >> 4) == MQTT_CONTROL_PACKET_TYPE_UNSUBACK) &&
1916 ((fixed_header >> 4) == MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE))
1917 {
1918 /* Validate the packet. */
1919 if (remaining_length != 2)
1920 {
1921 /* Invalid remaining_length value. */
1922 return(1);
1923 }
1924
1925 /* Check ack notify function. */
1926 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1927 {
1928
1929 /* Call notify function. Note: user routine should not release the packet. */
1930 client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_UNSUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1931 }
1932
1933 /* Unsubscribe succeeded. */
1934 /* Release the transmit packet. */
1935 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1936
1937 return(1);
1938 }
1939 }
1940
1941 /* Move on to the next packet */
1942 previous_packet_ptr = transmit_packet_ptr;
1943 transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1944 }
1945 return(1);
1946 }
1947
1948
1949 /**************************************************************************/
1950 /* */
1951 /* FUNCTION RELEASE */
1952 /* */
1953 /* _nxd_mqtt_process_pingresp PORTABLE C */
1954 /* 6.1 */
1955 /* AUTHOR */
1956 /* */
1957 /* Yuxin Zhou, Microsoft Corporation */
1958 /* */
1959 /* DESCRIPTION */
1960 /* */
1961 /* This internal function process a PINGRESP message. */
1962 /* */
1963 /* INPUT */
1964 /* */
1965 /* client_ptr Pointer to MQTT Client */
1966 /* packet_ptr Pointer to the packet */
1967 /* */
1968 /* OUTPUT */
1969 /* */
1970 /* Status */
1971 /* */
1972 /* CALLS */
1973 /* */
/*    None                                                                */
1976 /* */
1977 /* CALLED BY */
1978 /* */
1979 /* _nxd_mqtt_packet_receive_process */
1980 /* */
1981 /* RELEASE HISTORY */
1982 /* */
1983 /* DATE NAME DESCRIPTION */
1984 /* */
1985 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1986 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1987 /* resulting in version 6.1 */
1988 /* */
1989 /**************************************************************************/
_nxd_mqtt_process_pingresp(NXD_MQTT_CLIENT * client_ptr)1990 static VOID _nxd_mqtt_process_pingresp(NXD_MQTT_CLIENT *client_ptr)
1991 {
1992
1993
1994 /* If there is an outstanding ping, mark it as responded. */
1995 if (client_ptr -> nxd_mqtt_ping_not_responded == NX_TRUE)
1996 {
1997 client_ptr -> nxd_mqtt_ping_not_responded = NX_FALSE;
1998
1999 client_ptr -> nxd_mqtt_ping_sent_time = 0;
2000 }
2001
2002 return;
2003 }
2004
2005
2006 /**************************************************************************/
2007 /* */
2008 /* FUNCTION RELEASE */
2009 /* */
2010 /* _nxd_mqtt_process_disconnect PORTABLE C */
2011 /* 6.1 */
2012 /* AUTHOR */
2013 /* */
2014 /* Yuxin Zhou, Microsoft Corporation */
2015 /* */
2016 /* DESCRIPTION */
2017 /* */
2018 /* This internal function process a DISCONNECT message. */
2019 /* */
2020 /* INPUT */
2021 /* */
2022 /* client_ptr Pointer to MQTT Client */
2023 /* packet_ptr Pointer to the packet */
2024 /* */
2025 /* OUTPUT */
2026 /* */
2027 /* None */
2028 /* */
2029 /* CALLS */
2030 /* */
2031 /* nx_secure_tls_session_send */
2032 /* nx_tcp_socket_disconnect */
2033 /* nx_tcp_client_socket_unbind */
2034 /* _nxd_mqtt_release_transmit_packet */
2035 /* _nxd_mqtt_release_receive_packet */
2036 /* _nxd_mqtt_client_connection_end */
2037 /* */
2038 /* CALLED BY */
2039 /* */
2040 /* _nxd_mqtt_packet_receive_process */
2041 /* */
2042 /* RELEASE HISTORY */
2043 /* */
2044 /* DATE NAME DESCRIPTION */
2045 /* */
2046 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2047 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2048 /* resulting in version 6.1 */
2049 /* */
2050 /**************************************************************************/
_nxd_mqtt_process_disconnect(NXD_MQTT_CLIENT * client_ptr)2051 static VOID _nxd_mqtt_process_disconnect(NXD_MQTT_CLIENT *client_ptr)
2052 {
2053 NX_PACKET *previous = NX_NULL;
2054 NX_PACKET *current;
2055 NX_PACKET *next;
2056 UINT disconnect_callback = NX_FALSE;
2057 UINT status;
2058 UCHAR fixed_header;
2059
2060 if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
2061 {
2062 /* State changes from CONNECTED TO IDLE. Call disconnect notify callback
2063 if the function is set. */
2064 disconnect_callback = NX_TRUE;
2065 }
2066 else if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTING)
2067 {
2068
2069 /* If state isn't CONNECTED or CONNECTING, just return. */
2070 return;
2071 }
2072
2073 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2074
2075 /* End connection. */
2076 _nxd_mqtt_client_connection_end(client_ptr, NXD_MQTT_SOCKET_TIMEOUT);
2077
2078 status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
2079
2080 /* Free up sub/unsub packets on the transmit queue. */
2081 current = client_ptr -> message_transmit_queue_head;
2082
2083 while (current)
2084 {
2085 next = current -> nx_packet_queue_next;
2086 fixed_header = *(current -> nx_packet_prepend_ptr);
2087
2088 if (((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4)) ||
2089 ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE << 4)))
2090 {
2091 _nxd_mqtt_release_transmit_packet(client_ptr, current, previous);
2092 }
2093 else
2094 {
2095 previous = current;
2096 }
2097 current = next;
2098 }
2099
2100 /* If a callback notification is defined, call it now. */
2101 if ((disconnect_callback == NX_TRUE) && (client_ptr -> nxd_mqtt_disconnect_notify))
2102 {
2103 client_ptr -> nxd_mqtt_disconnect_notify(client_ptr);
2104 }
2105
2106 /* If a connect callback notification is defined and is still in connecting stage, call it now. */
2107 if ((disconnect_callback == NX_FALSE) && (client_ptr -> nxd_mqtt_connect_notify))
2108 {
2109 client_ptr -> nxd_mqtt_connect_notify(client_ptr, NXD_MQTT_CONNECT_FAILURE, client_ptr -> nxd_mqtt_connect_context);
2110 }
2111
2112 if (status == TX_SUCCESS)
2113 {
2114 /* Remove all the packets in the receive queue. */
2115 while (client_ptr -> message_receive_queue_head)
2116 {
2117 _nxd_mqtt_release_receive_packet(client_ptr, client_ptr -> message_receive_queue_head, NX_NULL);
2118 }
2119 client_ptr -> message_receive_queue_depth = 0;
2120
2121 /* Clear the MQTT_PACKET_RECEIVE_EVENT */
2122 #ifndef NXD_MQTT_CLOUD_ENABLE
2123 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, ~MQTT_PACKET_RECEIVE_EVENT, TX_AND);
2124 #else
2125 nx_cloud_module_event_clear(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PACKET_RECEIVE_EVENT);
2126 #endif /* NXD_MQTT_CLOUD_ENABLE */
2127 }
2128
2129 /* Clear flags if keep alive is enabled. */
2130 if (client_ptr -> nxd_mqtt_keepalive)
2131 {
2132 client_ptr -> nxd_mqtt_ping_not_responded = 0;
2133 client_ptr -> nxd_mqtt_ping_sent_time = 0;
2134 }
2135
2136 /* Clean up the information when disconnecting. */
2137 client_ptr -> nxd_mqtt_client_username = NX_NULL;
2138 client_ptr -> nxd_mqtt_client_password = NX_NULL;
2139 client_ptr -> nxd_mqtt_client_will_topic = NX_NULL;
2140 client_ptr -> nxd_mqtt_client_will_message = NX_NULL;
2141 client_ptr -> nxd_mqtt_client_will_qos_retain = 0;
2142
2143 /* Release current processing packet. */
2144 if (client_ptr -> nxd_mqtt_client_processing_packet)
2145 {
2146 nx_packet_release(client_ptr -> nxd_mqtt_client_processing_packet);
2147 client_ptr -> nxd_mqtt_client_processing_packet = NX_NULL;
2148 }
2149
2150 return;
2151 }
2152
2153
2154 /**************************************************************************/
2155 /* */
2156 /* FUNCTION RELEASE */
2157 /* */
2158 /* _nxd_mqtt_packet_receive_process PORTABLE C */
2159 /* 6.2.0 */
2160 /* AUTHOR */
2161 /* */
2162 /* Yuxin Zhou, Microsoft Corporation */
2163 /* */
2164 /* DESCRIPTION */
2165 /* */
2166 /* This internal function processes MQTT messages. */
2167 /* NOTE: MQTT Mutex is NOT obtained on entering this function. */
2168 /* Therefore it shouldn't hold the mutex when it exits this function. */
2169 /* */
2170 /* INPUT */
2171 /* */
2172 /* client_ptr Pointer to MQTT Client */
2173 /* */
2174 /* OUTPUT */
2175 /* */
2176 /* None */
2177 /* */
2178 /* CALLS */
2179 /* */
2180 /* nx_secure_tls_session_receive */
2181 /* nx_tcp_socket_receive */
2182 /* _nxd_mqtt_process_publish */
2183 /* _nxd_mqtt_process_publish_response */
2184 /* _nxd_mqtt_process_sub_unsub_ack */
2185 /* _nxd_mqtt_process_pingresp */
2186 /* _nxd_mqtt_process_disconnect */
2187 /* nx_packet_release */
2188 /* */
2189 /* CALLED BY */
2190 /* */
2191 /* _nxd_mqtt_client_event_process */
2192 /* */
2193 /* RELEASE HISTORY */
2194 /* */
2195 /* DATE NAME DESCRIPTION */
2196 /* */
2197 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2198 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2199 /* resulting in version 6.1 */
2200 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
2201 /* mqtt over websocket, */
2202 /* resulting in version 6.2.0 */
2203 /* */
2204 /**************************************************************************/
_nxd_mqtt_packet_receive_process(NXD_MQTT_CLIENT * client_ptr)2205 static VOID _nxd_mqtt_packet_receive_process(NXD_MQTT_CLIENT *client_ptr)
2206 {
2207 NX_PACKET *packet_ptr;
2208 NX_PACKET *previous_packet_ptr;
2209 UINT status;
2210 UCHAR packet_type;
2211 UINT remaining_length;
2212 UINT packet_consumed;
2213 ULONG offset;
2214 ULONG bytes_copied;
2215 ULONG packet_length;
2216
2217 for (;;)
2218 {
2219
2220 /* Release the mutex. */
2221 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2222
2223 /* Make a receive call. */
2224 status = _nxd_mqtt_packet_receive(client_ptr, &packet_ptr, NX_NO_WAIT);
2225
2226 if (status != NX_SUCCESS)
2227 {
2228 if ((status != NX_NO_PACKET) && (status != NX_CONTINUE))
2229 {
2230
2231 /* Network issue. Close the MQTT session. */
2232 #ifndef NXD_MQTT_CLOUD_ENABLE
2233 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_NETWORK_DISCONNECT_EVENT, TX_OR);
2234 #else
2235 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_NETWORK_DISCONNECT_EVENT);
2236 #endif /* NXD_MQTT_CLOUD_ENABLE */
2237 }
2238
2239 break;
2240 }
2241
2242 /* Obtain the mutex. */
2243 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
2244
2245 /* Is there a packet waiting for processing? */
2246 if (client_ptr -> nxd_mqtt_client_processing_packet)
2247 {
2248
2249 /* Yes. Link received packet to existing one. */
2250 if (client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last)
2251 {
2252 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last -> nx_packet_next = packet_ptr;
2253 }
2254 else
2255 {
2256 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_next = packet_ptr;
2257 }
2258 if (packet_ptr -> nx_packet_last)
2259 {
2260 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last = packet_ptr -> nx_packet_last;
2261 }
2262 else
2263 {
2264 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last = packet_ptr;
2265 }
2266 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_length += packet_ptr -> nx_packet_length;
2267
2268 /* Start to process existing packet. */
2269 packet_ptr = client_ptr -> nxd_mqtt_client_processing_packet;
2270 client_ptr -> nxd_mqtt_client_processing_packet = NX_NULL;
2271 }
2272
2273 /* Check notify function. */
2274 if (client_ptr -> nxd_mqtt_packet_receive_notify)
2275 {
2276
2277 /* Call notify function. Return NX_TRUE if the packet has been consumed. */
2278 if (client_ptr -> nxd_mqtt_packet_receive_notify(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_packet_receive_context) == NX_TRUE)
2279 {
2280 continue;
2281 }
2282 }
2283
2284 packet_consumed = NX_FALSE;
2285 while (packet_ptr)
2286 {
2287 /* Parse the incoming packet. */
2288 status = _nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset);
2289 if (status == NXD_MQTT_PARTIAL_PACKET)
2290 {
2291
2292 /* We only have partial MQTT message.
2293 * Put it to waiting list for more packets. */
2294 client_ptr -> nxd_mqtt_client_processing_packet = packet_ptr;
2295 packet_consumed = NX_TRUE;
2296 break;
2297 }
2298 else if (status)
2299 {
2300
2301 /* Invalid packet. */
2302 break;
2303 }
2304
2305 /* Get packet type. */
2306 if (nx_packet_data_extract_offset(packet_ptr, 0, &packet_type, 1, &bytes_copied))
2307 {
2308
2309 /* Unable to read packet type. */
2310 break;
2311 }
2312
2313 /* Right shift 4 bits to get the packet type. */
2314 packet_type = packet_type >> 4;
2315
2316 /* Process based on packet type. */
2317 switch (packet_type)
2318 {
2319 case MQTT_CONTROL_PACKET_TYPE_CONNECT:
2320 /* Client does not accept connections. Nothing needs to be done. */
2321 break;
2322 case MQTT_CONTROL_PACKET_TYPE_CONNACK:
2323 _nxd_mqtt_process_connack(client_ptr, packet_ptr, NX_NO_WAIT);
2324 break;
2325
2326 case MQTT_CONTROL_PACKET_TYPE_PUBLISH:
2327 packet_consumed = _nxd_mqtt_process_publish(client_ptr, packet_ptr);
2328 break;
2329
2330 case MQTT_CONTROL_PACKET_TYPE_PUBACK:
2331 case MQTT_CONTROL_PACKET_TYPE_PUBREL:
2332 _nxd_mqtt_process_publish_response(client_ptr, packet_ptr);
2333 break;
2334
2335 case MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE:
2336 case MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE:
2337 /* Client should not process subscribe or unsubscribe message. */
2338 break;
2339
2340 case MQTT_CONTROL_PACKET_TYPE_SUBACK:
2341 case MQTT_CONTROL_PACKET_TYPE_UNSUBACK:
2342 _nxd_mqtt_process_sub_unsub_ack(client_ptr, packet_ptr);
2343 break;
2344
2345
2346 case MQTT_CONTROL_PACKET_TYPE_PINGREQ:
2347 /* Client is not supposed to receive ping req. Ignore it. */
2348 break;
2349
2350 case MQTT_CONTROL_PACKET_TYPE_PINGRESP:
2351 _nxd_mqtt_process_pingresp(client_ptr);
2352 break;
2353
2354 case MQTT_CONTROL_PACKET_TYPE_DISCONNECT:
2355 _nxd_mqtt_process_disconnect(client_ptr);
2356 break;
2357
2358 /* Publisher sender message type for QoS 2. Not supported. */
2359 case MQTT_CONTROL_PACKET_TYPE_PUBREC:
2360 case MQTT_CONTROL_PACKET_TYPE_PUBCOMP:
2361 default:
2362 /* Unknown type. */
2363 break;
2364 }
2365
2366 if (packet_consumed)
2367 {
2368 break;
2369 }
2370
2371 /* Trim current packet. */
2372 offset += remaining_length;
2373 packet_length = packet_ptr -> nx_packet_length;
2374 if (packet_length > offset)
2375 {
2376
2377 /* Multiple MQTT message in one packet. */
2378 packet_length = packet_ptr -> nx_packet_length - offset;
2379 while ((ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr) <= offset)
2380 {
2381 offset -= (ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr);
2382
2383 /* Current packet can be released. */
2384 previous_packet_ptr = packet_ptr;
2385 packet_ptr = packet_ptr -> nx_packet_next;
2386 previous_packet_ptr -> nx_packet_next = NX_NULL;
2387 nx_packet_release(previous_packet_ptr);
2388 if (packet_ptr == NX_NULL)
2389 {
2390
2391 /* Invalid packet. */
2392 break;
2393 }
2394 }
2395
2396 if (packet_ptr)
2397 {
2398
2399 /* Adjust current packet. */
2400 packet_ptr -> nx_packet_prepend_ptr = packet_ptr -> nx_packet_prepend_ptr + offset;
2401 packet_ptr -> nx_packet_length = packet_length;
2402 }
2403 }
2404 else
2405 {
2406
2407 /* All messages in current packet is processed. */
2408 break;
2409 }
2410 }
2411
2412 if (!packet_consumed)
2413 {
2414 nx_packet_release(packet_ptr);
2415 }
2416 }
2417
2418 /* No more data in the receive queue. Return. */
2419
2420 return;
2421 }
2422
2423
2424 /**************************************************************************/
2425 /* */
2426 /* FUNCTION RELEASE */
2427 /* */
2428 /* _nxd_mqtt_tcp_establish_process PORTABLE C */
2429 /* 6.2.0 */
2430 /* AUTHOR */
2431 /* */
2432 /* Yuxin Zhou, Microsoft Corporation */
2433 /* */
2434 /* DESCRIPTION */
2435 /* */
2436 /* This function processes MQTT TCP connection establish event. */
2437 /* */
2438 /* INPUT */
2439 /* */
2440 /* client_ptr Pointer to MQTT Client */
2441 /* */
2442 /* OUTPUT */
2443 /* */
2444 /* None */
2445 /* */
2446 /* CALLS */
2447 /* */
2448 /* nx_secure_tls_session_start */
2449 /* _nxd_mqtt_client_connection_end */
2450 /* _nxd_mqtt_client_connect_packet_send */
2451 /* */
2452 /* CALLED BY */
2453 /* */
2454 /* _nxd_mqtt_client_event_process */
2455 /* */
2456 /* RELEASE HISTORY */
2457 /* */
2458 /* DATE NAME DESCRIPTION */
2459 /* */
2460 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2461 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2462 /* resulting in version 6.1 */
2463 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
2464 /* mqtt over websocket, */
2465 /* resulting in version 6.2.0 */
2466 /* */
2467 /**************************************************************************/
_nxd_mqtt_tcp_establish_process(NXD_MQTT_CLIENT * client_ptr)2468 static VOID _nxd_mqtt_tcp_establish_process(NXD_MQTT_CLIENT *client_ptr)
2469 {
2470 UINT status;
2471
2472
2473 /* TCP connection is established. */
2474
2475 /* If TLS is enabled, start TLS */
2476 #ifdef NX_SECURE_ENABLE
2477 if (client_ptr -> nxd_mqtt_client_use_tls)
2478 {
2479 status = nx_secure_tls_session_start(&(client_ptr -> nxd_mqtt_tls_session), &(client_ptr -> nxd_mqtt_client_socket), NX_NO_WAIT);
2480
2481 if (status != NX_CONTINUE)
2482 {
2483
2484 /* End connection. */
2485 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2486
2487 /* Check callback function. */
2488 if (client_ptr -> nxd_mqtt_connect_notify)
2489 {
2490 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2491 }
2492
2493 return;
2494 }
2495
2496 /* TLS in progress. */
2497 client_ptr -> nxd_mqtt_tls_in_progress = NX_TRUE;
2498
2499 return;
2500 }
2501 #endif /* NX_SECURE_ENABLE */
2502
2503 #ifdef NXD_MQTT_OVER_WEBSOCKET
2504
2505 /* If using websocket, start websocket connection. */
2506 if (client_ptr -> nxd_mqtt_client_use_websocket)
2507 {
2508 status = nx_websocket_client_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_client_socket),
2509 client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
2510 client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
2511 (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
2512 NX_NO_WAIT);
2513
2514 if (status != NX_IN_PROGRESS)
2515 {
2516
2517 /* End connection. */
2518 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2519
2520 /* Check callback function. */
2521 if (client_ptr -> nxd_mqtt_connect_notify)
2522 {
2523 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2524 }
2525
2526 return;
2527 }
2528
2529 return;
2530 }
2531 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2532
2533 /* Start to send MQTT connect packet. */
2534 status = _nxd_mqtt_client_connect_packet_send(client_ptr, NX_NO_WAIT);
2535
2536 /* Check status. */
2537 if (status)
2538 {
2539
2540 /* End connection. */
2541 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2542
2543 /* Check callback function. */
2544 if (client_ptr -> nxd_mqtt_connect_notify)
2545 {
2546 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2547 }
2548 }
2549 }
2550
2551
2552 #ifdef NX_SECURE_ENABLE
2553 /**************************************************************************/
2554 /* */
2555 /* FUNCTION RELEASE */
2556 /* */
2557 /* _nxd_mqtt_tls_establish_process PORTABLE C */
2558 /* 6.2.0 */
2559 /* AUTHOR */
2560 /* */
2561 /* Yuxin Zhou, Microsoft Corporation */
2562 /* */
2563 /* DESCRIPTION */
2564 /* */
2565 /* This function processes TLS connection establish event. */
2566 /* */
2567 /* INPUT */
2568 /* */
2569 /* client_ptr Pointer to MQTT Client */
2570 /* */
2571 /* OUTPUT */
2572 /* */
2573 /* None */
2574 /* */
2575 /* CALLS */
2576 /* */
2577 /* _nx_secure_tls_handshake_process */
2578 /* _nxd_mqtt_client_connect_packet_send */
2579 /* _nxd_mqtt_client_connection_end */
2580 /* */
2581 /* CALLED BY */
2582 /* */
2583 /* _nxd_mqtt_client_event_process */
2584 /* */
2585 /* RELEASE HISTORY */
2586 /* */
2587 /* DATE NAME DESCRIPTION */
2588 /* */
2589 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2590 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2591 /* resulting in version 6.1 */
2592 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
2593 /* mqtt over websocket, */
2594 /* resulting in version 6.2.0 */
2595 /* */
2596 /**************************************************************************/
_nxd_mqtt_tls_establish_process(NXD_MQTT_CLIENT * client_ptr)2597 static VOID _nxd_mqtt_tls_establish_process(NXD_MQTT_CLIENT *client_ptr)
2598 {
2599 UINT status;
2600
2601
2602 /* Directly call handshake process for async mode. */
2603 status = _nx_secure_tls_handshake_process(&(client_ptr -> nxd_mqtt_tls_session), NX_NO_WAIT);
2604 if (status == NX_SUCCESS)
2605 {
2606
2607 /* TLS session established. */
2608 client_ptr -> nxd_mqtt_tls_in_progress = NX_FALSE;
2609
2610 #ifdef NXD_MQTT_OVER_WEBSOCKET
2611
2612 /* If using websocket, start websocket connection. */
2613 if (client_ptr -> nxd_mqtt_client_use_websocket)
2614 {
2615 status = nx_websocket_client_secure_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_tls_session),
2616 client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
2617 client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
2618 (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
2619 NX_NO_WAIT);
2620
2621 if (status == NX_IN_PROGRESS)
2622 {
2623 return;
2624 }
2625 }
2626 else
2627 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2628 {
2629
2630 /* Start to send MQTT connect packet. */
2631 status = _nxd_mqtt_client_connect_packet_send(client_ptr, NX_NO_WAIT);
2632 }
2633 }
2634 else if (status == NX_CONTINUE)
2635 {
2636 return;
2637 }
2638
2639 /* Check status. */
2640 if (status)
2641 {
2642
2643 /* End connection. */
2644 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2645
2646 /* Check callback function. */
2647 if (client_ptr -> nxd_mqtt_connect_notify)
2648 {
2649 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2650 }
2651 }
2652
2653 return;
2654 }
2655 #endif /* NX_SECURE_ENABLE */
2656
2657 /**************************************************************************/
2658 /* */
2659 /* FUNCTION RELEASE */
2660 /* */
2661 /* _nxd_mqtt_client_append_message PORTABLE C */
2662 /* 6.1 */
2663 /* AUTHOR */
2664 /* */
2665 /* Yuxin Zhou, Microsoft Corporation */
2666 /* */
2667 /* DESCRIPTION */
2668 /* */
2669 /* This function writes the message length and message in the outgoing */
2670 /* MQTT packet. */
2671 /* */
2672 /* INPUT */
2673 /* */
2674 /* client_ptr Pointer to MQTT Client */
2675 /* packet_ptr Outgoing MQTT packet */
2676 /* message Pointer to the message */
2677 /* length Length of the message */
2678 /* wait_option Wait option */
2679 /* */
2680 /* OUTPUT */
2681 /* */
2682 /* status */
2683 /* */
2684 /* CALLS */
2685 /* */
2686 /* nx_packet_data_append Append packet data */
2687 /* */
2688 /* CALLED BY */
2689 /* */
2690 /* _nxd_mqtt_client_sub_unsub */
2691 /* _nxd_mqtt_client_connect */
2692 /* _nxd_mqtt_client_publish */
2693 /* */
2694 /* RELEASE HISTORY */
2695 /* */
2696 /* DATE NAME DESCRIPTION */
2697 /* */
2698 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2699 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2700 /* resulting in version 6.1 */
2701 /* */
2702 /**************************************************************************/
UINT _nxd_mqtt_client_append_message(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, CHAR *message, UINT length, ULONG wait_option)
{
UCHAR length_field[2];
UINT  status;

    /* Build the two-byte length prefix, most significant byte first.  Only the
       low 16 bits of length are encoded. */
    length_field[0] = (length >> 8) & 0xFF;
    length_field[1] = length & 0xFF;

    /* Append the length prefix to the outgoing packet. */
    status = nx_packet_data_append(packet_ptr, length_field, 2, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);

    /* Append the message bytes only if the prefix went in and there is
       something to copy; an empty message consists of the prefix alone. */
    if ((!status) && length)
    {
        status = nx_packet_data_append(packet_ptr, message, length,
                                       client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
    }

    /* Status of the last append that was attempted. */
    return(status);
}
2728
2729
2730 /**************************************************************************/
2731 /* */
2732 /* FUNCTION RELEASE */
2733 /* */
2734 /* _nxd_mqtt_client_connection_end PORTABLE C */
2735 /* 6.2.0 */
2736 /* AUTHOR */
2737 /* */
2738 /* Yuxin Zhou, Microsoft Corporation */
2739 /* */
2740 /* DESCRIPTION */
2741 /* */
2742 /* This function is used to end the MQTT connection. */
2743 /* */
2744 /* INPUT */
2745 /* */
2746 /* client_ptr Pointer to MQTT Client */
2747 /* wait_option Wait option */
2748 /* */
2749 /* OUTPUT */
2750 /* */
2751 /* None */
2752 /* */
2753 /* CALLS */
2754 /* */
2755 /* nx_secure_tls_session_end End TLS session */
2756 /* nx_secure_tls_session_delete Delete TLS session */
2757 /* nx_tcp_socket_disconnect Close TCP connection */
2758 /* nx_tcp_client_socket_unbind Unbind TCP socket */
2759 /* tx_timer_delete Delete timer */
2760 /* */
2761 /* CALLED BY */
2762 /* */
2763 /* _nxd_mqtt_client_connect */
2764 /* _nxd_mqtt_process_disconnect */
2765 /* */
2766 /* RELEASE HISTORY */
2767 /* */
2768 /* DATE NAME DESCRIPTION */
2769 /* */
2770 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2771 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2772 /* resulting in version 6.1 */
2773 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
2774 /* mqtt over websocket, */
2775 /* resulting in version 6.2.0 */
2776 /* */
2777 /**************************************************************************/
_nxd_mqtt_client_connection_end(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)2778 VOID _nxd_mqtt_client_connection_end(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
2779 {
2780
2781 /* Obtain the mutex. */
2782 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
2783
2784 /* Mark the session as terminated. */
2785 client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;
2786
2787 /* Release the mutex. */
2788 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2789
2790 #ifdef NXD_MQTT_OVER_WEBSOCKET
2791 if (client_ptr -> nxd_mqtt_client_use_websocket)
2792 {
2793 nx_websocket_client_disconnect(&(client_ptr -> nxd_mqtt_client_websocket), wait_option);
2794 }
2795 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2796
2797 #ifdef NX_SECURE_ENABLE
2798 if (client_ptr -> nxd_mqtt_client_use_tls)
2799 {
2800 nx_secure_tls_session_end(&(client_ptr -> nxd_mqtt_tls_session), wait_option);
2801 nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
2802 }
2803 #endif
2804 nx_tcp_socket_disconnect(&(client_ptr -> nxd_mqtt_client_socket), wait_option);
2805 nx_tcp_client_socket_unbind(&(client_ptr -> nxd_mqtt_client_socket));
2806
2807 /* Disable timer if timer has been started. */
2808 if (client_ptr -> nxd_mqtt_keepalive)
2809 {
2810 tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
2811 }
2812 }
2813
2814
2815 static UINT _nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT *client_ptr, UCHAR header_value);
2816
2817
2818 /* MQTT internal function */
2819
2820 /**************************************************************************/
2821 /* */
2822 /* FUNCTION RELEASE */
2823 /* */
2824 /* _nxd_mqtt_periodic_timer_entry PORTABLE C */
2825 /* 6.1 */
2826 /* AUTHOR */
2827 /* */
2828 /* Yuxin Zhou, Microsoft Corporation */
2829 /* */
2830 /* DESCRIPTION */
2831 /* */
2832 /* This internal function is the expiration routine of the MQTT client */
2833 /* periodic timer. It sets an MQTT client event when a ping response */
2834 /* has timed out or when the keep-alive timeout is about to expire. */
2835 /* */
2836 /* */
2837 /* INPUT */
2838 /* */
2839 /* client Pointer to MQTT Client, */
2840 /* passed as a ULONG. */
2841 /* */
2842 /* OUTPUT */
2843 /* */
2844 /* None */
2845 /* */
2846 /* CALLS */
2847 /* */
2848 /* None */
2849 /* */
2850 /* CALLED BY */
2851 /* */
2852 /* MQTT client periodic timer */
2853 /* */
2854 /* RELEASE HISTORY */
2855 /* */
2856 /* DATE NAME DESCRIPTION */
2857 /* */
2858 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2859 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2860 /* resulting in version 6.1 */
2861 /* */
2862 /**************************************************************************/
_nxd_mqtt_periodic_timer_entry(ULONG client)2863 static VOID _nxd_mqtt_periodic_timer_entry(ULONG client)
2864 {
2865 /* Check if it is time to send out a ping message. */
2866 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)client;
2867
2868 /* If an outstanding ping response has not been received, and the client exceeds the time waiting for ping response,
2869 the client shall disconnect from the server. */
2870 if (client_ptr -> nxd_mqtt_ping_not_responded)
2871 {
2872 /* If current time is greater than the ping timeout */
2873 if ((tx_time_get() - client_ptr -> nxd_mqtt_ping_sent_time) >= client_ptr -> nxd_mqtt_ping_timeout)
2874 {
2875 /* Ping timed out. Need to terminate the connection. */
2876 #ifndef NXD_MQTT_CLOUD_ENABLE
2877 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_PING_TIMEOUT_EVENT, TX_OR);
2878 #else
2879 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PING_TIMEOUT_EVENT);
2880 #endif /* NXD_MQTT_CLOUD_ENABLE */
2881
2882 return;
2883 }
2884 }
2885
2886 /* About to timeout? */
2887 if ((client_ptr -> nxd_mqtt_timeout - tx_time_get()) <= client_ptr -> nxd_mqtt_timer_value)
2888 {
2889 /* Set the flag so the MQTT thread can send the ping. */
2890 #ifndef NXD_MQTT_CLOUD_ENABLE
2891 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_TIMEOUT_EVENT, TX_OR);
2892 #else
2893 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_TIMEOUT_EVENT);
2894 #endif /* NXD_MQTT_CLOUD_ENABLE */
2895 }
2896
2897 /* If keepalive is not enabled, just return. */
2898 return;
2899 }
2900
2901
2902
2903 /**************************************************************************/
2904 /* */
2905 /* FUNCTION RELEASE */
2906 /* */
2907 /* _nxd_mqtt_client_event_process PORTABLE C */
2908 /* 6.1 */
2909 /* AUTHOR */
2910 /* */
2911 /* Yuxin Zhou, Microsoft Corporation */
2912 /* */
2913 /* DESCRIPTION */
2914 /* */
2915 /* This internal function processes the events posted to the MQTT */
2916 /* client. */
2917 /* */
2918 /* INPUT */
2919 /* */
2920 /* mqtt_client Pointer to MQTT Client */
2921 /* */
2922 /* OUTPUT */
2923 /* */
2924 /* None */
2925 /* */
2926 /* CALLS */
2927 /* */
2928 /* _nxd_mqtt_release_transmit_packet */
2929 /* _nxd_mqtt_release_receive_packet */
2930 /* _nxd_mqtt_send_simple_message */
2931 /* _nxd_mqtt_process_disconnect */
2932 /* _nxd_mqtt_packet_receive_process */
2933 /* tx_timer_delete */
2934 /* tx_event_flags_delete */
2935 /* nx_tcp_socket_delete */
2936 /* tx_timer_delete */
2937 /* tx_mutex_delete */
2938 /* */
2939 /* CALLED BY */
2940 /* */
2941 /* _nxd_mqtt_client_create */
2942 /* */
2943 /* RELEASE HISTORY */
2944 /* */
2945 /* DATE NAME DESCRIPTION */
2946 /* */
2947 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2948 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
2949 /* corrected mqtt client state,*/
2950 /* resulting in version 6.1 */
2951 /* */
2952 /**************************************************************************/
static VOID _nxd_mqtt_client_event_process(VOID *mqtt_client, ULONG common_events, ULONG module_own_events)
{
NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)mqtt_client;

    /* Take the client mutex for the event handling below. */
    tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);

    /* Common events are not used by this module. */
    NX_PARAMETER_NOT_USED(common_events);

    /* Keep-alive timeout: send a PINGREQ, but only while connected. */
    if ((module_own_events & MQTT_TIMEOUT_EVENT) &&
        (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED))
    {
        _nxd_mqtt_send_simple_message(client_ptr, MQTT_CONTROL_PACKET_TYPE_PINGREQ);
    }

    /* TCP connection established: continue the connect sequence. */
    if (module_own_events & MQTT_TCP_ESTABLISH_EVENT)
    {
        _nxd_mqtt_tcp_establish_process(client_ptr);
    }

    /* Data arrived: it belongs to the TLS handshake while TLS is in progress
       (async mode), otherwise it is MQTT traffic. */
    if (module_own_events & MQTT_PACKET_RECEIVE_EVENT)
    {
#ifdef NX_SECURE_ENABLE
        if (client_ptr -> nxd_mqtt_tls_in_progress)
        {
            _nxd_mqtt_tls_establish_process(client_ptr);
        }
        else
#endif /* NX_SECURE_ENABLE */
        {
            _nxd_mqtt_packet_receive_process(client_ptr);
        }
    }

    /* The server/broker didn't respond to our ping request message: disconnect. */
    if (module_own_events & MQTT_PING_TIMEOUT_EVENT)
    {
        _nxd_mqtt_process_disconnect(client_ptr);
    }

    /* Network disconnect event (posted when a receive call reports a network
       issue): go through the same disconnect code path. */
    if (module_own_events & MQTT_NETWORK_DISCONNECT_EVENT)
    {
        _nxd_mqtt_process_disconnect(client_ptr);
    }

    /* Without a delete request the work is done: release the mutex and leave. */
    if (!(module_own_events & MQTT_DELETE_EVENT))
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return;
    }

    /* Delete request.  Disconnect from the server first if still connected. */
    if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        _nxd_mqtt_process_disconnect(client_ptr);
    }

    /* Delete the timer, unless its ID shows it is already deleted. */
    if ((client_ptr -> nxd_mqtt_timer).tx_timer_id != 0)
    {
        tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
    }

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Delete the event flag group, unless its ID shows it is already deleted. */
    if ((client_ptr -> nxd_mqtt_events).tx_event_flags_group_id != 0)
    {
        tx_event_flags_delete(&client_ptr -> nxd_mqtt_events);
    }
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Release every packet still on the receive queue. */
    while (client_ptr -> message_receive_queue_head)
    {
        _nxd_mqtt_release_receive_packet(client_ptr, client_ptr -> message_receive_queue_head, NX_NULL);
    }
    client_ptr -> message_receive_queue_depth = 0;

    /* Release every packet still on the transmit queue. */
    while (client_ptr -> message_transmit_queue_head)
    {
        _nxd_mqtt_release_transmit_packet(client_ptr, client_ptr -> message_transmit_queue_head, NX_NULL);
    }

    /* Release the mutex before it is deleted. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Delete the client's own mutex. */
    tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Deleting the socket, (the socket ID is cleared); this signals it is ok to delete this thread. */
    nx_tcp_socket_delete(&client_ptr -> nxd_mqtt_client_socket);
}
3053
3054
3055 #ifndef NXD_MQTT_CLOUD_ENABLE
3056 /**************************************************************************/
3057 /* */
3058 /* FUNCTION RELEASE */
3059 /* */
3060 /* _nxd_mqtt_thread_entry PORTABLE C */
3061 /* 6.1 */
3062 /* AUTHOR */
3063 /* */
3064 /* Yuxin Zhou, Microsoft Corporation */
3065 /* */
3066 /* DESCRIPTION */
3067 /* */
3068 /* This internal function serves as the entry point for the MQTT */
3069 /* client thread. */
3070 /* */
3071 /* INPUT */
3072 /* */
3073 /* mqtt_client Pointer to MQTT Client */
3074 /* */
3075 /* OUTPUT */
3076 /* */
3077 /* None */
3078 /* */
3079 /* CALLS */
3080 /* */
3081 /* tx_event_flags_get */
3082 /* _nxd_mqtt_client_event_process */
3083 /* */
3084 /* CALLED BY */
3085 /* */
3086 /* _nxd_mqtt_client_create */
3087 /* */
3088 /* RELEASE HISTORY */
3089 /* */
3090 /* DATE NAME DESCRIPTION */
3091 /* */
3092 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3093 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3094 /* resulting in version 6.1 */
3095 /* */
3096 /**************************************************************************/
_nxd_mqtt_thread_entry(ULONG mqtt_client)3097 static VOID _nxd_mqtt_thread_entry(ULONG mqtt_client)
3098 {
3099 NXD_MQTT_CLIENT *client_ptr;
3100 ULONG events;
3101
3102 client_ptr = (NXD_MQTT_CLIENT *)mqtt_client;
3103
3104 /* Loop to process events on the MQTT client */
3105 for (;;)
3106 {
3107
3108 tx_event_flags_get(&client_ptr -> nxd_mqtt_events, MQTT_ALL_EVENTS, TX_OR_CLEAR, &events, TX_WAIT_FOREVER);
3109
3110 /* Call the event processing routine. */
3111 _nxd_mqtt_client_event_process(client_ptr, NX_NULL, events);
3112
3113 if (events & MQTT_DELETE_EVENT)
3114 {
3115 break;
3116 }
3117 }
3118 }
3119 #endif /* NXD_MQTT_CLOUD_ENABLE */
3120
3121
3122 /**************************************************************************/
3123 /* */
3124 /* FUNCTION RELEASE */
3125 /* */
3126 /* _mqtt_client_disconnect_callback PORTABLE C */
3127 /* 6.1 */
3128 /* AUTHOR */
3129 /* */
3130 /* Yuxin Zhou, Microsoft Corporation */
3131 /* */
3132 /* DESCRIPTION */
3133 /* */
3134 /* This internal function is passed to MQTT client socket create call. */
3135 /* This callback function notifies MQTT client thread when the TCP */
3136 /* connection is lost. */
3137 /* */
3138 /* */
3139 /* INPUT */
3140 /* */
3141 /* socket_ptr Pointer to TCP socket that */
3142 /* disconnected. */
3143 /* */
3144 /* OUTPUT */
3145 /* */
3146 /* None */
3147 /* */
3148 /* CALLS */
3149 /* */
3150 /* None */
3151 /* */
3152 /* CALLED BY */
3153 /* */
3154 /* TCP socket disconnect callback */
3155 /* */
3156 /* RELEASE HISTORY */
3157 /* */
3158 /* DATE NAME DESCRIPTION */
3159 /* */
3160 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3161 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3162 /* resulting in version 6.1 */
3163 /* */
3164 /**************************************************************************/
_mqtt_client_disconnect_callback(NX_TCP_SOCKET * socket_ptr)3165 static VOID _mqtt_client_disconnect_callback(NX_TCP_SOCKET *socket_ptr)
3166 {
3167 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)(socket_ptr -> nx_tcp_socket_reserved_ptr);
3168
3169 /* Set the MQTT_NETWORK_DISCONNECT event. This event indicates
3170 that the disconnect is initiated from the network. */
3171 #ifndef NXD_MQTT_CLOUD_ENABLE
3172 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_NETWORK_DISCONNECT_EVENT, TX_OR);
3173 #else
3174 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_NETWORK_DISCONNECT_EVENT);
3175 #endif /* NXD_MQTT_CLOUD_ENABLE */
3176
3177 return;
3178 }
3179
3180
3181 /**************************************************************************/
3182 /* */
3183 /* FUNCTION RELEASE */
3184 /* */
3185 /* _nxd_mqtt_client_create PORTABLE C */
3186 /* 6.1 */
3187 /* AUTHOR */
3188 /* */
3189 /* Yuxin Zhou, Microsoft Corporation */
3190 /* */
3191 /* DESCRIPTION */
3192 /* */
3193 /* This function creates an instance for MQTT Client. It initializes */
3194 /* MQTT Client control block, creates a thread, mutex and event queue */
3195 /* for MQTT Client, and creates the TCP socket for MQTT messaging. */
3196 /* */
3197 /* */
3198 /* INPUT */
3199 /* */
3200 /* client_ptr Pointer to MQTT Client */
3201 /* client_id Client ID for this instance */
3202 /* client_id_length Length of Client ID, in bytes */
3203 /* ip_ptr Pointer to IP instance */
3204 /* pool_ptr Pointer to packet pool */
3205 /* stack_ptr Client thread's stack pointer */
3206 /* stack_size Client thread's stack size */
3207 /* mqtt_thread_priority Priority for MQTT thread */
3208 /* memory_ptr Deprecated and not used */
3209 /* memory_size Deprecated and not used */
3210 /* */
3211 /* OUTPUT */
3212 /* */
3213 /* status Completion status */
3214 /* */
3215 /* CALLS */
3216 /* */
3217 /* _nxd_mqtt_client_create_internal Create mqtt client */
3218 /* */
3219 /* CALLED BY */
3220 /* */
3221 /* Application Code */
3222 /* */
3223 /* RELEASE HISTORY */
3224 /* */
3225 /* DATE NAME DESCRIPTION */
3226 /* */
3227 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3228 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
3229 /* corrected mqtt client state,*/
3230 /* resulting in version 6.1 */
3231 /* */
3232 /**************************************************************************/
UINT _nxd_mqtt_client_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
                             CHAR *client_id, UINT client_id_length,
                             NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
                             VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority,
                             VOID *memory_ptr, ULONG memory_size)
{
UINT result;

    /* memory_ptr and memory_size are deprecated and intentionally ignored. */
    NX_PARAMETER_NOT_USED(memory_ptr);
    NX_PARAMETER_NOT_USED(memory_size);

    /* Build the client control block and its internal resources. */
    result = _nxd_mqtt_client_create_internal(client_ptr, client_name, client_id, client_id_length, ip_ptr,
                                              pool_ptr, stack_ptr, stack_size, mqtt_thread_priority);
    if (result != 0)
    {

        /* Pass the internal failure code straight back to the caller. */
        return(result);
    }

#ifdef NXD_MQTT_CLOUD_ENABLE

    /* Create the cloud helper owned by this client. */
    result = nx_cloud_create(&(client_ptr -> nxd_mqtt_client_cloud), "Cloud Helper", stack_ptr, stack_size, mqtt_thread_priority);

    if (result == NX_SUCCESS)
    {

        /* Remember the cloud helper and borrow its mutex as the client mutex. */
        client_ptr -> nxd_mqtt_client_cloud_ptr = &(client_ptr -> nxd_mqtt_client_cloud);
        client_ptr -> nxd_mqtt_client_mutex_ptr = &(client_ptr -> nxd_mqtt_client_cloud.nx_cloud_mutex);

        /* Register MQTT event processing as a module of the cloud helper. */
        result = nx_cloud_module_register(client_ptr -> nxd_mqtt_client_cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module), client_name, NX_CLOUD_MODULE_MQTT_EVENT,
                                          _nxd_mqtt_client_event_process, client_ptr);

        if (result != NX_SUCCESS)
        {

            /* Registration failed: remove the cloud helper created above. */
            nx_cloud_delete(&(client_ptr -> nxd_mqtt_client_cloud));
        }
    }

    if (result != NX_SUCCESS)
    {

        /* Either cloud step failed: release the socket made by
           _nxd_mqtt_client_create_internal() and report an internal error. */
        nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));

        return(NXD_MQTT_INTERNAL_ERROR);
    }

#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* The client is fully created and ready for a connect request. */
    client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;

    return(NXD_MQTT_SUCCESS);
}
3306
3307
3308 /**************************************************************************/
3309 /* */
3310 /* FUNCTION RELEASE */
3311 /* */
3312 /* _nxd_mqtt_client_create_internal PORTABLE C */
3313 /* 6.1.12 */
3314 /* AUTHOR */
3315 /* */
3316 /* Yuxin Zhou, Microsoft Corporation */
3317 /* */
3318 /* DESCRIPTION */
3319 /* */
3320 /* This function creates an instance for MQTT Client. It initializes */
3321 /* MQTT Client control block, creates a thread, mutex and event queue */
3322 /* for MQTT Client, and creates the TCP socket for MQTT messaging. */
3323 /* */
3324 /* */
3325 /* INPUT */
3326 /* */
3327 /* client_ptr Pointer to MQTT Client */
3328 /* client_id Client ID for this instance */
3329 /* client_id_length Length of Client ID, in bytes */
3330 /* ip_ptr Pointer to IP instance */
3331 /* pool_ptr Pointer to packet pool */
3332 /* stack_ptr Client thread's stack pointer */
3333 /* stack_size Client thread's stack size */
3334 /* mqtt_thread_priority Priority for MQTT thread */
3335 /* */
3336 /* OUTPUT */
3337 /* */
3338 /* status Completion status */
3339 /* */
3340 /* CALLS */
3341 /* */
3342 /* tx_thread_create Create a thread */
3343 /* tx_mutex_create Create mutex */
3344 /* tx_mutex_get */
3345 /* tx_mutex_put */
/*    tx_event_flags_create                 Create event flag             */
3347 /* */
3348 /* CALLED BY */
3349 /* */
/*    _nxd_mqtt_client_create                                             */
3351 /* */
3352 /* RELEASE HISTORY */
3353 /* */
3354 /* DATE NAME DESCRIPTION */
3355 /* */
3356 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3357 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
3358 /* corrected mqtt client state,*/
3359 /* resulting in version 6.1 */
3360 /* 07-29-2022 Spencer McDonough Modified comment(s), and */
3361 /* improved internal logic, */
3362 /* resulting in version 6.1.12 */
3363 /* */
3364 /**************************************************************************/
static UINT _nxd_mqtt_client_create_internal(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
                                             CHAR *client_id, UINT client_id_length,
                                             NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
                                             VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority)
{
#ifdef NXD_MQTT_CLOUD_ENABLE

    /* No thread is created here in the cloud build, so the thread arguments are unused. */
    NX_PARAMETER_NOT_USED(stack_ptr);
    NX_PARAMETER_NOT_USED(stack_size);
    NX_PARAMETER_NOT_USED(mqtt_thread_priority);
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Start from an all-zero control block. */
    NXD_MQTT_SECURE_MEMSET((void *)client_ptr, 0, sizeof(*client_ptr));

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Protection mutex: nothing to undo if this first step fails. */
    if (tx_mutex_create(&client_ptr -> nxd_mqtt_protection, client_name, TX_NO_INHERIT) != TX_SUCCESS)
    {
        return(NXD_MQTT_INTERNAL_ERROR);
    }
    client_ptr -> nxd_mqtt_client_mutex_ptr = &(client_ptr -> nxd_mqtt_protection);

    /* MQTT thread, created suspended (TX_DONT_START); it is resumed at the end of this function. */
    if (tx_thread_create(&(client_ptr -> nxd_mqtt_thread), client_name, _nxd_mqtt_thread_entry,
                         (ULONG)client_ptr, stack_ptr, stack_size, mqtt_thread_priority, mqtt_thread_priority,
                         NXD_MQTT_CLIENT_THREAD_TIME_SLICE, TX_DONT_START) != TX_SUCCESS)
    {

        /* Undo the mutex before reporting the failure. */
        tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);

        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Event flags group the MQTT thread waits on. */
    if (tx_event_flags_create(&(client_ptr -> nxd_mqtt_events), client_name) != TX_SUCCESS)
    {

        /* Undo the mutex and the (never started) thread. */
        tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);
        tx_thread_delete(&(client_ptr -> nxd_mqtt_thread));

        return(NXD_MQTT_INTERNAL_ERROR);
    }
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Remember the caller-supplied identity and resources. Only the pointers
       are stored; the client ID and name strings are not copied. */
    client_ptr -> nxd_mqtt_client_name = client_name;
    client_ptr -> nxd_mqtt_client_id = client_id;
    client_ptr -> nxd_mqtt_client_id_length = client_id_length;
    client_ptr -> nxd_mqtt_client_ip_ptr = ip_ptr;
    client_ptr -> nxd_mqtt_client_packet_pool_ptr = pool_ptr;

    /* Create the TCP socket used for MQTT traffic, with the network-disconnect callback installed.
       NOTE(review): the return value of nx_tcp_socket_create is not examined here. */
    nx_tcp_socket_create(ip_ptr, &(client_ptr -> nxd_mqtt_client_socket), client_name,
                         NX_IP_NORMAL, NX_DONT_FRAGMENT, 0x80, NXD_MQTT_CLIENT_SOCKET_WINDOW_SIZE,
                         NX_NULL, _mqtt_client_disconnect_callback);

    /* Let socket callbacks find their way back to this client. */
    client_ptr -> nxd_mqtt_client_socket.nx_tcp_socket_reserved_ptr = (VOID *)client_ptr;

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Everything is in place: let the MQTT thread run. */
    tx_thread_resume(&(client_ptr -> nxd_mqtt_thread));
#endif /* NXD_MQTT_CLOUD_ENABLE */

    return(NXD_MQTT_SUCCESS);
}
3448
3449
3450 /**************************************************************************/
3451 /* */
3452 /* FUNCTION RELEASE */
3453 /* */
3454 /* _nxd_mqtt_client_login_set PORTABLE C */
3455 /* 6.1 */
3456 /* AUTHOR */
3457 /* */
3458 /* Yuxin Zhou, Microsoft Corporation */
3459 /* */
3460 /* DESCRIPTION */
3461 /* */
3462 /* This function sets optional MQTT username and password. Note */
3463 /* if the broker requires username and password, this information */
3464 /* must be set prior to calling nxd_mqtt_client_connect or */
3465 /* nxd_mqtt_client_secure_connect. */
3466 /* */
3467 /* */
3468 /* INPUT */
3469 /* */
3470 /* client_ptr Pointer to MQTT Client */
3471 /* username User name, or NULL if user */
3472 /* name is not required */
3473 /* username_length Length of the user name, or */
3474 /* 0 if user name is NULL */
3475 /* password Password string, or NULL if */
3476 /* password is not required */
3477 /* password_length Length of the password, or */
3478 /* 0 if password is NULL */
3479 /* */
3480 /* OUTPUT */
3481 /* */
3482 /* status Completion status */
3483 /* */
3484 /* CALLS */
3485 /* */
3486 /* tx_mutex_get */
3487 /* */
3488 /* CALLED BY */
3489 /* */
3490 /* Application Code */
3491 /* */
3492 /* RELEASE HISTORY */
3493 /* */
3494 /* DATE NAME DESCRIPTION */
3495 /* */
3496 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3497 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3498 /* resulting in version 6.1 */
3499 /* */
3500 /**************************************************************************/
UINT _nxd_mqtt_client_login_set(NXD_MQTT_CLIENT *client_ptr,
                                CHAR *username, UINT username_length, CHAR *password, UINT password_length)
{

    /* Take the client mutex before touching the control block. */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Record the credentials. Only the pointers are stored (no copy is made),
       and each length is narrowed to USHORT. */
    client_ptr -> nxd_mqtt_client_password_length = (USHORT)password_length;
    client_ptr -> nxd_mqtt_client_password = password;
    client_ptr -> nxd_mqtt_client_username_length = (USHORT)username_length;
    client_ptr -> nxd_mqtt_client_username = username;

    /* Done with the control block. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    return(NX_SUCCESS);
}
3522
3523 /**************************************************************************/
3524 /* */
3525 /* FUNCTION RELEASE */
3526 /* */
3527 /* _nxd_mqtt_client_will_message_set PORTABLE C */
3528 /* 6.1 */
3529 /* AUTHOR */
3530 /* */
3531 /* Yuxin Zhou, Microsoft Corporation */
3532 /* */
3533 /* DESCRIPTION */
3534 /* */
3535 /* This function sets optional MQTT will topic and will message. */
3536 /* Note that if will message is needed, application must set will */
3537 /* message prior to calling nxd_mqtt_client_connect or */
3538 /* nxd_mqtt_client_secure_connect. */
3539 /* */
3540 /* */
3541 /* INPUT */
3542 /* */
3543 /* client_ptr Pointer to MQTT Client */
3544 /* will_topic Will topic string. */
3545 /* will_topic_length Length of the will topic. */
3546 /* will_message Will message string. */
3547 /* will_message_length Length of the will message. */
3548 /* will_retain_flag Whether or not the will */
3549 /* message is to be retained */
3550 /* when it is published. */
3551 /* Valid values are NX_TRUE */
3552 /* NX_FALSE */
3553 /* will_QoS QoS level to be used when */
3554 /* publishing will message. */
/*                                            Valid values are 0 and 1;   */
/*                                            QoS 2 is not supported      */
3556 /* */
3557 /* OUTPUT */
3558 /* */
3559 /* status Completion status */
3560 /* */
3561 /* CALLS */
3562 /* */
3563 /* tx_mutex_get */
3564 /* */
3565 /* CALLED BY */
3566 /* */
3567 /* Application Code */
3568 /* */
3569 /* RELEASE HISTORY */
3570 /* */
3571 /* DATE NAME DESCRIPTION */
3572 /* */
3573 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3574 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3575 /* resulting in version 6.1 */
3576 /* */
3577 /**************************************************************************/
UINT _nxd_mqtt_client_will_message_set(NXD_MQTT_CLIENT *client_ptr,
                                       const UCHAR *will_topic, UINT will_topic_length, const UCHAR *will_message,
                                       UINT will_message_length, UINT will_retain_flag, UINT will_QoS)
{
UINT  status;
UCHAR qos_retain;

    /* QoS level 2 is not supported for the will message. */
    if (will_QoS == 2)
    {
        return(NXD_MQTT_QOS2_NOT_SUPPORTED);
    }

    /* Obtain the mutex. */
    status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

    if (status != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Record the will topic and message. Only the pointers are stored; the
       data they reference is not copied. */
    client_ptr -> nxd_mqtt_client_will_topic = will_topic;
    client_ptr -> nxd_mqtt_client_will_topic_length = will_topic_length;
    client_ptr -> nxd_mqtt_client_will_message = will_message;
    client_ptr -> nxd_mqtt_client_will_message_length = will_message_length;

    /* Build the retain/QoS byte from scratch: bit 7 is the retain flag and
       the QoS level is OR-ed into the low bits.  The stored value must not
       be used as the starting point, otherwise a retain flag or QoS level
       left over from an earlier call would survive this one. */
    qos_retain = (UCHAR)((will_retain_flag == NX_TRUE) ? 0x80 : 0);
    client_ptr -> nxd_mqtt_client_will_qos_retain = (UCHAR)(qos_retain | will_QoS);

    /* Release the mutex. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    return(NX_SUCCESS);
}
3612
3613
3614 /**************************************************************************/
3615 /* */
3616 /* FUNCTION RELEASE */
3617 /* */
3618 /* _nxde_mqtt_client_will_message_set PORTABLE C */
3619 /* 6.1 */
3620 /* AUTHOR */
3621 /* */
3622 /* Yuxin Zhou, Microsoft Corporation */
3623 /* */
3624 /* DESCRIPTION */
3625 /* */
3626 /* This function checks for errors in the */
3627 /* nxd_mqtt_client_will_message_set call. */
3628 /* */
3629 /* */
3630 /* INPUT */
3631 /* */
3632 /* client_ptr Pointer to MQTT Client */
3633 /* will_topic Will topic string. */
3634 /* will_topic_length Length of the will topic. */
3635 /* will_message Will message string. */
3636 /* will_message_length Length of the will message. */
3637 /* will_retain_flag Whether or not the will */
3638 /* message is to be retained */
3639 /* when it is published. */
3640 /* Valid values are NX_TRUE */
3641 /* NX_FALSE */
3642 /* will_QoS QoS level to be used when */
3643 /* publishing will message. */
3644 /* Valid values are 0, 1, 2 */
3645 /* */
3646 /* OUTPUT */
3647 /* */
3648 /* status Completion status */
3649 /* */
3650 /* CALLS */
3651 /* */
3652 /* _nxd_mqtt_client_will_message_set */
3653 /* */
3654 /* CALLED BY */
3655 /* */
3656 /* Application Code */
3657 /* */
3658 /* RELEASE HISTORY */
3659 /* */
3660 /* DATE NAME DESCRIPTION */
3661 /* */
3662 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3663 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3664 /* resulting in version 6.1 */
3665 /* */
3666 /**************************************************************************/
UINT _nxde_mqtt_client_will_message_set(NXD_MQTT_CLIENT *client_ptr,
                                        const UCHAR *will_topic, UINT will_topic_length, const UCHAR *will_message,
                                        UINT will_message_length, UINT will_retain_flag, UINT will_QoS)
{

    /* A client control block is required. */
    if (client_ptr == NX_NULL)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Validate the will topic string. The will topic cannot be NULL or empty. */
    if ((will_topic == NX_NULL) || (will_topic_length == 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* The will message may be empty, but a NULL pointer paired with a
       non-zero length does not describe any data and is rejected. */
    if ((will_message == NX_NULL) && (will_message_length != 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* The retain flag must be exactly NX_TRUE or NX_FALSE. */
    if ((will_retain_flag != NX_TRUE) && (will_retain_flag != NX_FALSE))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* QoS levels above 2 are not valid parameter values. */
    if (will_QoS > 2)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* All checks passed; call the actual service. */
    return(_nxd_mqtt_client_will_message_set(client_ptr, will_topic, will_topic_length, will_message,
                                             will_message_length, will_retain_flag, will_QoS));
}
3696
3697 /**************************************************************************/
3698 /* */
3699 /* FUNCTION RELEASE */
3700 /* */
3701 /* _nxde_mqtt_client_login_set PORTABLE C */
3702 /* 6.1 */
3703 /* AUTHOR */
3704 /* */
3705 /* Yuxin Zhou, Microsoft Corporation */
3706 /* */
3707 /* DESCRIPTION */
3708 /* */
/*    This function checks for errors in the                              */
/*    nxd_mqtt_client_login_set call.                                     */
3710 /* */
3711 /* */
3712 /* INPUT */
3713 /* */
3714 /* client_ptr Pointer to MQTT Client */
3715 /* username User name, or NULL if user */
3716 /* name is not required */
3717 /* username_length Length of the user name, or */
3718 /* 0 if user name is NULL */
3719 /* password Password string, or NULL if */
3720 /* password is not required */
3721 /* password_length Length of the password, or */
3722 /* 0 if password is NULL */
3723 /* */
3724 /* OUTPUT */
3725 /* */
3726 /* status Completion status */
3727 /* */
3728 /* CALLS */
3729 /* */
3730 /* _nxd_mqtt_client_login_set */
3731 /* */
3732 /* CALLED BY */
3733 /* */
3734 /* Application Code */
3735 /* */
3736 /* RELEASE HISTORY */
3737 /* */
3738 /* DATE NAME DESCRIPTION */
3739 /* */
3740 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3741 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3742 /* resulting in version 6.1 */
3743 /* */
3744 /**************************************************************************/
UINT _nxde_mqtt_client_login_set(NXD_MQTT_CLIENT *client_ptr,
                                 CHAR *username, UINT username_length, CHAR *password, UINT password_length)
{

    /* A client control block is required. */
    if (client_ptr == NX_NULL)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Username and username length don't match,
       or password and password length don't match. */
    if (((username == NX_NULL) && (username_length != 0)) ||
        ((password == NX_NULL) && (password_length != 0)))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* _nxd_mqtt_client_login_set narrows both lengths to USHORT when it
       stores them.  Reject anything above 0xFFFF here instead of letting
       the length be truncated silently. */
    if ((username_length > 0xFFFF) || (password_length > 0xFFFF))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* All checks passed; call the actual service. */
    return(_nxd_mqtt_client_login_set(client_ptr, username, username_length, password, password_length));
}
3763
3764
3765 /**************************************************************************/
3766 /* */
3767 /* FUNCTION RELEASE */
3768 /* */
3769 /* _nxd_mqtt_client_retransmit_message PORTABLE C */
3770 /* 6.2.0 */
3771 /* AUTHOR */
3772 /* */
3773 /* Yuxin Zhou, Microsoft Corporation */
3774 /* */
3775 /* DESCRIPTION */
3776 /* */
/*    This function retransmits QoS1 messages upon reconnection, if the   */
/*    connection was established without CLEAN_SESSION.                   */
3779 /* */
3780 /* */
3781 /* INPUT */
3782 /* */
3783 /* client_ptr Pointer to MQTT Client */
3784 /* wait_option Timeout value */
3785 /* */
3786 /* OUTPUT */
3787 /* */
3788 /* status Completion status */
3789 /* */
3790 /* CALLS */
3791 /* */
3792 /* tx_mutex_get */
3793 /* tx_mutex_put */
3794 /* _nxd_mqtt_packet_send */
3795 /* nx_packet_release */
3796 /* tx_time_get */
3797 /* nx_packet_copy */
3798 /* */
3799 /* CALLED BY */
3800 /* */
3801 /* Application Code */
3802 /* */
3803 /* RELEASE HISTORY */
3804 /* */
3805 /* DATE NAME DESCRIPTION */
3806 /* */
3807 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3808 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3809 /* resulting in version 6.1 */
3810 /* 10-31-2022 Bo Chen Modified comment(s), improved */
3811 /* the logic of sending packet,*/
3812 /* resulting in version 6.2.0 */
3813 /* */
3814 /**************************************************************************/
_nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)3815 static UINT _nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
3816 {
3817 NX_PACKET *transmit_packet_ptr;
3818 NX_PACKET *packet_ptr;
3819 UINT status = NXD_MQTT_SUCCESS;
3820 UINT mutex_status;
3821 UCHAR fixed_header;
3822
3823 transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
3824
3825 while (transmit_packet_ptr)
3826 {
3827 fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
3828
3829 if ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4))
3830 {
3831
3832 /* Retransmit publish packet only. */
3833 /* Obtain a NetX Packet. */
3834 status = nx_packet_copy(transmit_packet_ptr, &packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
3835
3836 if (status != NXD_MQTT_SUCCESS)
3837 {
3838 return(NXD_MQTT_PACKET_POOL_FAILURE);
3839 }
3840
3841 /* Release the mutex. */
3842 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3843
3844 /* Send packet to server. */
3845 status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);
3846
3847 if (status)
3848 {
3849 /* Release the packet. */
3850 nx_packet_release(packet_ptr);
3851
3852 status = NXD_MQTT_COMMUNICATION_FAILURE;
3853 }
3854 /* Obtain the mutex. */
3855 mutex_status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, wait_option);
3856
3857 if (mutex_status != TX_SUCCESS)
3858 {
3859 return(NXD_MQTT_MUTEX_FAILURE);
3860 }
3861 if (status)
3862 {
3863 return(status);
3864 }
3865 }
3866 transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
3867 }
3868
3869 /* Update the timeout value. */
3870 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
3871
3872 return(status);
3873 }
3874
3875 /**************************************************************************/
3876 /* */
3877 /* FUNCTION RELEASE */
3878 /* */
3879 /* _nxd_mqtt_client_connect PORTABLE C */
3880 /* 6.3.0 */
3881 /* AUTHOR */
3882 /* */
3883 /* Yuxin Zhou, Microsoft Corporation */
3884 /* */
3885 /* DESCRIPTION */
3886 /* */
3887 /* This function makes an initial connection to the MQTT server. */
3888 /* */
3889 /* */
3890 /* INPUT */
3891 /* */
3892 /* client_ptr Pointer to MQTT Client */
3893 /* server_ip Server IP address structure */
3894 /* server_port Server port number, in host */
3895 /* byte order */
/*    keepalive                             Keepalive time, in seconds    */
3897 /* clean_session Clean session flag */
3898 /* wait_option Timeout value */
3899 /* */
3900 /* */
3901 /* OUTPUT */
3902 /* */
3903 /* status Completion status */
3904 /* */
3905 /* CALLS */
3906 /* */
3907 /* tx_mutex_get */
3908 /* nx_tcp_socket_create Create TCP socket */
3909 /* nx_tcp_socket_receive_notify */
3910 /* nx_tcp_client_socket_bind */
3911 /* nxd_tcp_client_socket_connect */
3912 /* nx_tcp_client_socket_unbind */
3913 /* tx_mutex_put */
3914 /* nx_secure_tls_session_start */
3915 /* nx_secure_tls_session_send */
3916 /* nx_secure_tls_session_receive */
3917 /* _nxd_mqtt_client_set_fixed_header */
3918 /* _nxd_mqtt_client_append_message */
3919 /* nx_tcp_socket_send */
3920 /* nx_packet_release */
3921 /* nx_tcp_socket_receive */
3922 /* tx_event_flag_set */
3923 /* _nxd_mqtt_release_transmit_packet */
3924 /* tx_timer_create */
3925 /* nx_secure_tls_session_receive */
3926 /* _nxd_mqtt_client_connection_end */
3927 /* */
3928 /* CALLED BY */
3929 /* */
3930 /* Application Code */
3931 /* */
3932 /* RELEASE HISTORY */
3933 /* */
3934 /* DATE NAME DESCRIPTION */
3935 /* */
3936 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3937 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
3938 /* fixed return value when it */
3939 /* is set in CONNACK, corrected*/
3940 /* mqtt client state, */
3941 /* resulting in version 6.1 */
3942 /* 08-02-2021 Yuxin Zhou Modified comment(s), and */
3943 /* corrected the logic for */
3944 /* non-blocking mode, */
3945 /* resulting in version 6.1.8 */
3946 /* 07-29-2022 Spencer McDonough Modified comment(s), and */
3947 /* improved internal logic, */
3948 /* resulting in version 6.1.12 */
3949 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
3950 /* mqtt over websocket, */
3951 /* resulting in version 6.2.0 */
3952 /* 10-31-2023 Haiqing Zhao Modified comment(s), */
3953 /* resulting in version 6.3.0 */
3954 /* */
3955 /**************************************************************************/
_nxd_mqtt_client_connect(NXD_MQTT_CLIENT * client_ptr,NXD_ADDRESS * server_ip,UINT server_port,UINT keepalive,UINT clean_session,ULONG wait_option)3956 UINT _nxd_mqtt_client_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
3957 UINT keepalive, UINT clean_session, ULONG wait_option)
3958 {
3959 NX_PACKET *packet_ptr;
3960 UINT status;
3961 TX_THREAD *thread_ptr;
3962 UINT new_priority;
3963 UINT old_priority;
3964
3965
3966 /* Obtain the mutex. */
3967 status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
3968
3969 if (status != TX_SUCCESS)
3970 {
3971 #ifdef NX_SECURE_ENABLE
3972 if (client_ptr -> nxd_mqtt_client_use_tls)
3973 {
3974 nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
3975 }
3976 #endif /* NX_SECURE_ENABLE */
3977
3978 return(NXD_MQTT_MUTEX_FAILURE);
3979 }
3980
3981 /* Do nothing if the client is already connected. */
3982 if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
3983 {
3984 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3985 return(NXD_MQTT_ALREADY_CONNECTED);
3986 }
3987
3988 /* Check if client is connecting. */
3989 if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTING)
3990 {
3991 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3992 return(NXD_MQTT_CONNECTING);
3993 }
3994
3995 /* Client state must be in IDLE. */
3996 if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_IDLE)
3997 {
3998 #ifdef NX_SECURE_ENABLE
3999 if (client_ptr -> nxd_mqtt_client_use_tls)
4000 {
4001 nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
4002 }
4003 #endif /* NX_SECURE_ENABLE */
4004 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4005 return(NXD_MQTT_INVALID_STATE);
4006 }
4007
4008 #if defined(NX_SECURE_ENABLE) && defined(NXD_MQTT_REQUIRE_TLS)
4009 if (!client_ptr -> nxd_mqtt_client_use_tls)
4010 {
4011
4012 /* NXD_MQTT_REQUIRE_TLS is defined but the application does not use TLS.
4013 This is security violation. Return with failure code. */
4014 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4015 return(NXD_MQTT_CONNECT_FAILURE);
4016 }
4017 #endif /* NX_SECURE_ENABLE && NXD_MQTT_REQUIRE_TLS*/
4018
4019 /* Record the keepalive value, converted to TX timer ticks. */
4020 client_ptr -> nxd_mqtt_keepalive = keepalive * NX_IP_PERIODIC_RATE;
4021 if (keepalive)
4022 {
4023 client_ptr -> nxd_mqtt_timer_value = NXD_MQTT_KEEPALIVE_TIMER_RATE;
4024 client_ptr -> nxd_mqtt_ping_timeout = NXD_MQTT_PING_TIMEOUT_DELAY;
4025
4026 /* Create timer */
4027 tx_timer_create(&(client_ptr -> nxd_mqtt_timer), "MQTT Timer", _nxd_mqtt_periodic_timer_entry, (ULONG)client_ptr,
4028 client_ptr -> nxd_mqtt_timer_value, client_ptr -> nxd_mqtt_timer_value, TX_AUTO_ACTIVATE);
4029 }
4030 else
4031 {
4032 client_ptr -> nxd_mqtt_timer_value = 0;
4033 client_ptr -> nxd_mqtt_ping_timeout = 0;
4034 }
4035
4036 /* Record the clean session flag. */
4037 client_ptr -> nxd_mqtt_clean_session = clean_session;
4038
4039 /* Set TCP connection establish notify for non-blocking mode. */
4040 if (wait_option == 0)
4041 {
4042 nx_tcp_socket_establish_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_tcp_establish_notify);
4043
4044 /* Set the receive callback. */
4045 nx_tcp_socket_receive_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_receive_callback);
4046
4047 #ifdef NXD_MQTT_OVER_WEBSOCKET
4048 if (client_ptr -> nxd_mqtt_client_use_websocket)
4049 {
4050
4051 /* Set the websocket connection callback. */
4052 nx_websocket_client_connection_status_callback_set(&client_ptr -> nxd_mqtt_client_websocket, client_ptr, _nxd_mqtt_client_websocket_connection_status_callback);
4053 }
4054 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4055 }
4056 else
4057 {
4058
4059 /* Clean receive callback. */
4060 client_ptr -> nxd_mqtt_client_socket.nx_tcp_receive_callback = NX_NULL;
4061
4062 #ifdef NXD_MQTT_OVER_WEBSOCKET
4063 if (client_ptr -> nxd_mqtt_client_use_websocket)
4064 {
4065
4066 /* Clean the websocket connection callback. */
4067 nx_websocket_client_connection_status_callback_set(&client_ptr -> nxd_mqtt_client_websocket, NX_NULL, NX_NULL);
4068 }
4069 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4070 }
4071
4072 /* Release mutex */
4073 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4074
4075 /* First attempt to bind the client socket. */
4076 nx_tcp_client_socket_bind(&(client_ptr -> nxd_mqtt_client_socket), NX_ANY_PORT, wait_option);
4077
4078 /* Obtain the mutex. */
4079 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
4080
4081 /* Make state as NXD_MQTT_CLIENT_STATE_CONNECTING. */
4082 client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_CONNECTING;
4083
4084 /* Release mutex. */
4085 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4086
4087 /* Connect to the MQTT server */
4088 status = nxd_tcp_client_socket_connect(&(client_ptr -> nxd_mqtt_client_socket), server_ip, server_port, wait_option);
4089 if ((status != NX_SUCCESS) && (status != NX_IN_PROGRESS))
4090 {
4091
4092 /* Obtain the mutex. */
4093 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
4094
4095 /* Make state as NXD_MQTT_CLIENT_STATE_IDLE. */
4096 client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;
4097
4098 /* Release mutex. */
4099 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4100
4101 #ifdef NX_SECURE_ENABLE
4102 if (client_ptr -> nxd_mqtt_client_use_tls)
4103 {
4104 nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
4105 }
4106 #endif /* NX_SECURE_ENABLE */
4107 nx_tcp_client_socket_unbind(&(client_ptr -> nxd_mqtt_client_socket));
4108 tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
4109 return(NXD_MQTT_CONNECT_FAILURE);
4110 }
4111
4112 /* Just return for non-blocking mode. */
4113 if (wait_option == 0)
4114 {
4115 return(NX_IN_PROGRESS);
4116 }
4117
4118 /* Increase priority to the same of internal thread to avoid out of order packet process. */
4119 #ifndef NXD_MQTT_CLOUD_ENABLE
4120 thread_ptr = &(client_ptr -> nxd_mqtt_thread);
4121 #else
4122 thread_ptr = &(client_ptr -> nxd_mqtt_client_cloud_ptr -> nx_cloud_thread);
4123 #endif /* NXD_MQTT_CLOUD_ENABLE */
4124 tx_thread_info_get(thread_ptr, NX_NULL, NX_NULL, NX_NULL,
4125 &new_priority, NX_NULL, NX_NULL, NX_NULL, NX_NULL);
4126 tx_thread_priority_change(tx_thread_identify(), new_priority, &old_priority);
4127
4128 /* If TLS is enabled, start TLS */
4129 #ifdef NX_SECURE_ENABLE
4130 if (client_ptr -> nxd_mqtt_client_use_tls)
4131 {
4132
4133 status = nx_secure_tls_session_start(&(client_ptr -> nxd_mqtt_tls_session), &(client_ptr -> nxd_mqtt_client_socket), wait_option);
4134
4135 if (status != NX_SUCCESS)
4136 {
4137
4138 /* Revert thread priority. */
4139 tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4140
4141 /* End connection. */
4142 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4143
4144 return(NXD_MQTT_CONNECT_FAILURE);
4145 }
4146 }
4147 #endif /* NX_SECURE_ENABLE */
4148
4149 #ifdef NXD_MQTT_OVER_WEBSOCKET
4150 if (client_ptr -> nxd_mqtt_client_use_websocket)
4151 {
4152 #ifdef NX_SECURE_ENABLE
4153 if (client_ptr -> nxd_mqtt_client_use_tls)
4154 {
4155 status = nx_websocket_client_secure_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_tls_session),
4156 client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
4157 client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
4158 (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
4159 wait_option);
4160 }
4161 else
4162 #endif /* NX_SECURE_ENABLE */
4163 {
4164 status = nx_websocket_client_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_client_socket),
4165 client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
4166 client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
4167 (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
4168 wait_option);
4169 }
4170
4171 if (status != NX_SUCCESS)
4172 {
4173
4174 /* Revert thread priority. */
4175 tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4176
4177 /* End connection. */
4178 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4179
4180 return(NXD_MQTT_CONNECT_FAILURE);
4181 }
4182 }
4183
4184 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4185
4186 /* Start to send connect packet. */
4187 status = _nxd_mqtt_client_connect_packet_send(client_ptr, wait_option);
4188
4189 if (status != NX_SUCCESS)
4190 {
4191
4192 /* Revert thread priority. */
4193 tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4194
4195 /* End connection. */
4196 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4197 return(NXD_MQTT_CONNECT_FAILURE);
4198 }
4199
4200 /* Call a receive. */
4201 status = _nxd_mqtt_packet_receive(client_ptr, &packet_ptr, wait_option);
4202
4203 /* Revert thread priority. */
4204 tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4205
4206 /* Check status. */
4207 if (status)
4208 {
4209
4210 /* End connection. */
4211 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4212 return(NXD_MQTT_COMMUNICATION_FAILURE);
4213 }
4214
4215 /* Process CONNACK message. */
4216 status = _nxd_mqtt_process_connack(client_ptr, packet_ptr, wait_option);
4217
4218 /* Release the packet. */
4219 nx_packet_release(packet_ptr);
4220
4221 /* Check status. */
4222 if (status == NX_SUCCESS)
4223 {
4224
4225 /* Set the receive callback. */
4226 nx_tcp_socket_receive_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_receive_callback);
4227 }
4228
4229 return(status);
4230 }
4231
4232 /**************************************************************************/
4233 /* */
4234 /* FUNCTION RELEASE */
4235 /* */
4236 /* _nxd_mqtt_client_connect_packet_send PORTABLE C */
4237 /* 6.3.0 */
4238 /* AUTHOR */
4239 /* */
4240 /* Yuxin Zhou, Microsoft Corporation */
4241 /* */
4242 /* DESCRIPTION */
4243 /* */
4244 /* This function sends CONNECT packet to MQTT server. */
4245 /* */
4246 /* */
4247 /* INPUT */
4248 /* */
/*    client_ptr                            Pointer to MQTT Client; the   */
/*                                            keepalive and clean session */
/*                                            settings are read from it   */
/*    wait_option                           Timeout value                 */
4253 /* */
4254 /* */
4255 /* OUTPUT */
4256 /* */
4257 /* status Completion status */
4258 /* */
4259 /* CALLS */
4260 /* */
4261 /* nx_packet_release */
4262 /* _nxd_mqtt_client_packet_allocate */
4263 /* _nxd_mqtt_release_transmit_packet */
4264 /* _nxd_mqtt_client_connection_end */
4265 /* _nxd_mqtt_client_set_fixed_header */
4266 /* _nxd_mqtt_client_append_message */
4267 /* _nxd_mqtt_packet_send */
4268 /* */
4269 /* CALLED BY */
4270 /* */
4271 /* _nxd_mqtt_client_connect */
4272 /* _nxd_mqtt_tcp_establish_process */
4273 /* _nxd_mqtt_tls_establish_process */
4274 /* */
4275 /* RELEASE HISTORY */
4276 /* */
4277 /* DATE NAME DESCRIPTION */
4278 /* */
4279 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4280 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4281 /* resulting in version 6.1 */
4282 /* 07-29-2022 Spencer McDonough Modified comment(s), */
4283 /* improved internal logic, */
4284 /* resulting in version 6.1.12 */
4285 /* 10-31-2022 Bo Chen Modified comment(s), improved */
4286 /* the logic of sending packet,*/
4287 /* resulting in version 6.2.0 */
4288 /* 10-31-2023 Haiqing Zhao Modified comment(s), improved */
4289 /* internal logic, */
4290 /* resulting in version 6.3.0 */
4291 /* */
4292 /**************************************************************************/
_nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)4293 UINT _nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
4294 {
4295 NX_PACKET *packet_ptr;
4296 UINT status;
4297 UINT length = 0;
4298 UCHAR connection_flags = 0;
4299 UINT ret = NXD_MQTT_SUCCESS;
4300 UCHAR temp_data[4];
4301 UINT keepalive = (client_ptr -> nxd_mqtt_keepalive/NX_IP_PERIODIC_RATE);
4302
4303
4304 /* Construct connect flags by taking the connect flag user supplies, or'ing the username and
4305 password bits, if they are supplied. */
4306 if (client_ptr -> nxd_mqtt_client_username)
4307 {
4308 connection_flags |= MQTT_CONNECT_FLAGS_USERNAME;
4309
4310 /* Add the password flag only if username is supplied. */
4311 if (client_ptr -> nxd_mqtt_client_password)
4312 {
4313 connection_flags |= MQTT_CONNECT_FLAGS_PASSWORD;
4314 }
4315 }
4316
4317 if (client_ptr -> nxd_mqtt_client_will_topic)
4318 {
4319 connection_flags = connection_flags | MQTT_CONNECT_FLAGS_WILL_FLAG;
4320
4321
4322 if (client_ptr -> nxd_mqtt_client_will_qos_retain & 0x80)
4323 {
4324 connection_flags = connection_flags | MQTT_CONNECT_FLAGS_WILL_RETAIN;
4325 }
4326
4327 connection_flags = (UCHAR)(connection_flags | ((client_ptr -> nxd_mqtt_client_will_qos_retain & 0x3) << 3));
4328 }
4329
4330 if (client_ptr -> nxd_mqtt_clean_session == NX_TRUE)
4331 {
4332 connection_flags = connection_flags | MQTT_CONNECT_FLAGS_CLEAN_SESSION;
4333
4334 /* Clear any transmit blocks from the previous session. */
4335 while (client_ptr -> message_transmit_queue_head)
4336 {
4337 _nxd_mqtt_release_transmit_packet(client_ptr, client_ptr -> message_transmit_queue_head, NX_NULL);
4338 }
4339 }
4340
4341 /* Set the length of the packet. */
4342 length = 10;
4343
4344 /* Add the size of the client Identifier. */
4345 length += (client_ptr -> nxd_mqtt_client_id_length + 2);
4346
4347 /* Add the will topic, if present. */
4348 if (connection_flags & MQTT_CONNECT_FLAGS_WILL_FLAG)
4349 {
4350 length += (client_ptr -> nxd_mqtt_client_will_topic_length + 2);
4351 length += (client_ptr -> nxd_mqtt_client_will_message_length + 2);
4352 }
4353 if (connection_flags & MQTT_CONNECT_FLAGS_USERNAME)
4354 {
4355 length += (UINT)(client_ptr -> nxd_mqtt_client_username_length + 2);
4356 }
4357 if (connection_flags & MQTT_CONNECT_FLAGS_PASSWORD)
4358 {
4359 length += (UINT)(client_ptr -> nxd_mqtt_client_password_length + 2);
4360 }
4361
4362 /* Check for invalid length. */
4363 if (length > (127 * 127 * 127 * 127))
4364 {
4365 return(NXD_MQTT_INTERNAL_ERROR);
4366 }
4367
4368 status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, wait_option);
4369
4370 if (status)
4371 {
4372 return(status);
4373 }
4374
4375 /* Construct MQTT CONNECT message. */
4376 temp_data[0] = ((MQTT_CONTROL_PACKET_TYPE_CONNECT << 4) & 0xF0);
4377
4378 /* Fill in fixed header. */
4379 ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, temp_data[0], length, wait_option);
4380
4381 if (ret)
4382 {
4383 /* Release the packet. */
4384 nx_packet_release(packet_ptr);
4385 return(NXD_MQTT_PACKET_POOL_FAILURE);
4386 }
4387
4388 /* Fill in protocol name. */
4389 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, "MQTT", 4, wait_option);
4390
4391 if (ret)
4392 {
4393 /* Release the packet. */
4394 nx_packet_release(packet_ptr);
4395 return(NXD_MQTT_PACKET_POOL_FAILURE);
4396 }
4397
4398 /* Fill in protocol level, */
4399 temp_data[0] = MQTT_PROTOCOL_LEVEL;
4400
4401 /* Fill in byte 8: connect flags */
4402 temp_data[1] = connection_flags;
4403
4404 /* Fill in byte 9 and 10: keep alive */
4405 temp_data[2] = (keepalive >> 8) & 0xFF;
4406 temp_data[3] = (keepalive & 0xFF);
4407
4408 ret = nx_packet_data_append(packet_ptr, temp_data, 4, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
4409
4410 if (ret)
4411 {
4412
4413 /* Release the packet. */
4414 nx_packet_release(packet_ptr);
4415
4416 return(NXD_MQTT_PACKET_POOL_FAILURE);
4417 }
4418
4419 /* Fill in payload area, in the order of: client identifier, will topic, will message,
4420 user name, and password. */
4421 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_id,
4422 client_ptr -> nxd_mqtt_client_id_length, wait_option);
4423
4424 /* Next fill will topic and will message if the will flag is set. */
4425 if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_WILL_FLAG))
4426 {
4427 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, (CHAR *)client_ptr -> nxd_mqtt_client_will_topic,
4428 client_ptr -> nxd_mqtt_client_will_topic_length, wait_option);
4429
4430 if (!ret)
4431 {
4432 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, (CHAR *)client_ptr -> nxd_mqtt_client_will_message,
4433 client_ptr -> nxd_mqtt_client_will_message_length, wait_option);
4434 }
4435 }
4436
4437 /* Fill username if username flag is set */
4438 if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_USERNAME))
4439 {
4440 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_username,
4441 client_ptr -> nxd_mqtt_client_username_length, wait_option);
4442 }
4443
4444 /* Fill password if password flag is set */
4445 if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_PASSWORD))
4446 {
4447 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_password,
4448 client_ptr -> nxd_mqtt_client_password_length, wait_option);
4449 }
4450
4451 if (ret)
4452 {
4453
4454 /* Release the packet. */
4455 nx_packet_release(packet_ptr);
4456
4457 return(NXD_MQTT_PACKET_POOL_FAILURE);
4458 }
4459
4460 /* Ready to send the connect message to the server. */
4461 status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);
4462
4463 if (status)
4464 {
4465
4466 /* Release the packet. */
4467 nx_packet_release(packet_ptr);
4468 }
4469
4470 /* Update the timeout value. */
4471 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
4472
4473 return(status);
4474 }
4475
4476 /**************************************************************************/
4477 /* */
4478 /* FUNCTION RELEASE */
4479 /* */
4480 /* _nxd_mqtt_client_secure_connect PORTABLE C */
4481 /* 6.1 */
4482 /* AUTHOR */
4483 /* */
4484 /* Yuxin Zhou, Microsoft Corporation */
4485 /* */
4486 /* DESCRIPTION */
4487 /* */
4488 /* This function makes an initial secure (TLS) connection to */
4489 /* the MQTT server. */
4490 /* */
4491 /* */
4492 /* INPUT */
4493 /* */
4494 /* client_ptr Pointer to MQTT Client */
4495 /* server_ip Server IP address structure */
4496 /* server_port Server port number, in host */
4497 /* byte order */
4498 /* tls_setup User-supplied callback */
4499 /* function to set up TLS */
4500 /* parameters. */
/*    keepalive                             Keepalive value handed to     */
/*                                            _nxd_mqtt_client_connect    */
/*    clean_session                         Clean session flag            */
/*    wait_option                           Timeout value                 */
4511 /* */
4512 /* */
4513 /* OUTPUT */
4514 /* */
4515 /* status Completion status */
4516 /* */
4517 /* CALLS */
4518 /* */
4519 /* _nxd_mqtt_client_connect Actual MQTT Client connect */
4520 /* call */
4521 /* */
4522 /* CALLED BY */
4523 /* */
4524 /* Application Code */
4525 /* */
4526 /* RELEASE HISTORY */
4527 /* */
4528 /* DATE NAME DESCRIPTION */
4529 /* */
4530 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4531 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4532 /* resulting in version 6.1 */
4533 /* */
4534 /**************************************************************************/
4535 #ifdef NX_SECURE_ENABLE
UINT _nxd_mqtt_client_secure_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                                     UINT (*tls_setup)(NXD_MQTT_CLIENT *client_ptr, NX_SECURE_TLS_SESSION *,
                                                       NX_SECURE_X509_CERT *, NX_SECURE_X509_CERT *),
                                     UINT keepalive, UINT clean_session, ULONG wait_option)
{
UINT setup_status;

    /* Runs the caller's TLS setup callback on the session and the two
       certificate structures embedded in the client, marks the client as
       using TLS, then delegates to _nxd_mqtt_client_connect.  A non-zero
       result from the callback is returned unchanged and no connection is
       attempted.  */
    setup_status = tls_setup(client_ptr,
                             &(client_ptr -> nxd_mqtt_tls_session),
                             &(client_ptr -> nxd_mqtt_tls_certificate),
                             &(client_ptr -> nxd_mqtt_tls_trusted_certificate));

    if (setup_status != 0)
    {
        return(setup_status);
    }

    /* From here on the connection is treated as secure. */
    client_ptr -> nxd_mqtt_client_use_tls = 1;

    return(_nxd_mqtt_client_connect(client_ptr, server_ip, server_port, keepalive, clean_session, wait_option));
}
4560
4561 #endif /* NX_SECURE_ENABLE */
4562
4563
4564 /**************************************************************************/
4565 /* */
4566 /* FUNCTION RELEASE */
4567 /* */
4568 /* _nxd_mqtt_client_delete PORTABLE C */
4569 /* 6.2.0 */
4570 /* AUTHOR */
4571 /* */
4572 /* Yuxin Zhou, Microsoft Corporation */
4573 /* */
4574 /* DESCRIPTION */
4575 /* */
4576 /* This function deletes a previously created MQTT client instance. */
4577 /* If the NXD_MQTT_SOCKET_TIMEOUT is set to NX_WAIT_FOREVER, this may */
4578 /* suspend infinitely. This is because the MQTT Client must */
4579 /* disconnect with the server, and if the network link is disabled or */
/*    the server is not responding, this will block this function from    */
4581 /* completing. */
4582 /* */
4583 /* INPUT */
4584 /* */
4585 /* client_ptr Pointer to MQTT Client */
4586 /* */
4587 /* OUTPUT */
4588 /* */
4589 /* status Completion status */
4590 /* */
4591 /* CALLS */
4592 /* */
4593 /* tx_event_flags_set */
4594 /* */
4595 /* CALLED BY */
4596 /* */
4597 /* Application Code */
4598 /* */
4599 /* RELEASE HISTORY */
4600 /* */
4601 /* DATE NAME DESCRIPTION */
4602 /* */
4603 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4604 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4605 /* resulting in version 6.1 */
4606 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
4607 /* mqtt over websocket, */
4608 /* resulting in version 6.2.0 */
4609 /* */
4610 /**************************************************************************/
_nxd_mqtt_client_delete(NXD_MQTT_CLIENT * client_ptr)4611 UINT _nxd_mqtt_client_delete(NXD_MQTT_CLIENT *client_ptr)
4612 {
4613
4614
4615 /* Set the event flag for DELETE. Next time when the MQTT client thread
4616 wakes up, it will perform the deletion process. */
4617 #ifndef NXD_MQTT_CLOUD_ENABLE
4618 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_DELETE_EVENT, TX_OR);
4619 #else
4620 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_DELETE_EVENT);
4621 #endif /* NXD_MQTT_CLOUD_ENABLE */
4622
4623 /* Check if the MQTT client thread has completed. */
4624 while((client_ptr -> nxd_mqtt_client_socket).nx_tcp_socket_id != 0)
4625 {
4626 tx_thread_sleep(NX_IP_PERIODIC_RATE);
4627 }
4628
4629 #ifndef NXD_MQTT_CLOUD_ENABLE
4630 /* Now we can delete the Client instance. */
4631 tx_thread_delete(&(client_ptr -> nxd_mqtt_thread));
4632 #else
4633 /* Deregister mqtt module from cloud helper. */
4634 nx_cloud_module_deregister(client_ptr -> nxd_mqtt_client_cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module));
4635
4636 /* Delete own created cloud. */
4637 if (client_ptr -> nxd_mqtt_client_cloud.nx_cloud_id == NX_CLOUD_ID)
4638 nx_cloud_delete(&(client_ptr -> nxd_mqtt_client_cloud));
4639 #endif /* NXD_MQTT_CLOUD_ENABLE */
4640
4641 #ifdef NXD_MQTT_OVER_WEBSOCKET
4642 if (client_ptr -> nxd_mqtt_client_use_websocket)
4643 {
4644 nx_websocket_client_delete(&client_ptr -> nxd_mqtt_client_websocket);
4645 client_ptr -> nxd_mqtt_client_use_websocket = NX_FALSE;
4646 }
4647 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4648
4649 return(NXD_MQTT_SUCCESS);
4650 }
4651
4652 /**************************************************************************/
4653 /* */
4654 /* FUNCTION RELEASE */
4655 /* */
4656 /* _nxd_mqtt_client_publish_packet_send PORTABLE C */
4657 /* 6.2.0 */
4658 /* AUTHOR */
4659 /* */
4660 /* Yuxin Zhou, Microsoft Corporation */
4661 /* */
4662 /* DESCRIPTION */
4663 /* */
4664 /* This function sends a publish packet to the connected broker. */
4665 /* */
4666 /* */
4667 /* INPUT */
4668 /* */
4669 /* client_ptr Pointer to MQTT Client */
4670 /* packet_ptr Pointer to publish packet */
4671 /* packet_id Current packet ID */
4672 /* QoS Quality of service */
4673 /* wait_option Suspension option */
4674 /* */
4675 /* OUTPUT */
4676 /* */
4677 /* status Completion status */
4678 /* */
4679 /* CALLS */
4680 /* */
4681 /* tx_mutex_get */
4682 /* tx_mutex_put */
4683 /* nx_packet_release */
4684 /* _nxd_mqtt_copy_transmit_packet */
4685 /* _nxd_mqtt_packet_send */
4686 /* */
4687 /* CALLED BY */
4688 /* */
4689 /* _nxd_mqtt_client_publish */
4690 /* */
4691 /* RELEASE HISTORY */
4692 /* */
4693 /* DATE NAME DESCRIPTION */
4694 /* */
4695 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4696 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4697 /* resulting in version 6.1 */
4698 /* 08-02-2021 Yuxin Zhou Modified comment(s), */
4699 /* supported maximum transmit */
4700 /* queue depth, */
4701 /* resulting in version 6.1.8 */
4702 /* 10-31-2022 Bo Chen Modified comment(s), improved */
4703 /* the logic of sending packet,*/
4704 /* resulting in version 6.2.0 */
4705 /* */
4706 /**************************************************************************/
UINT _nxd_mqtt_client_publish_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr,
                                          USHORT packet_id, UINT QoS, ULONG wait_option)
{

NX_PACKET *retransmit_copy = NX_NULL;
UINT       status;

    /* Sends an already-built PUBLISH packet.  For a non-zero QoS a copy of
       the packet is first appended to the client's transmit queue so it can
       be retransmitted later.  Returns NXD_MQTT_PACKET_POOL_FAILURE if the
       copy cannot be made, NXD_MQTT_MUTEX_FAILURE if the client mutex cannot
       be obtained, NXD_MQTT_COMMUNICATION_FAILURE if the send fails, and
       NXD_MQTT_SUCCESS otherwise.  On any failure packet_ptr is left for the
       caller to release.  */

    /* QoS above zero: duplicate the packet before touching shared state. */
    if (QoS != 0)
    {
        if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &retransmit_copy,
                                           packet_id, NX_TRUE, wait_option))
        {
            return(NXD_MQTT_PACKET_POOL_FAILURE);
        }
    }

    /* Take the client mutex to protect the queue and the timeout. */
    status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

    if (status != TX_SUCCESS)
    {

        /* Undo the copy made above, if there was one. */
        if (QoS != 0)
        {
            nx_packet_release(retransmit_copy);

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
            /* Give back the queue slot.  NOTE(review): presumably the depth
               was incremented by _nxd_mqtt_copy_transmit_packet - confirm. */
            client_ptr -> message_transmit_queue_depth--;
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */
        }

        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Append the copy to the tail of the transmit queue. */
    if (QoS != 0)
    {
        if (client_ptr -> message_transmit_queue_head != NX_NULL)
        {
            client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = retransmit_copy;
        }
        else
        {
            client_ptr -> message_transmit_queue_head = retransmit_copy;
        }

        client_ptr -> message_transmit_queue_tail = retransmit_copy;
    }

    /* Push the keepalive deadline forward, then drop the mutex before sending. */
    client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    /* Send the publish message to the server. */
    status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);

    if (status)
    {
        return(NXD_MQTT_COMMUNICATION_FAILURE);
    }

    return(NXD_MQTT_SUCCESS);
}
4779
4780 /**************************************************************************/
4781 /* */
4782 /* FUNCTION RELEASE */
4783 /* */
4784 /* _nxd_mqtt_client_publish PORTABLE C */
4785 /* 6.3.0 */
4786 /* AUTHOR */
4787 /* */
4788 /* Yuxin Zhou, Microsoft Corporation */
4789 /* */
4790 /* DESCRIPTION */
4791 /* */
4792 /* This function publishes a message to the connected broker. */
4793 /* */
4794 /* */
4795 /* INPUT */
4796 /* */
4797 /* client_ptr Pointer to MQTT Client */
4798 /* topic_name Name of the topic */
4799 /* topic_name_length Length of the topic name */
4800 /* message Message string */
4801 /* message_length Length of the message, */
4802 /* in bytes */
4803 /* retain The retain flag, whether */
4804 /* or not the broker should */
4805 /* store this message */
4806 /* QoS Expected QoS level */
4807 /* wait_option Suspension option */
4808 /* */
4809 /* OUTPUT */
4810 /* */
4811 /* status Completion status */
4812 /* */
4813 /* CALLS */
4814 /* */
4815 /* tx_mutex_get */
4816 /* _nxd_mqtt_client_packet_allocate */
4817 /* _nxd_mqtt_client_set_fixed_header */
4818 /* _nxd_mqtt_client_append_message */
4819 /* tx_mutex_put */
4820 /* nx_packet_release */
4821 /* _nxd_mqtt_client_publish_packet_send */
4822 /* */
4823 /* CALLED BY */
4824 /* */
4825 /* Application Code */
4826 /* */
4827 /* RELEASE HISTORY */
4828 /* */
4829 /* DATE NAME DESCRIPTION */
4830 /* */
4831 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4832 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4833 /* resulting in version 6.1 */
4834 /* 07-29-2022 Spencer McDonough Modified comment(s), */
4835 /* improved internal logic, */
4836 /* resulting in version 6.1.12 */
4837 /* 10-31-2023 Haiqing Zhao Modified comment(s), improved */
4838 /* internal logic, */
4839 /* resulting in version 6.3.0 */
4840 /* */
4841 /**************************************************************************/
UINT _nxd_mqtt_client_publish(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length,
                              CHAR *message, UINT message_length, UINT retain, UINT QoS, ULONG wait_option)
{

NX_PACKET *packet_ptr;
UINT       status;
UINT       length = 0;
UCHAR      flags;
USHORT     packet_id = 0;
UINT       ret = NXD_MQTT_SUCCESS;

    /* Builds a PUBLISH packet (fixed header, topic, optional packet
       identifier, optional message) and passes it to
       _nxd_mqtt_client_publish_packet_send.  Returns:
         NXD_MQTT_QOS2_NOT_SUPPORTED   QoS is 2
         NXD_MQTT_NOT_CONNECTED        client state is not CONNECTED
         NXD_MQTT_PACKET_POOL_FAILURE  no packet could be allocated
         NXD_MQTT_INTERNAL_ERROR       a header/topic/id/message append failed
         NXD_MQTT_MUTEX_FAILURE        the client mutex could not be obtained
       or the status of the send routine.  The packet allocated here is
       released on every failure path.  */

    /* QoS 2 is not supported by this client. */
    if (QoS == 2)
    {
        return(NXD_MQTT_QOS2_NOT_SUPPORTED);
    }

    /* Publishing is only possible while the client is connected. */
    if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        return(NXD_MQTT_NOT_CONNECTED);
    }

    status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, wait_option);

    if (status != NXD_MQTT_SUCCESS)
    {
        return(NXD_MQTT_PACKET_POOL_FAILURE);
    }

    /* Control byte: PUBLISH type in the high nibble, QoS in bits 1-2, retain in the low bit. */
    flags = (UCHAR)((MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4) | (QoS << 1));

    if (retain)
    {
        flags = flags | MQTT_PUBLISH_RETAIN;
    }

    /* Compute the remaining length. */

    /* Topic name plus its 2-byte length prefix. */
    length = topic_name_length + 2;

    /* Count packet ID for QoS 1 or QoS 2 message. */
    if ((QoS == 1) || (QoS == 2))
    {
        length += 2;
    }

    /* Count message. */
    if ((message != NX_NULL) && (message_length != 0))
    {
        length += message_length;
    }

    /* Write out the control header and remaining length field. */
    ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, flags, length, wait_option);

    if (ret)
    {

        /* Release the packet. */
        nx_packet_release(packet_ptr);

        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Write out topic. */
    ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, topic_name, topic_name_length, wait_option);

    if (ret)
    {

        /* Release the packet. */
        nx_packet_release(packet_ptr);

        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Append Packet Identifier for QoS level 1 or 2  MQTT 3.3.2.2 */
    if ((QoS == 1) || (QoS == 2))
    {
    UCHAR identifier[2];

        /* Obtain the mutex; the packet identifier counter is shared state. */
        status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

        if (status != TX_SUCCESS)
        {

            /* The packet was allocated above and has not been handed to
               anyone yet, so it must be released here or it is leaked. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_MUTEX_FAILURE);
        }

        /* Take the current identifier, high byte first on the wire. */
        packet_id = (USHORT)client_ptr -> nxd_mqtt_client_packet_identifier;
        identifier[0] = (UCHAR)(client_ptr -> nxd_mqtt_client_packet_identifier >> 8);
        identifier[1] = (client_ptr -> nxd_mqtt_client_packet_identifier & 0xFF);

        /* Update packet id, wrapping at 16 bits. */
        client_ptr -> nxd_mqtt_client_packet_identifier = (client_ptr -> nxd_mqtt_client_packet_identifier + 1) & 0xFFFF;

        /* Prevent packet identifier from being zero. MQTT-2.3.1-1 */
        if (client_ptr -> nxd_mqtt_client_packet_identifier == 0)
        {
            client_ptr -> nxd_mqtt_client_packet_identifier = 1;
        }

        /* Release the mutex. */
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

        ret = nx_packet_data_append(packet_ptr, identifier, 2,
                                    client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);

        if (ret)
        {

            /* Release the packet. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_INTERNAL_ERROR);
        }
    }

    /* Append message. */
    if ((message != NX_NULL) && (message_length != 0))
    {

        /* Use nx_packet_data_append to move user-supplied message data into the packet.
           nx_packet_data_append uses chained packet if the additional storage space is
           needed. */
        ret = nx_packet_data_append(packet_ptr, message, message_length,
                                    client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
        if (ret)
        {

            /* Unable to obtain a new packet to store the message. */

            /* Release the packet. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_INTERNAL_ERROR);
        }
    }

    /* Send publish packet. */
    ret = _nxd_mqtt_client_publish_packet_send(client_ptr, packet_ptr, packet_id, QoS, wait_option);

    if (ret)
    {

        /* The send routine leaves the packet with us on failure; release it. */
        nx_packet_release(packet_ptr);
    }

    return(ret);
}
4994
4995 /**************************************************************************/
4996 /* */
4997 /* FUNCTION RELEASE */
4998 /* */
4999 /* _nxd_mqtt_client_subscribe PORTABLE C */
5000 /* 6.1.2 */
5001 /* AUTHOR */
5002 /* */
5003 /* Yuxin Zhou, Microsoft Corporation */
5004 /* */
5005 /* DESCRIPTION */
5006 /* */
5007 /* This function sends a subscribe message to the broker. */
5008 /* */
5009 /* */
5010 /* INPUT */
5011 /* */
5012 /* client_ptr Pointer to MQTT Client */
5013 /* topic_name Pointer to the topic string */
5014 /* to subscribe to */
5015 /* topic_name_length Length of the topic string */
5016 /* in bytes */
5017 /* QoS Expected QoS level */
5018 /* */
5019 /* OUTPUT */
5020 /* */
5021 /* status Completion status */
5022 /* */
5023 /* CALLS */
5024 /* */
5025 /* _nxd_mqtt_client_sub_unsub The actual routine that */
5026 /* performs the sub/unsub */
5027 /* action. */
5028 /* */
5029 /* CALLED BY */
5030 /* */
5031 /* Application Code */
5032 /* */
5033 /* RELEASE HISTORY */
5034 /* */
5035 /* DATE NAME DESCRIPTION */
5036 /* */
5037 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5038 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5039 /* resulting in version 6.1 */
5040 /* 11-09-2020 Yuxin Zhou Modified comment(s), and */
5041 /* added packet id parameter, */
5042 /* resulting in version 6.1.2 */
5043 /* */
5044 /**************************************************************************/
UINT _nxd_mqtt_client_subscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length, UINT QoS)
{

    /* Every QoS level other than 2 is forwarded to the shared subscribe/unsubscribe
       routine. The first byte value carries the SUBSCRIBE packet type in its upper
       nibble and 0x02 in its lower nibble. */
    if (QoS != 2)
    {
        return(_nxd_mqtt_client_sub_unsub(client_ptr, (MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4) | 0x02,
                                          topic_name, topic_name_length, NX_NULL, QoS));
    }

    /* QoS level 2 is rejected without contacting the broker. */
    return(NXD_MQTT_QOS2_NOT_SUPPORTED);
}
5056
5057
5058
5059
5060 /**************************************************************************/
5061 /* */
5062 /* FUNCTION RELEASE */
5063 /* */
5064 /* _nxd_mqtt_client_unsubscribe PORTABLE C */
5065 /* 6.1.2 */
5066 /* AUTHOR */
5067 /* */
5068 /* Yuxin Zhou, Microsoft Corporation */
5069 /* */
5070 /* DESCRIPTION */
5071 /* */
5072 /* This function unsubscribes a topic from the broker. */
5073 /* */
5074 /* */
5075 /* INPUT */
5076 /* */
5077 /* client_ptr Pointer to MQTT Client */
5078 /* topic_name Pointer to the topic string */
/* to unsubscribe from */
5080 /* topic_name_length Length of the topic string */
5081 /* in bytes */
5082 /* */
5083 /* OUTPUT */
5084 /* */
5085 /* status Completion status */
5086 /* */
5087 /* CALLS */
5088 /* */
5089 /* _nxd_mqtt_client_sub_unsub */
5090 /* */
5091 /* CALLED BY */
5092 /* */
5093 /* Application Code */
5094 /* */
5095 /* RELEASE HISTORY */
5096 /* */
5097 /* DATE NAME DESCRIPTION */
5098 /* */
5099 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5100 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5101 /* resulting in version 6.1 */
5102 /* 11-09-2020 Yuxin Zhou Modified comment(s), and */
5103 /* added packet id parameter, */
5104 /* resulting in version 6.1.2 */
5105 /* */
5106 /**************************************************************************/
UINT _nxd_mqtt_client_unsubscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length)
{

    UINT status;

    /* Hand the request to the shared subscribe/unsubscribe routine. The first byte
       value carries the UNSUBSCRIBE packet type in its upper nibble and 0x02 in its
       lower nibble; NX_NULL and 0 are supplied for the two trailing arguments. */
    status = _nxd_mqtt_client_sub_unsub(client_ptr, (MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE << 4) | 0x02,
                                        topic_name, topic_name_length, NX_NULL, 0);

    return(status);
}
5112
5113 /**************************************************************************/
5114 /* */
5115 /* FUNCTION RELEASE */
5116 /* */
5117 /* _nxd_mqtt_send_simple_message PORTABLE C */
5118 /* 6.3.0 */
5119 /* AUTHOR */
5120 /* */
5121 /* Yuxin Zhou, Microsoft Corporation */
5122 /* */
5123 /* DESCRIPTION */
5124 /* */
5125 /* This internal function handles the transmission of PINGREQ or */
5126 /* DISCONNECT message. */
5127 /* */
5128 /* */
5129 /* INPUT */
5130 /* */
5131 /* client_ptr Pointer to MQTT Client */
5132 /* header_value Value to be programmed into */
5133 /* MQTT header. */
5134 /* */
5135 /* OUTPUT */
5136 /* */
5137 /* status Completion status */
5138 /* */
5139 /* CALLS */
5140 /* */
5141 /* tx_mutex_get */
5142 /* _nxd_mqtt_client_packet_allocate */
5143 /* tx_mutex_put */
5144 /* _nxd_mqtt_packet_send */
5145 /* nx_packet_release */
5146 /* */
5147 /* CALLED BY */
5148 /* */
5149 /* _nxd_mqtt_client_ping */
5150 /* _nxd_mqtt_client_disconnect */
5151 /* */
5152 /* RELEASE HISTORY */
5153 /* */
5154 /* DATE NAME DESCRIPTION */
5155 /* */
5156 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5157 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5158 /* resulting in version 6.1 */
5159 /* 07-29-2022 Spencer McDonough Modified comment(s), */
5160 /* improved internal logic, */
5161 /* resulting in version 6.1.12 */
5162 /* 10-31-2022 Bo Chen Modified comment(s), improved */
5163 /* the logic of sending packet,*/
5164 /* resulting in version 6.2.0 */
5165 /* 10-31-2023 Haiqing Zhao Modified comment(s), improved */
5166 /* internal logic, */
5167 /* resulting in version 6.3.0 */
5168 /* */
5169 /**************************************************************************/
_nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT * client_ptr,UCHAR header_value)5170 static UINT _nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT *client_ptr, UCHAR header_value)
5171 {
5172
5173 NX_PACKET *packet_ptr;
5174 UINT status;
5175 UINT status_mutex;
5176 UCHAR *byte;
5177
5178 status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
5179 if (status)
5180 {
5181 return(NXD_MQTT_INTERNAL_ERROR);
5182 }
5183
5184 if (2u > ((ULONG)(packet_ptr -> nx_packet_data_end) - (ULONG)(packet_ptr -> nx_packet_append_ptr)))
5185 {
5186 nx_packet_release(packet_ptr);
5187
5188 /* Packet buffer is too small to hold the message. */
5189 return(NX_SIZE_ERROR);
5190 }
5191
5192 byte = packet_ptr -> nx_packet_prepend_ptr;
5193
5194 *byte = (UCHAR)(header_value << 4);
5195 *(byte + 1) = 0;
5196
5197 packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + 2;
5198 packet_ptr -> nx_packet_length = 2;
5199
5200
5201 /* Release MQTT protection before making NetX/TLS calls. */
5202 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5203
5204 /* Send packet to server. */
5205 status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER);
5206
5207 status_mutex = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
5208 if (status)
5209 {
5210
5211 /* Release the packet. */
5212 nx_packet_release(packet_ptr);
5213
5214 status = NXD_MQTT_COMMUNICATION_FAILURE;
5215 }
5216 if (status_mutex)
5217 {
5218 return(NXD_MQTT_MUTEX_FAILURE);
5219 }
5220
5221 if (header_value == MQTT_CONTROL_PACKET_TYPE_PINGREQ)
5222 {
5223 /* Do not update the ping sent time if the outstanding ping has not been responded yet */
5224 if (client_ptr -> nxd_mqtt_ping_not_responded != 1)
5225 {
5226 /* Record the time we send out the PINGREG */
5227 client_ptr -> nxd_mqtt_ping_sent_time = tx_time_get();
5228 client_ptr -> nxd_mqtt_ping_not_responded = 1;
5229 }
5230 }
5231
5232 /* Update the timeout value. */
5233 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
5234
5235 return(status);
5236 }
5237
5238
5239 /**************************************************************************/
5240 /* */
5241 /* FUNCTION RELEASE */
5242 /* */
5243 /* _nxd_mqtt_client_disconnect PORTABLE C */
5244 /* 6.1 */
5245 /* AUTHOR */
5246 /* */
5247 /* Yuxin Zhou, Microsoft Corporation */
5248 /* */
5249 /* DESCRIPTION */
5250 /* */
5251 /* This function disconnects the MQTT client from a server. */
5252 /* */
5253 /* */
5254 /* INPUT */
5255 /* */
5256 /* client_ptr Pointer to MQTT Client */
5257 /* */
5258 /* OUTPUT */
5259 /* */
5260 /* status Completion status */
5261 /* */
5262 /* CALLS */
5263 /* */
5264 /* _nxd_mqtt_send_simple_message */
5265 /* _nxd_mqtt_process_disconnect */
5266 /* */
5267 /* CALLED BY */
5268 /* */
5269 /* Application Code */
5270 /* */
5271 /* RELEASE HISTORY */
5272 /* */
5273 /* DATE NAME DESCRIPTION */
5274 /* */
5275 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5276 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5277 /* resulting in version 6.1 */
5278 /* */
5279 /**************************************************************************/
_nxd_mqtt_client_disconnect(NXD_MQTT_CLIENT * client_ptr)5280 UINT _nxd_mqtt_client_disconnect(NXD_MQTT_CLIENT *client_ptr)
5281 {
5282 UINT status;
5283
5284 /* Obtain the mutex. */
5285 status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
5286 if (status != TX_SUCCESS)
5287 {
5288 /* Disable timer if timer has been started. */
5289 if (client_ptr -> nxd_mqtt_keepalive)
5290 {
5291 tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
5292 }
5293
5294 return(NXD_MQTT_MUTEX_FAILURE);
5295 }
5296
5297 /* Let the server know we are ending the MQTT session. */
5298 _nxd_mqtt_send_simple_message(client_ptr, MQTT_CONTROL_PACKET_TYPE_DISCONNECT);
5299
5300 /* Call the disconnect routine to disconnect the socket,
5301 release transmit packets, release received packets,
5302 and delete the client timer. */
5303 _nxd_mqtt_process_disconnect(client_ptr);
5304
5305 /* Release the mutex. */
5306 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5307
5308 return(status);
5309 }
5310
5311 /**************************************************************************/
5312 /* */
5313 /* FUNCTION RELEASE */
5314 /* */
5315 /* _nxd_mqtt_client_receive_notify_set PORTABLE C */
5316 /* 6.1 */
5317 /* AUTHOR */
5318 /* */
5319 /* Yuxin Zhou, Microsoft Corporation */
5320 /* */
5321 /* DESCRIPTION */
5322 /* */
5323 /* This function installs the MQTT client publish notify callback */
5324 /* function. */
5325 /* */
5326 /* */
5327 /* INPUT */
5328 /* */
5329 /* client_ptr Pointer to MQTT Client */
5330 /* mqtt_client_receive_notify User-supplied callback */
5331 /* function, which is invoked */
5332 /* upon receiving a publish */
5333 /* message. */
5334 /* */
5335 /* OUTPUT */
5336 /* */
5337 /* status Completion status */
5338 /* */
5339 /* CALLS */
5340 /* */
5341 /* tx_mutex_get */
5342 /* tx_mutex_put */
5343 /* */
5344 /* CALLED BY */
5345 /* */
5346 /* Application Code */
5347 /* */
5348 /* RELEASE HISTORY */
5349 /* */
5350 /* DATE NAME DESCRIPTION */
5351 /* */
5352 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5353 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5354 /* resulting in version 6.1 */
5355 /* */
5356 /**************************************************************************/
_nxd_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* receive_notify)(NXD_MQTT_CLIENT * client_ptr,UINT message_count))5357 UINT _nxd_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT *client_ptr,
5358 VOID (*receive_notify)(NXD_MQTT_CLIENT *client_ptr, UINT message_count))
5359 {
5360
5361 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
5362
5363 client_ptr -> nxd_mqtt_client_receive_notify = receive_notify;
5364
5365 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5366
5367 return(NXD_MQTT_SUCCESS);
5368 }
5369
5370 /**************************************************************************/
5371 /* */
5372 /* FUNCTION RELEASE */
5373 /* */
5374 /* _nxd_mqtt_client_message_get PORTABLE C */
5375 /* 6.1 */
5376 /* AUTHOR */
5377 /* */
5378 /* Yuxin Zhou, Microsoft Corporation */
5379 /* */
5380 /* DESCRIPTION */
5381 /* */
5382 /* This function retrieves a published MQTT message. */
5383 /* */
5384 /* */
5385 /* INPUT */
5386 /* */
5387 /* client_ptr Pointer to MQTT Client */
5388 /* topic_buffer Pointer to the topic buffer */
5389 /* where topic is copied to */
5390 /* topic_buffer_size Size of the topic buffer. */
5391 /* actual_topic_length Number of bytes copied into */
5392 /* topic_buffer */
5393 /* message_buffer Pointer to the buffer where */
5394 /* message is copied to */
5395 /* message_buffer_size Size of the message_buffer */
5396 /* actual_message_length Number of bytes copied into */
5397 /* the message buffer. */
5398 /* */
5399 /* OUTPUT */
5400 /* */
5401 /* status Completion status */
5402 /* */
5403 /* CALLS */
5404 /* */
/* _nxd_mqtt_process_publish_packet Locate topic and message in */
/* the queued publish packet */
/* nx_packet_data_extract_offset Copy data out of the packet */
/* nx_packet_release Release the packet */
5409 /* tx_mutex_get */
5410 /* tx_mutex_put */
5411 /* */
5412 /* CALLED BY */
5413 /* */
5414 /* Application Code */
5415 /* */
5416 /* RELEASE HISTORY */
5417 /* */
5418 /* DATE NAME DESCRIPTION */
5419 /* */
5420 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5421 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
5422 /* fixed uninitialized value, */
5423 /* resulting in version 6.1 */
5424 /* */
5425 /**************************************************************************/
UINT _nxd_mqtt_client_message_get(NXD_MQTT_CLIENT *client_ptr, UCHAR *topic_buffer, UINT topic_buffer_size, UINT *actual_topic_length,
                                  UCHAR *message_buffer, UINT message_buffer_size, UINT *actual_message_length)
{

    UINT       status;
    NX_PACKET *packet_ptr;
    ULONG      topic_offset;
    USHORT     topic_length;
    ULONG      message_offset;
    ULONG      message_length;
    ULONG      topic_bytes_copied;
    ULONG      message_bytes_copied;

    /* The receive queue is only touched while holding the MQTT mutex; give up
       if it cannot be obtained. */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Walk the receive queue from the head until a usable publish packet is found. */
    while (client_ptr -> message_receive_queue_depth)
    {
        packet_ptr = client_ptr -> message_receive_queue_head;
        status = _nxd_mqtt_process_publish_packet(packet_ptr, &topic_offset, &topic_length, &message_offset, &message_length);

        /* A valid packet that does not fit the caller's buffers stays on the
           queue so the caller can retry with larger buffers. */
        if ((status == NXD_MQTT_SUCCESS) &&
            ((topic_buffer_size < topic_length) || (message_buffer_size < message_length)))
        {
            tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
            return(NXD_MQTT_INSUFFICIENT_BUFFER_SPACE);
        }

        /* Unlink the head packet from the receive queue. */
        client_ptr -> message_receive_queue_head = packet_ptr -> nx_packet_queue_next;
        if (client_ptr -> message_receive_queue_tail == packet_ptr)
        {
            client_ptr -> message_receive_queue_tail = NX_NULL;
        }
        client_ptr -> message_receive_queue_depth--;

        if (status == NXD_MQTT_SUCCESS)
        {

            /* Extract into ULONG locals, the type the extract service writes
               through, rather than casting the caller's UINT pointers. The
               locals start at zero so a failed extract reports zero bytes. */
            topic_bytes_copied = 0;
            message_bytes_copied = 0;
            nx_packet_data_extract_offset(packet_ptr, topic_offset, topic_buffer,
                                          topic_length, &topic_bytes_copied);
            nx_packet_data_extract_offset(packet_ptr, message_offset, message_buffer,
                                          message_length, &message_bytes_copied);

            /* The length outputs are optional; only write the ones supplied. */
            if (actual_topic_length != NX_NULL)
            {
                *actual_topic_length = (UINT)topic_bytes_copied;
            }
            if (actual_message_length != NX_NULL)
            {
                *actual_message_length = (UINT)message_bytes_copied;
            }

            nx_packet_release(packet_ptr);

            tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
            return(NXD_MQTT_SUCCESS);
        }

        /* The packet could not be processed as a publish message; drop it and
           try the next one. */
        nx_packet_release(packet_ptr);
    }

    /* The queue is empty. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
    return(NXD_MQTT_NO_MESSAGE);
}
5479
5480 /**************************************************************************/
5481 /* */
5482 /* FUNCTION RELEASE */
5483 /* */
5484 /* _nxde_mqtt_client_create PORTABLE C */
5485 /* 6.1 */
5486 /* AUTHOR */
5487 /* */
5488 /* Yuxin Zhou, Microsoft Corporation */
5489 /* */
5490 /* DESCRIPTION */
5491 /* */
/* This function checks for errors in nxd_mqtt_client_create call. */
5493 /* */
5494 /* */
5495 /* INPUT */
5496 /* */
5497 /* client_ptr Pointer to MQTT Client */
5498 /* client_name Name string used in by the */
5499 /* client */
5500 /* client_id Client ID used by the client */
5501 /* client_id_length Length of Client ID, in bytes */
5502 /* ip_ptr Pointer to IP instance */
5503 /* pool_ptr Pointer to packet pool */
5504 /* stack_ptr Client thread's stack pointer */
5505 /* stack_size Client thread's stack size */
5506 /* mqtt_thread_priority Priority for MQTT thread */
5507 /* memory_ptr Deprecated and not used */
5508 /* memory_size Deprecated and not used */
5509 /* */
5510 /* OUTPUT */
5511 /* */
5512 /* status Completion status */
5513 /* */
5514 /* CALLS */
5515 /* */
5516 /* _nxd_mqtt_client_create Actual client create call */
5517 /* */
5518 /* CALLED BY */
5519 /* */
5520 /* Application Code */
5521 /* */
5522 /* RELEASE HISTORY */
5523 /* */
5524 /* DATE NAME DESCRIPTION */
5525 /* */
5526 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5527 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5528 /* resulting in version 6.1 */
5529 /* */
5530 /**************************************************************************/
UINT _nxde_mqtt_client_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name, CHAR *client_id, UINT client_id_length,
                              NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
                              VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority,
                              VOID *memory_ptr, ULONG memory_size)
{

    /* The client control block, IP instance and packet pool must all be supplied. */
    if ((client_ptr == NX_NULL) || (ip_ptr == NX_NULL) || (pool_ptr == NX_NULL))
    {
        return(NX_PTR_ERROR);
    }

    /* The IP instance must carry the expected NX_IP_ID marker. */
    if (ip_ptr -> nx_ip_id != NX_IP_ID)
    {
        return(NX_PTR_ERROR);
    }

    /* A non-empty stack is required for the client thread. */
    if ((stack_ptr == NX_NULL) || (stack_size == 0))
    {
        return(NX_PTR_ERROR);
    }

    /* All checks passed; call the actual create service. memory_ptr and
       memory_size are passed through unchanged. */
    return(_nxd_mqtt_client_create(client_ptr, client_name, client_id, client_id_length, ip_ptr,
                                   pool_ptr, stack_ptr, stack_size, mqtt_thread_priority,
                                   memory_ptr, memory_size));
}
5549
5550
5551 /**************************************************************************/
5552 /* */
5553 /* FUNCTION RELEASE */
5554 /* */
5555 /* _nxde_mqtt_client_connect PORTABLE C */
5556 /* 6.1 */
5557 /* AUTHOR */
5558 /* */
5559 /* Yuxin Zhou, Microsoft Corporation */
5560 /* */
5561 /* DESCRIPTION */
5562 /* */
/* This function checks for errors in MQTT client connect call. */
5564 /* */
5565 /* */
5566 /* INPUT */
5567 /* */
5568 /* client_ptr Pointer to MQTT Client */
5569 /* server_ip Server IP address structure */
5570 /* server_port Server port number, in host */
5571 /* byte order */
5572 /* keepalive The MQTT keepalive timer */
5573 /* clean_session Clean session flag */
5574 /* wait_option Suspension option */
5575 /* */
5576 /* */
5577 /* OUTPUT */
5578 /* */
5579 /* status Completion status */
5580 /* */
5581 /* CALLS */
5582 /* */
5583 /* _nxd_mqtt_client_connect Actual MQTT Client connect */
5584 /* call */
5585 /* */
5586 /* CALLED BY */
5587 /* */
5588 /* Application Code */
5589 /* */
5590 /* RELEASE HISTORY */
5591 /* */
5592 /* DATE NAME DESCRIPTION */
5593 /* */
5594 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5595 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
5596 /* corrected mqtt client state,*/
5597 /* resulting in version 6.1 */
5598 /* */
5599 /**************************************************************************/
UINT _nxde_mqtt_client_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                               UINT keepalive, UINT clean_session, ULONG wait_option)
{

    /* Both the client control block and the server address are required. */
    if ((client_ptr == NX_NULL) || (server_ip == NX_NULL))
    {
        return(NX_PTR_ERROR);
    }

    /* The address must be flagged as IP version 4 or version 6. */
    if (!((server_ip -> nxd_ip_version == 4) || (server_ip -> nxd_ip_version == 6)))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Port zero is not accepted. */
    if (server_port == 0)
    {
        return(NX_INVALID_PORT);
    }

    /* Parameters are acceptable; call the actual connect service and return its status. */
    return(_nxd_mqtt_client_connect(client_ptr, server_ip, server_port, keepalive, clean_session, wait_option));
}
5631
5632 /**************************************************************************/
5633 /* */
5634 /* FUNCTION RELEASE */
5635 /* */
5636 /* _nxde_mqtt_client_secure_connect PORTABLE C */
5637 /* 6.1 */
5638 /* AUTHOR */
5639 /* */
5640 /* Yuxin Zhou, Microsoft Corporation */
5641 /* */
5642 /* DESCRIPTION */
5643 /* */
5644 /* This function checks for errors in MQTT client TLS secure connect. */
5645 /* */
5646 /* */
5647 /* INPUT */
5648 /* */
5649 /* client_ptr Pointer to MQTT Client */
5650 /* server_ip Server IP address structure */
5651 /* server_port Server port number, in host */
5652 /* byte order */
5653 /* tls_setup User-supplied callback */
5654 /* function to set up TLS */
5655 /* parameters. */
/* keepalive The MQTT keepalive timer */
/* clean_session Clean session flag */
/* wait_option Suspension option */
5658 /* */
5659 /* */
5660 /* OUTPUT */
5661 /* */
5662 /* status Completion status */
5663 /* */
5664 /* CALLS */
5665 /* */
/* _nxd_mqtt_client_secure_connect Actual MQTT Client secure */
/* connect call */
5668 /* */
5669 /* CALLED BY */
5670 /* */
5671 /* Application Code */
5672 /* */
5673 /* RELEASE HISTORY */
5674 /* */
5675 /* DATE NAME DESCRIPTION */
5676 /* */
5677 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5678 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
5679 /* corrected mqtt client state,*/
5680 /* resulting in version 6.1 */
5681 /* */
5682 /**************************************************************************/
5683 #ifdef NX_SECURE_ENABLE
5684
UINT _nxde_mqtt_client_secure_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                                      UINT (*tls_setup)(NXD_MQTT_CLIENT *client_ptr, NX_SECURE_TLS_SESSION *,
                                                        NX_SECURE_X509_CERT *, NX_SECURE_X509_CERT *),
                                      UINT keepalive, UINT clean_session, ULONG wait_option)
{

    UINT status;

    /* The client control block is required. */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The server address is required. */
    if (server_ip == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Test for IP version flag, matching the check made by
       _nxde_mqtt_client_connect for the non-TLS path. */
    if ((server_ip -> nxd_ip_version != 4) && (server_ip -> nxd_ip_version != 6))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Port zero is not accepted. */
    if (server_port == 0)
    {
        return(NX_INVALID_PORT);
    }

    /* NOTE(review): tls_setup is passed through unchecked; confirm whether a
       NULL callback is acceptable to the actual secure connect service. */
    status = _nxd_mqtt_client_secure_connect(client_ptr, server_ip, server_port, tls_setup,
                                             keepalive, clean_session, wait_option);

    return(status);
}
5713 #endif /* NX_SECURE_ENABLE */
5714
5715
5716 /**************************************************************************/
5717 /* */
5718 /* FUNCTION RELEASE */
5719 /* */
5720 /* _nxde_mqtt_client_delete PORTABLE C */
5721 /* 6.1 */
5722 /* AUTHOR */
5723 /* */
5724 /* Yuxin Zhou, Microsoft Corporation */
5725 /* */
5726 /* DESCRIPTION */
5727 /* */
5728 /* This function checks for errors in MQTT client delete call. */
5729 /* */
5730 /* */
5731 /* INPUT */
5732 /* */
5733 /* client_ptr Pointer to MQTT Client */
5734 /* */
5735 /* OUTPUT */
5736 /* */
5737 /* status Completion status */
5738 /* */
5739 /* CALLS */
5740 /* */
5741 /* _nxd_mqtt_client_delete Actual MQTT Client delete */
5742 /* call. */
5743 /* */
5744 /* CALLED BY */
5745 /* */
5746 /* Application Code */
5747 /* */
5748 /* RELEASE HISTORY */
5749 /* */
5750 /* DATE NAME DESCRIPTION */
5751 /* */
5752 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5753 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5754 /* resulting in version 6.1 */
5755 /* */
5756 /**************************************************************************/
_nxde_mqtt_client_delete(NXD_MQTT_CLIENT * client_ptr)5757 UINT _nxde_mqtt_client_delete(NXD_MQTT_CLIENT *client_ptr)
5758 {
5759
5760 UINT status;
5761
5762 if (client_ptr == NX_NULL)
5763 {
5764 return(NX_PTR_ERROR);
5765 }
5766
5767 status = _nxd_mqtt_client_delete(client_ptr);
5768
5769 return(status);
5770 }
5771
5772
5773
5774 /**************************************************************************/
5775 /* */
5776 /* FUNCTION RELEASE */
5777 /* */
5778 /* _nxde_mqtt_client_publish PORTABLE C */
5779 /* 6.1 */
5780 /* AUTHOR */
5781 /* */
5782 /* Yuxin Zhou, Microsoft Corporation */
5783 /* */
5784 /* DESCRIPTION */
5785 /* */
5786 /* This function performs error checking to the publish service. */
5787 /* */
5788 /* */
5789 /* INPUT */
5790 /* */
5791 /* client_ptr Pointer to MQTT Client */
5792 /* topic_name Name of the topic */
5793 /* topic_name_length Length of the topic name */
5794 /* message Message string */
5795 /* message_length Length of the message, */
5796 /* in bytes */
5797 /* retain The retain flag, whether */
5798 /* or not the broker should */
5799 /* store this message */
5800 /* QoS Expected QoS level */
5801 /* wait_option Suspension option */
5802 /* */
5803 /* OUTPUT */
5804 /* */
5805 /* status Completion status */
5806 /* */
5807 /* CALLS */
5808 /* */
/* _nxd_mqtt_client_publish Actual MQTT Client publish */
/* call */
/* */
5810 /* CALLED BY */
5811 /* */
5812 /* Application Code */
5813 /* */
5814 /* RELEASE HISTORY */
5815 /* */
5816 /* DATE NAME DESCRIPTION */
5817 /* */
5818 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5819 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5820 /* resulting in version 6.1 */
5821 /* */
5822 /**************************************************************************/
UINT _nxde_mqtt_client_publish(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length,
                               CHAR *message, UINT message_length, UINT retain, UINT QoS, ULONG wait_option)
{

    /* Validate client_ptr */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Validate topic_name: a non-empty topic string is required. */
    if ((topic_name == NX_NULL) || (topic_name_length == 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Validate message length: a supplied message must not be empty. */
    if (message && (message_length == 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Validate QoS value. MQTT defines QoS levels 0, 1 and 2 only; the value 3
       is reserved and must not be sent, so it is rejected here in the same way
       as in _nxde_mqtt_client_subscribe. */
    if (QoS > 2)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Parameters are acceptable; call the actual publish service. */
    return(_nxd_mqtt_client_publish(client_ptr, topic_name, topic_name_length, message, message_length, retain, QoS, wait_option));
}
5852
5853
5854 /**************************************************************************/
5855 /* */
5856 /* FUNCTION RELEASE */
5857 /* */
5858 /* _nxde_mqtt_client_subscribe PORTABLE C */
5859 /* 6.1 */
5860 /* AUTHOR */
5861 /* */
5862 /* Yuxin Zhou, Microsoft Corporation */
5863 /* */
5864 /* DESCRIPTION */
5865 /* */
5866 /* This function performs error checking to the subscribe service. */
5867 /* */
5868 /* */
5869 /* INPUT */
5870 /* */
/* client_ptr Pointer to MQTT Client */
/* topic_name Pointer to the topic string */
/* to subscribe to */
/* topic_name_length Length of the topic string */
/* in bytes */
/* QoS Expected QoS level */
/* */
/* OUTPUT */
/* */
/* status Completion status */
5885 /* */
5886 /* CALLS */
5887 /* */
5888 /* _nxd_mqtt_client_subscribe */
5889 /* */
5890 /* CALLED BY */
5891 /* */
5892 /* Application Code */
5893 /* */
5894 /* RELEASE HISTORY */
5895 /* */
5896 /* DATE NAME DESCRIPTION */
5897 /* */
5898 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5899 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5900 /* resulting in version 6.1 */
5901 /* */
5902 /**************************************************************************/
UINT _nxde_mqtt_client_subscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length, UINT QoS)
{

    /* The client control block is required. */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* A non-empty topic string and a QoS level no greater than 2 are required. */
    if ((topic_name == NX_NULL) || (topic_name_length == 0) || (QoS > 2))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Parameters are acceptable; call the actual subscribe service. */
    return(_nxd_mqtt_client_subscribe(client_ptr, topic_name, topic_name_length, QoS));
}
5927
5928
5929
5930
5931 /**************************************************************************/
5932 /* */
5933 /* FUNCTION RELEASE */
5934 /* */
5935 /* _nxde_mqtt_client_unsubscribe PORTABLE C */
5936 /* 6.1 */
5937 /* AUTHOR */
5938 /* */
5939 /* Yuxin Zhou, Microsoft Corporation */
5940 /* */
5941 /* DESCRIPTION */
5942 /* */
5943 /* This function performs error checking to the unsubscribe service. */
5944 /* */
5945 /* */
5946 /* INPUT */
5947 /* */
5948 /* client_ptr Pointer to MQTT Client */
5949 /* topic_name Pointer to the topic string */
5950 /* to unsubscribe */
5951 /* topic_name_length Length of the topic string */
5952 /* in bytes */
5953 /* */
5954 /* OUTPUT */
5955 /* */
5956 /* status Completion status */
5957 /* */
5958 /* CALLS */
5959 /* */
/* _nxd_mqtt_client_unsubscribe Actual MQTT Client */
/* unsubscribe call */
/* */
5961 /* CALLED BY */
5962 /* */
5963 /* Application Code */
5964 /* */
5965 /* RELEASE HISTORY */
5966 /* */
5967 /* DATE NAME DESCRIPTION */
5968 /* */
5969 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5970 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5971 /* resulting in version 6.1 */
5972 /* */
5973 /**************************************************************************/
UINT _nxde_mqtt_client_unsubscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length)
{

    UINT status;

    if (client_ptr == NX_NULL)
    {

        /* No client control block was supplied. */
        status = NX_PTR_ERROR;
    }
    else if ((topic_name == NX_NULL) || (topic_name_length == 0))
    {

        /* A non-empty topic string is required. */
        status = NXD_MQTT_INVALID_PARAMETER;
    }
    else
    {

        /* Parameters are acceptable; call the actual unsubscribe service. */
        status = _nxd_mqtt_client_unsubscribe(client_ptr, topic_name, topic_name_length);
    }

    return(status);
}
5990
5991
5992 /**************************************************************************/
5993 /* */
5994 /* FUNCTION RELEASE */
5995 /* */
5996 /* _nxde_mqtt_client_disconnect PORTABLE C */
5997 /* 6.1 */
5998 /* AUTHOR */
5999 /* */
6000 /* Yuxin Zhou, Microsoft Corporation */
6001 /* */
6002 /* DESCRIPTION */
6003 /* */
6004 /* This function checks for errors in MQTT client disconnect call. */
6005 /* */
6006 /* */
6007 /* INPUT */
6008 /* */
6009 /* client_ptr Pointer to MQTT Client */
6010 /* */
6011 /* OUTPUT */
6012 /* */
6013 /* status Completion status */
6014 /* */
6015 /* CALLS */
6016 /* */
6017 /* _nxd_mqtt_client_disconnect Actual MQTT Client disconnect */
6018 /* call */
6019 /* */
6020 /* CALLED BY */
6021 /* */
6022 /* Application Code */
6023 /* */
6024 /* RELEASE HISTORY */
6025 /* */
6026 /* DATE NAME DESCRIPTION */
6027 /* */
6028 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6029 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6030 /* resulting in version 6.1 */
6031 /* */
6032 /**************************************************************************/
_nxde_mqtt_client_disconnect(NXD_MQTT_CLIENT * client_ptr)6033 UINT _nxde_mqtt_client_disconnect(NXD_MQTT_CLIENT *client_ptr)
6034 {
6035 /* Validate client_ptr */
6036 if (client_ptr == NX_NULL)
6037 {
6038 return(NX_PTR_ERROR);
6039 }
6040
6041 return(_nxd_mqtt_client_disconnect(client_ptr));
6042 }
6043
6044
6045 /**************************************************************************/
6046 /* */
6047 /* FUNCTION RELEASE */
6048 /* */
6049 /* _nxde_mqtt_client_message_get PORTABLE C */
6050 /* 6.1 */
6051 /* AUTHOR */
6052 /* */
6053 /* Yuxin Zhou, Microsoft Corporation */
6054 /* */
6055 /* DESCRIPTION */
6056 /* */
6057 /* This function checks for errors in MQTT client message get call. */
6058 /* */
6059 /* */
6060 /* INPUT */
6061 /* */
6062 /* client_ptr Pointer to MQTT Client */
6063 /* topic_buffer Pointer to the topic buffer */
6064 /* where topic is copied to */
6065 /* topic_buffer_size Size of the topic buffer. */
6066 /* actual_topic_length Number of bytes copied into */
6067 /* topic_buffer */
6068 /* message_buffer Pointer to the buffer where */
6069 /* message is copied to */
6070 /* message_buffer_size Size of the message_buffer */
6071 /* actual_message_length Number of bytes copied into */
6072 /* the message buffer. */
6073 /* */
6074 /* OUTPUT */
6075 /* */
6076 /* status Completion status */
6077 /* */
6078 /* CALLS */
6079 /* */
6080 /* _nxd_mqtt_client_message_get */
6081 /* */
6082 /* */
6083 /* CALLED BY */
6084 /* */
6085 /* Application Code */
6086 /* */
6087 /* RELEASE HISTORY */
6088 /* */
6089 /* DATE NAME DESCRIPTION */
6090 /* */
6091 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6092 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6093 /* resulting in version 6.1 */
6094 /* */
6095 /**************************************************************************/
UINT _nxde_mqtt_client_message_get(NXD_MQTT_CLIENT *client_ptr, UCHAR *topic_buffer, UINT topic_buffer_size, UINT *actual_topic_length,
                                   UCHAR *message_buffer, UINT message_buffer_size, UINT *actual_message_length)
{

    UINT unused_topic_length;
    UINT unused_message_length;

    /* Validate client_ptr */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Both destination buffers are required: the topic and the message are
       copied into them by the actual message get service. */
    if ((message_buffer == NX_NULL) || (topic_buffer == NX_NULL))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* The length outputs may be NULL if the caller does not care about them.
       The actual service writes to both, so point any missing one at local
       scratch storage instead of passing NULL through. */
    if (actual_topic_length == NX_NULL)
    {
        actual_topic_length = &unused_topic_length;
    }
    if (actual_message_length == NX_NULL)
    {
        actual_message_length = &unused_message_length;
    }

    return(_nxd_mqtt_client_message_get(client_ptr, topic_buffer, topic_buffer_size, actual_topic_length,
                                        message_buffer, message_buffer_size, actual_message_length));
}
6118
6119
6120 /**************************************************************************/
6121 /* */
6122 /* FUNCTION RELEASE */
6123 /* */
6124 /* _nxde_mqtt_client_receive_notify_set PORTABLE C */
6125 /* 6.1 */
6126 /* AUTHOR */
6127 /* */
6128 /* Yuxin Zhou, Microsoft Corporation */
6129 /* */
6130 /* DESCRIPTION */
6131 /* */
/*    This function checks for errors in MQTT client receive notify call. */
6133 /* */
6134 /* */
6135 /* INPUT */
6136 /* */
6137 /* client_ptr Pointer to MQTT Client */
6138 /* receive_notify User-supplied callback */
6139 /* function, which is invoked */
6140 /* upon receiving a publish */
6141 /* message. */
6142 /* */
6143 /* OUTPUT */
6144 /* */
6145 /* status Completion status */
6146 /* */
6147 /* CALLS */
6148 /* */
6149 /* _nxd_mqtt_client_receive_notify_set */
6150 /* */
6151 /* */
6152 /* CALLED BY */
6153 /* */
6154 /* Application Code */
6155 /* */
6156 /* RELEASE HISTORY */
6157 /* */
6158 /* DATE NAME DESCRIPTION */
6159 /* */
6160 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6161 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6162 /* resulting in version 6.1 */
6163 /* */
6164 /**************************************************************************/
_nxde_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* receive_notify)(NXD_MQTT_CLIENT * client_ptr,UINT message_count))6165 UINT _nxde_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT *client_ptr,
6166 VOID (*receive_notify)(NXD_MQTT_CLIENT *client_ptr, UINT message_count))
6167 {
6168 /* Validate client_ptr */
6169 if (client_ptr == NX_NULL)
6170 {
6171 return(NX_PTR_ERROR);
6172 }
6173
6174 if (receive_notify == NX_NULL)
6175 {
6176 return(NX_PTR_ERROR);
6177 }
6178
6179 return(_nxd_mqtt_client_receive_notify_set(client_ptr, receive_notify));
6180 }
6181
6182
6183
6184 /**************************************************************************/
6185 /* */
6186 /* FUNCTION RELEASE */
6187 /* */
6188 /* _nxd_mqtt_client_disconnect_notify_set PORTABLE C */
6189 /* 6.1 */
6190 /* AUTHOR */
6191 /* */
6192 /* Yuxin Zhou, Microsoft Corporation */
6193 /* */
6194 /* DESCRIPTION */
6195 /* */
6196 /* This function sets the notify function for the disconnect event. */
6197 /* */
6198 /* INPUT */
6199 /* */
6200 /* client_ptr Pointer to MQTT Client */
6201 /* disconnect_notify The notify function to be */
6202 /* used when the client is */
6203 /* disconnected from the */
6204 /* server. */
6205 /* */
6206 /* OUTPUT */
6207 /* */
6208 /* status Completion status */
6209 /* */
6210 /* CALLS */
6211 /* */
6212 /* None */
6213 /* */
6214 /* CALLED BY */
6215 /* */
6216 /* Application Code */
6217 /* */
6218 /* RELEASE HISTORY */
6219 /* */
6220 /* DATE NAME DESCRIPTION */
6221 /* */
6222 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6223 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6224 /* resulting in version 6.1 */
6225 /* */
6226 /**************************************************************************/
_nxd_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* disconnect_notify)(NXD_MQTT_CLIENT *))6227 UINT _nxd_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT *client_ptr, VOID (*disconnect_notify)(NXD_MQTT_CLIENT *))
6228 {
6229
6230 client_ptr -> nxd_mqtt_disconnect_notify = disconnect_notify;
6231
6232 return(NXD_MQTT_SUCCESS);
6233 }
6234
6235
6236 /**************************************************************************/
6237 /* */
6238 /* FUNCTION RELEASE */
6239 /* */
6240 /* _nxde_mqtt_client_disconnect_notify_set PORTABLE C */
6241 /* 6.1 */
6242 /* AUTHOR */
6243 /* */
6244 /* Yuxin Zhou, Microsoft Corporation */
6245 /* */
6246 /* DESCRIPTION */
6247 /* */
6248 /* This function checks for errors in setting MQTT client disconnect */
6249 /* callback function. */
6250 /* */
6251 /* INPUT */
6252 /* */
6253 /* client_ptr Pointer to MQTT Client */
6254 /* disconnect_callback The callback function to be */
6255 /* used when an on-going */
6256 /* connection is disconnected. */
6257 /* */
6258 /* OUTPUT */
6259 /* */
6260 /* status Completion status */
6261 /* */
6262 /* CALLS */
6263 /* */
6264 /* _nxd_mqtt_client_disconnect_notify_set */
6265 /* */
6266 /* CALLED BY */
6267 /* */
6268 /* Application Code */
6269 /* */
6270 /* RELEASE HISTORY */
6271 /* */
6272 /* DATE NAME DESCRIPTION */
6273 /* */
6274 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6275 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6276 /* resulting in version 6.1 */
6277 /* */
6278 /**************************************************************************/
_nxde_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* disconnect_notify)(NXD_MQTT_CLIENT *))6279 UINT _nxde_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT *client_ptr, VOID (*disconnect_notify)(NXD_MQTT_CLIENT *))
6280 {
6281
6282 /* Validate client_ptr */
6283 if (client_ptr == NX_NULL)
6284 {
6285 return(NX_PTR_ERROR);
6286 }
6287 return(_nxd_mqtt_client_disconnect_notify_set(client_ptr, disconnect_notify));
6288 }
6289
6290
6291 #ifdef NXD_MQTT_CLOUD_ENABLE
6292 /**************************************************************************/
6293 /* */
6294 /* FUNCTION RELEASE */
6295 /* */
6296 /* _nxd_mqtt_client_cloud_create PORTABLE C */
6297 /* 6.1 */
6298 /* AUTHOR */
6299 /* */
6300 /* Yuxin Zhou, Microsoft Corporation */
6301 /* */
6302 /* DESCRIPTION */
6303 /* */
6304 /* This function creates mqtt client running on cloud helper thread. */
6305 /* */
6306 /* */
6307 /* INPUT */
6308 /* */
6309 /* client_ptr Pointer to MQTT Client */
/*    client_name                           Name string used by the       */
/*                                            client                      */
6312 /* client_id Client ID used by the client */
6313 /* client_id_length Length of Client ID, in bytes */
6314 /* ip_ptr Pointer to IP instance */
6315 /* pool_ptr Pointer to packet pool */
6316 /* cloud_ptr Pointer to Cloud instance */
6317 /* */
6318 /* OUTPUT */
6319 /* */
6320 /* status Completion status */
6321 /* */
6322 /* CALLS */
6323 /* */
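/*    _nxd_mqtt_client_create_internal      Actual client create call     */
/*    nx_cloud_module_register              Register MQTT on cloud helper */
/*    nx_tcp_socket_delete                  Delete socket on failure      */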
6325 /* */
6326 /* CALLED BY */
6327 /* */
6328 /* Application Code */
6329 /* */
6330 /* RELEASE HISTORY */
6331 /* */
6332 /* DATE NAME DESCRIPTION */
6333 /* */
6334 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6335 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
6336 /* corrected mqtt client state,*/
6337 /* resulting in version 6.1 */
6338 /* */
6339 /**************************************************************************/
UINT _nxd_mqtt_client_cloud_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name, CHAR *client_id, UINT client_id_length,
                                   NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr, NX_CLOUD *cloud_ptr)
{
UINT result;

    /* Validate the input pointers one step at a time.  Each instance ID is
       read only after the pointer holding it is known to be non-null, and
       every rejection returns the same pointer error.  */
    if ((client_ptr == NX_NULL) || (ip_ptr == NX_NULL))
    {
        return(NX_PTR_ERROR);
    }

    if (ip_ptr -> nx_ip_id != NX_IP_ID)
    {
        return(NX_PTR_ERROR);
    }

    if ((pool_ptr == NX_NULL) || (cloud_ptr == NX_NULL))
    {
        return(NX_PTR_ERROR);
    }

    if (cloud_ptr -> nx_cloud_id != NX_CLOUD_ID)
    {
        return(NX_PTR_ERROR);
    }

    /* Build the client without a thread of its own: no stack, zero stack
       size and zero priority are passed.  */
    result = _nxd_mqtt_client_create_internal(client_ptr, client_name, client_id, client_id_length, ip_ptr,
                                              pool_ptr, NX_NULL, 0, 0);

    /* Any non-zero status from the internal create is returned unchanged.  */
    if (result)
    {
        return(result);
    }

    /* Attach the client to the cloud helper and use the helper's mutex as
       the client mutex.  */
    client_ptr -> nxd_mqtt_client_cloud_ptr = cloud_ptr;
    client_ptr -> nxd_mqtt_client_mutex_ptr = &(cloud_ptr -> nx_cloud_mutex);

    /* Register the MQTT event processing routine as a cloud module, with
       the client itself as the module context.  */
    result = nx_cloud_module_register(client_ptr -> nxd_mqtt_client_cloud_ptr,
                                      &(client_ptr -> nxd_mqtt_client_cloud_module),
                                      client_name, NX_CLOUD_MODULE_MQTT_EVENT,
                                      _nxd_mqtt_client_event_process, client_ptr);

    if (result == NX_SUCCESS)
    {

        /* Registration succeeded; the client is now idle.  */
        client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;

        return(NXD_MQTT_SUCCESS);
    }

    /* Registration failed: delete the socket set up by
       _nxd_mqtt_client_create_internal().  The registration status is
       replaced by a generic internal error.  */
    nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));

    return(NXD_MQTT_INTERNAL_ERROR);
}
6391 #endif /* NXD_MQTT_CLOUD_ENABLE */
6392
6393 #ifdef NXD_MQTT_OVER_WEBSOCKET
6394 /**************************************************************************/
6395 /* */
6396 /* FUNCTION RELEASE */
6397 /* */
6398 /* _nxd_mqtt_client_websocket_connection_status_callback */
6399 /* PORTABLE C */
6400 /* 6.2.0 */
6401 /* AUTHOR */
6402 /* */
6403 /* Yuxin Zhou, Microsoft Corporation */
6404 /* */
6405 /* DESCRIPTION */
6406 /* */
6407 /* This function is the websocket connection status callback. */
6408 /* */
6409 /* INPUT */
6410 /* */
6411 /* websocket_client_ptr Pointer to websocket client */
6412 /* context Pointer to MQTT client */
6413 /* status Websocket connection status */
6414 /* */
6415 /* OUTPUT */
6416 /* */
6417 /* None */
6418 /* */
6419 /* CALLS */
6420 /* */
6421 /* _nxd_mqtt_client_connect_packet_send */
6422 /* _nxd_mqtt_client_connection_end */
6423 /* */
6424 /* CALLED BY */
6425 /* */
6426 /* _nxd_mqtt_client_event_process */
6427 /* */
6428 /* RELEASE HISTORY */
6429 /* */
6430 /* DATE NAME DESCRIPTION */
6431 /* */
6432 /* 10-31-2022 Yuxin Zhou Initial Version 6.2.0 */
6433 /* */
6434 /**************************************************************************/
VOID _nxd_mqtt_client_websocket_connection_status_callback(NX_WEBSOCKET_CLIENT *websocket_client_ptr, VOID *context, UINT status)
{
NXD_MQTT_CLIENT *mqtt_client = (NXD_MQTT_CLIENT *)context;
UINT             outcome = status;

    /* The websocket client itself is not needed; the MQTT client arrives
       through the context pointer.  */
    NX_PARAMETER_NOT_USED(websocket_client_ptr);

    /* On a successful websocket connection, send the MQTT connect packet
       without waiting.  From here on the outcome is that send's status.  */
    if (outcome == NX_SUCCESS)
    {
        outcome = _nxd_mqtt_client_connect_packet_send(mqtt_client, NX_NO_WAIT);
    }

    /* Nothing more to do when no error occurred.  */
    if (!outcome)
    {
        return;
    }

    /* Either the websocket connection failed or the connect packet could
       not be sent: end the connection.  */
    _nxd_mqtt_client_connection_end(mqtt_client, NX_NO_WAIT);

    /* Report the failure through the connect notify callback, if one is set.  */
    if (mqtt_client -> nxd_mqtt_connect_notify)
    {
        mqtt_client -> nxd_mqtt_connect_notify(mqtt_client, outcome, mqtt_client -> nxd_mqtt_connect_context);
    }
}
6463
6464 /**************************************************************************/
6465 /* */
6466 /* FUNCTION RELEASE */
6467 /* */
6468 /* _nxd_mqtt_client_websocket_set PORTABLE C */
6469 /* 6.2.0 */
6470 /* AUTHOR */
6471 /* */
6472 /* Yuxin Zhou, Microsoft Corporation */
6473 /* */
6474 /* DESCRIPTION */
6475 /* */
6476 /* This function sets the websocket. */
6477 /* */
6478 /* INPUT */
6479 /* */
6480 /* client_ptr Pointer to MQTT Client */
6481 /* host Host used by the client */
6482 /* host_length Length of host, in bytes */
6483 /* uri_path URI path used by the client */
6484 /* uri_path_length Length of uri path, in bytes */
6485 /* */
6486 /* OUTPUT */
6487 /* */
6488 /* status Completion status */
6489 /* */
6490 /* CALLS */
6491 /* */
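/*    tx_mutex_get                          Obtain the client mutex       */
/*    nx_websocket_client_create            Create the WebSocket client   */
/*    tx_mutex_put                          Release the client mutex      */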
6493 /* */
6494 /* CALLED BY */
6495 /* */
6496 /* Application Code */
6497 /* */
6498 /* RELEASE HISTORY */
6499 /* */
6500 /* DATE NAME DESCRIPTION */
6501 /* */
6502 /* 10-31-2022 Yuxin Zhou Initial Version 6.2.0 */
6503 /* */
6504 /**************************************************************************/
UINT _nxd_mqtt_client_websocket_set(NXD_MQTT_CLIENT *client_ptr, UCHAR *host, UINT host_length, UCHAR *uri_path, UINT uri_path_length)
{
UINT result;

    /* Take the client mutex, waiting indefinitely.  */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Record the host and URI path.  Only the pointers and lengths are
       stored, and they stay as written even if the create below fails.  */
    client_ptr -> nxd_mqtt_client_websocket_host = host;
    client_ptr -> nxd_mqtt_client_websocket_host_length = host_length;
    client_ptr -> nxd_mqtt_client_websocket_uri_path = uri_path;
    client_ptr -> nxd_mqtt_client_websocket_uri_path_length = uri_path_length;

    /* Create the WebSocket client on the MQTT client's IP instance and
       packet pool, using an empty name string.  */
    result = nx_websocket_client_create(&client_ptr -> nxd_mqtt_client_websocket, (UCHAR *)"",
                                        client_ptr -> nxd_mqtt_client_ip_ptr,
                                        client_ptr -> nxd_mqtt_client_packet_pool_ptr);

    /* Switch the client to websocket mode only when the create succeeded;
       otherwise the create status is returned to the caller.  */
    if (!result)
    {
        client_ptr -> nxd_mqtt_client_use_websocket = NX_TRUE;
        result = NXD_MQTT_SUCCESS;
    }

    /* Release the mutex on both the success and the failure path.  */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    return(result);
}
6540
6541
6542 /**************************************************************************/
6543 /* */
6544 /* FUNCTION RELEASE */
6545 /* */
6546 /* _nxde_mqtt_client_websocket_set PORTABLE C */
6547 /* 6.2.0 */
6548 /* AUTHOR */
6549 /* */
6550 /* Yuxin Zhou, Microsoft Corporation */
6551 /* */
6552 /* DESCRIPTION */
6553 /* */
6554 /* This function checks for errors in setting MQTT client websocket. */
6555 /* */
6556 /* INPUT */
6557 /* */
6558 /* client_ptr Pointer to MQTT Client */
6559 /* host Host used by the client */
6560 /* host_length Length of host, in bytes */
6561 /* uri_path URI path used by the client */
6562 /* uri_path_length Length of uri path, in bytes */
6563 /* */
6564 /* OUTPUT */
6565 /* */
6566 /* status Completion status */
6567 /* */
6568 /* CALLS */
6569 /* */
6570 /* _nxd_mqtt_client_websocket_set */
6571 /* */
6572 /* CALLED BY */
6573 /* */
6574 /* Application Code */
6575 /* */
6576 /* RELEASE HISTORY */
6577 /* */
6578 /* DATE NAME DESCRIPTION */
6579 /* */
6580 /* 10-31-2022 Yuxin Zhou Initial Version 6.2.0 */
6581 /* */
6582 /**************************************************************************/
UINT _nxde_mqtt_client_websocket_set(NXD_MQTT_CLIENT *client_ptr, UCHAR *host, UINT host_length, UCHAR *uri_path, UINT uri_path_length)
{

    /* The client control block must be supplied.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The host must be a non-null pointer with a non-zero length.  */
    if ((host == NX_NULL) || (host_length == 0))
    {
        return(NX_PTR_ERROR);
    }

    /* The same applies to the URI path.  A zero length is reported with
       the pointer error code as well.  */
    if ((uri_path == NX_NULL) || (uri_path_length == 0))
    {
        return(NX_PTR_ERROR);
    }

    /* Arguments accepted; call the actual websocket set service.  */
    return(_nxd_mqtt_client_websocket_set(client_ptr, host, host_length, uri_path, uri_path_length));
}
6595 #endif /* NXD_MQTT_OVER_WEBSOCKET */
6596