1 /**************************************************************************/
2 /* */
3 /* Copyright (c) Microsoft Corporation. All rights reserved. */
4 /* */
5 /* This software is licensed under the Microsoft Software License */
6 /* Terms for Microsoft Azure RTOS. Full text of the license can be */
7 /* found in the LICENSE file at https://aka.ms/AzureRTOS_EULA */
8 /* and in the root directory of this software. */
9 /* */
10 /**************************************************************************/
11
12
13 /**************************************************************************/
14 /**************************************************************************/
15 /** */
16 /** NetX Component */
17 /** */
18 /** MQTT (MQTT) */
19 /** */
20 /**************************************************************************/
21 /**************************************************************************/
22
23 #define NXD_MQTT_CLIENT_SOURCE_CODE
24
25
26 /* Force error checking to be disabled in this module */
27
28 #ifndef NX_DISABLE_ERROR_CHECKING
29 #define NX_DISABLE_ERROR_CHECKING
30 #endif
31
32
33 /* Include necessary system files. */
34
35 #include "tx_api.h"
36 #include "nx_api.h"
37 #include "nx_ip.h"
38 #include "nxd_mqtt_client.h"
39
/* Event flag bits for the MQTT client.  MQTT_ALL_EVENTS is a mask with every
   bit set.  Within the visible part of this file, MQTT_TCP_ESTABLISH_EVENT is
   raised by _nxd_mqtt_tcp_establish_notify and MQTT_PACKET_RECEIVE_EVENT by
   _nxd_mqtt_receive_callback; the other bits are presumably raised and
   consumed elsewhere in this file (TODO confirm). */

42 #define MQTT_ALL_EVENTS ((ULONG)0xFFFFFFFF)
43 #define MQTT_TIMEOUT_EVENT ((ULONG)0x00000001)
44 #define MQTT_PACKET_RECEIVE_EVENT ((ULONG)0x00000002)
45 #define MQTT_START_EVENT ((ULONG)0x00000004)
46 #define MQTT_DELETE_EVENT ((ULONG)0x00000008)
47 #define MQTT_PING_TIMEOUT_EVENT ((ULONG)0x00000010)
48 #define MQTT_NETWORK_DISCONNECT_EVENT ((ULONG)0x00000020)
49 #define MQTT_TCP_ESTABLISH_EVENT ((ULONG)0x00000040)
50
/* Forward declarations of the file-local (static) helper routines. */
51 static UINT _nxd_mqtt_client_create_internal(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
52 CHAR *client_id, UINT client_id_length,
53 NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
54 VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority);
55 static UINT _nxd_mqtt_packet_allocate(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, ULONG wait_option);
56 static UINT _nxd_mqtt_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UINT wait_option);
57 static UINT _nxd_mqtt_packet_receive(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, UINT wait_option);
58 static UINT _nxd_mqtt_copy_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET **new_packet_ptr,
59 USHORT packet_id, UCHAR set_duplicate_flag, UINT wait_option);
60 static VOID _nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr);
61 static VOID _nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr);
62 static UINT _nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option);
63 static UINT _nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option);
64
65 /**************************************************************************/
66 /* */
67 /* FUNCTION RELEASE */
68 /* */
69 /* _nxd_mqtt_client_set_fixed_header PORTABLE C */
70 /* 6.1 */
71 /* AUTHOR */
72 /* */
73 /* Yuxin Zhou, Microsoft Corporation */
74 /* */
75 /* DESCRIPTION */
76 /* */
77 /* This function writes the fixed header field in the outgoing */
78 /* MQTT packet. */
79 /* */
80 /* This function follows the logic outlined in 2.2 in MQTT */
81 /* specification. */
82 /* */
83 /* INPUT */
84 /* */
85 /* client_ptr Pointer to MQTT Client */
86 /* packet_ptr Outgoing MQTT packet */
87 /* control_header Control byte */
88 /* length Remaining length in bytes */
89 /* wait_option Wait option */
90 /* */
91 /* OUTPUT */
92 /* */
93 /* status */
94 /* */
95 /* CALLS */
96 /* */
97 /* nx_packet_data_append Append packet data */
98 /* */
99 /* CALLED BY */
100 /* */
101 /* _nxd_mqtt_client_sub_unsub */
102 /* _nxd_mqtt_client_connect */
103 /* _nxd_mqtt_client_publish */
104 /* */
105 /* RELEASE HISTORY */
106 /* */
107 /* DATE NAME DESCRIPTION */
108 /* */
109 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
110 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
111 /* resulting in version 6.1 */
112 /* */
113 /**************************************************************************/
/* Append the MQTT fixed header to packet_ptr: the control byte followed by
   the variable-length "remaining length" field (7 data bits per byte, bit 7
   set on every byte except the last).

   client_ptr      MQTT client; supplies the packet pool used by the append.
   packet_ptr      Packet that receives the header bytes.
   control_header  First byte of the fixed header.
   length          Remaining length in bytes; must fit in four length bytes,
                   i.e. be no larger than 0x0FFFFFFF (268,435,455).
   wait_option     Wait option handed to nx_packet_data_append.

   Returns NXD_MQTT_INTERNAL_ERROR when length cannot be encoded, otherwise
   the status returned by nx_packet_data_append. */
UINT _nxd_mqtt_client_set_fixed_header(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UCHAR control_header, UINT length, UINT wait_option)
{
    UCHAR fixed_header[5];
    UCHAR encoded;
    UINT  count = 0;

    /* fixed_header holds the control byte plus at most four length bytes
       (28 bits of length).  A larger value would need a fifth length byte
       and would be written past the end of the buffer, so reject it. */
    if (length > 0x0FFFFFFF)
    {
        return(NXD_MQTT_INTERNAL_ERROR);
    }

    fixed_header[count] = control_header;
    count++;

    do
    {

        /* Emit the low seven bits; flag a continuation if more remain. */
        encoded = (UCHAR)(length & 0x7F);
        length = length >> 7;
        if (length != 0)
        {
            encoded = (UCHAR)(encoded | 0x80);
        }

        fixed_header[count] = encoded;
        count++;
    } while (length != 0);

    /* count now covers the control byte and every length byte. */
    return(nx_packet_data_append(packet_ptr, fixed_header, count,
                                 client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option));
}
144
145
146
147 /**************************************************************************/
148 /* */
149 /* FUNCTION RELEASE */
150 /* */
151 /* _nxd_mqtt_read_remaining_length PORTABLE C */
152 /* 6.1 */
153 /* AUTHOR */
154 /* */
155 /* Yuxin Zhou, Microsoft Corporation */
156 /* */
157 /* DESCRIPTION */
158 /* */
159 /* This function parses the remaining length field in the incoming */
160 /* MQTT packet. */
161 /* */
162 /* This function follows the logic outlined in 2.2.3 in MQTT */
163 /* specification */
164 /* */
165 /* INPUT */
166 /* */
167 /* packet_ptr Incoming MQTT packet. */
168 /* remaining_length remaining length in bytes, */
169 /* this is the return value. */
170 /* offset Pointer to offset of the */
171 /* remaining data */
172 /* */
173 /* OUTPUT */
174 /* */
175 /* status */
176 /* */
177 /* CALLS */
178 /* */
179 /* None */
180 /* */
181 /* CALLED BY */
182 /* */
183 /* _nxd_mqtt_process_publish */
184 /* _nxd_mqtt_client_message_get */
185 /* _nxd_mqtt_process_sub_unsub_ack */
186 /* */
187 /* RELEASE HISTORY */
188 /* */
189 /* DATE NAME DESCRIPTION */
190 /* */
191 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
192 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
193 /* resulting in version 6.1 */
194 /* */
195 /**************************************************************************/
/* Decode the variable-length "remaining length" field that follows the
   control byte of the MQTT message held in packet_ptr.

   packet_ptr        Packet containing the message, control byte at offset 0.
   remaining_length  Receives the decoded length on success.
   offset_ptr        Receives the offset of the first byte after the fixed
                     header (1 + number of length bytes) on success.

   Returns NXD_MQTT_SUCCESS, NXD_MQTT_PARTIAL_PACKET when the packet does not
   yet hold the complete length field or the complete message, or
   NXD_MQTT_INTERNAL_ERROR when four length bytes all carry the continuation
   bit. */
UINT _nxd_mqtt_read_remaining_length(NX_PACKET *packet_ptr, UINT *remaining_length, ULONG *offset_ptr)
{
    UCHAR length_bytes[4] = {0};
    UCHAR current;
    ULONG available;
    UINT  decoded = 0;
    UINT  scale = 1;
    UINT  consumed = 0;

    /* Fetch up to four bytes starting right after the control byte. */
    if (nx_packet_data_extract_offset(packet_ptr, 1, length_bytes, sizeof(length_bytes), &available) != 0)
    {

        /* Nothing beyond the control byte could be read yet. */
        return(NXD_MQTT_PARTIAL_PACKET);
    }

    for (;;)
    {
        if (consumed >= available)
        {

            /* Ran out of bytes while the continuation bit was still set:
               after four bytes the field is malformed, otherwise more data
               is needed. */
            return((consumed == 4) ? NXD_MQTT_INTERNAL_ERROR : NXD_MQTT_PARTIAL_PACKET);
        }

        current = length_bytes[consumed];
        consumed++;

        decoded += ((current & 0x7F) * scale);
        scale = scale << 7;

        if ((current & 0x80) == 0)
        {

            /* Last byte of the length field. */
            break;
        }
    }

    /* The whole message (fixed header plus remaining bytes) must be present. */
    if ((1 + consumed + decoded) > packet_ptr -> nx_packet_length)
    {
        return(NXD_MQTT_PARTIAL_PACKET);
    }

    *remaining_length = decoded;
    *offset_ptr = (1 + consumed);

    return(NXD_MQTT_SUCCESS);
}
243
244
245
246 /**************************************************************************/
247 /* */
248 /* FUNCTION RELEASE */
249 /* */
250 /* _nxd_mqtt_client_sub_unsub PORTABLE C */
251 /* 6.2.0 */
252 /* AUTHOR */
253 /* */
254 /* Yuxin Zhou, Microsoft Corporation */
255 /* */
256 /* DESCRIPTION */
257 /* */
258 /* This function sends a subscribe or unsubscribe message to the */
259 /* broker. */
260 /* */
261 /* */
262 /* INPUT */
263 /* */
264 /* client_ptr Pointer to MQTT Client */
265 /* op Subscribe or Unsubscribe */
266 /* topic_name Pointer to the topic string */
267 /* to subscribe to */
268 /* topic_name_length Length of the topic string */
269 /* in bytes */
270 /* packet_id_ptr Pointer to packet id that */
271 /* will be filled with */
272 /* assigned packet id for */
273 /* sub/unsub message */
274 /* QoS Expected QoS level */
275 /* */
276 /* OUTPUT */
277 /* */
278 /* status Completion status */
279 /* */
280 /* CALLS */
281 /* */
282 /* tx_mutex_get */
283 /* _nxd_mqtt_packet_allocate */
284 /* _nxd_mqtt_client_set_fixed_header */
285 /* _nxd_mqtt_client_append_message */
286 /* tx_mutex_put */
287 /* _nxd_mqtt_packet_send */
288 /* nx_packet_release */
289 /* _nxd_mqtt_copy_transmit_packet */
290 /* */
291 /* CALLED BY */
292 /* */
293 /* _nxd_mqtt_client_subscribe */
294 /* _nxd_mqtt_client_unsubscribe */
295 /* */
296 /* RELEASE HISTORY */
297 /* */
298 /* DATE NAME DESCRIPTION */
299 /* */
300 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
301 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
302 /* resulting in version 6.1 */
303 /* 11-09-2020 Yuxin Zhou Modified comment(s), and */
304 /* added packet id parameter, */
305 /* resulting in version 6.1.2 */
306 /* 07-29-2022 Spencer McDonough Modified comment(s), */
307 /* improved internal logic, */
308 /* resulting in version 6.1.12 */
309 /* 10-31-2022 Bo Chen Modified comment(s), improved */
310 /* the logic of sending packet,*/
311 /* resulting in version 6.2.0 */
312 /* */
313 /**************************************************************************/
/* Build a SUBSCRIBE or UNSUBSCRIBE message for one topic, queue a copy of it
   on the client's transmit queue and send it.

   client_ptr         MQTT client; must be in the connected state.
   op                 Control byte of the message; a value equal to
                      (MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4) | 0x02 selects
                      the subscribe layout, which carries a trailing QoS byte.
   topic_name         Topic string.
   topic_name_length  Length of the topic string in bytes.
   packet_id_ptr      Optional; receives the packet identifier used.
   QoS                Requested QoS (low two bits used, subscribe only).

   Returns NXD_MQTT_MUTEX_FAILURE, NXD_MQTT_NOT_CONNECTED, the allocation
   status, NXD_MQTT_PACKET_POOL_FAILURE when building or copying the message
   fails, NXD_MQTT_COMMUNICATION_FAILURE when the send fails, or the zero
   status of the last build step on success. */
UINT _nxd_mqtt_client_sub_unsub(NXD_MQTT_CLIENT *client_ptr, UINT op,
                                CHAR *topic_name, UINT topic_name_length,
                                USHORT *packet_id_ptr, UINT QoS)
{
    NX_PACKET *packet_ptr;
    NX_PACKET *queued_copy;
    UINT       send_status;
    UINT       remaining_length;
    UINT       ret;
    UINT       is_subscribe;
    UCHAR      scratch[2];

    is_subscribe = (op == ((MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4) | 0x02));

    /* Serialize access to the client. */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Nothing can be sent unless the client is connected. */
    if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(NXD_MQTT_NOT_CONNECTED);
    }

    ret = _nxd_mqtt_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
    if (ret)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(ret);
    }

    /* Remaining length: 2 bytes of packet ID, 2 bytes of topic length prefix,
       the topic itself, and one QoS byte for a subscribe. */
    remaining_length = 2 + (2 + topic_name_length);
    if (is_subscribe)
    {
        remaining_length++;
    }

    /* Control byte and remaining length. */
    ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, (UCHAR)op, remaining_length, NX_WAIT_FOREVER);
    if (ret)
    {
        goto build_failed;
    }

    /* Packet identifier, most significant byte first. */
    scratch[0] = (UCHAR)(client_ptr -> nxd_mqtt_client_packet_identifier >> 8);
    scratch[1] = (client_ptr -> nxd_mqtt_client_packet_identifier & 0xFF);

    if (packet_id_ptr)
    {
        *packet_id_ptr = (USHORT)(client_ptr -> nxd_mqtt_client_packet_identifier & 0xFFFF);
    }

    ret = nx_packet_data_append(packet_ptr, scratch, 2, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_WAIT_FOREVER);
    if (ret)
    {
        goto build_failed;
    }

    /* Topic name. */
    ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, topic_name, topic_name_length, NX_WAIT_FOREVER);
    if (ret)
    {
        goto build_failed;
    }

    if (is_subscribe)
    {

        /* Requested QoS. */
        scratch[0] = QoS & 0x3;

        ret = nx_packet_data_append(packet_ptr, scratch, 1, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_WAIT_FOREVER);
        if (ret)
        {
            goto build_failed;
        }
    }

    /* Keep a copy on the transmit queue for retransmission. */
    if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &queued_copy,
                                       (USHORT)(client_ptr -> nxd_mqtt_client_packet_identifier),
                                       NX_FALSE, NX_WAIT_FOREVER))
    {
        goto build_failed;
    }

    /* Link the copy at the tail of the transmit queue. */
    if (client_ptr -> message_transmit_queue_head != NX_NULL)
    {
        client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = queued_copy;
    }
    else
    {
        client_ptr -> message_transmit_queue_head = queued_copy;
    }
    client_ptr -> message_transmit_queue_tail = queued_copy;

    /* Advance the 16-bit packet identifier, skipping zero (MQTT-2.3.1-1). */
    client_ptr -> nxd_mqtt_client_packet_identifier = (client_ptr -> nxd_mqtt_client_packet_identifier + 1) & 0xFFFF;
    if (client_ptr -> nxd_mqtt_client_packet_identifier == 0)
    {
        client_ptr -> nxd_mqtt_client_packet_identifier = 1;
    }

    /* Restart the keepalive timeout. */
    client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;

    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    /* Send the subscribe/unsubscribe message; the send happens outside the mutex. */
    send_status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER);
    if (send_status)
    {

        /* The send did not take the packet; release it here.  The queued
           copy stays on the transmit queue. */
        nx_packet_release(packet_ptr);

        ret = NXD_MQTT_COMMUNICATION_FAILURE;
    }

    return(ret);

build_failed:

    /* Shared failure path while the message is being built: drop the mutex,
       then the partially built packet. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
    nx_packet_release(packet_ptr);

    return(NXD_MQTT_PACKET_POOL_FAILURE);
}
483
484
485 /**************************************************************************/
486 /* */
487 /* FUNCTION RELEASE */
488 /* */
489 /* _nxd_mqtt_packet_allocate PORTABLE C */
490 /* 6.2.0 */
491 /* AUTHOR */
492 /* */
493 /* Yuxin Zhou, Microsoft Corporation */
494 /* */
495 /* DESCRIPTION */
496 /* */
497 /* This function allocates a packet for transmitting MQTT message. */
498 /* Special care has to be taken for accommodating IPv4/IPv6 header, */
499 /* and possibly TLS record if TLS is being used. On failure, the */
500 /* TLS mutex is released and the caller can simply return. */
501 /* */
502 /* */
503 /* INPUT */
504 /* */
505 /* client_ptr Pointer to MQTT Client */
506 /* packet_ptr Allocated packet to be */
507 /* returned to the caller. */
508 /* */
509 /* OUTPUT */
510 /* */
511 /* status Completion status */
512 /* */
513 /* CALLS */
514 /* */
515 /* nx_secure_tls_packet_allocate Allocate packet for MQTT */
516 /* over TLS socket */
517 /* nx_packet_allocate Allocate a packet for MQTT */
518 /* over regular TCP socket */
519 /* tx_mutex_put Release a mutex */
520 /* */
521 /* CALLED BY */
522 /* */
523 /* _nxd_mqtt_process_publish */
524 /* _nxd_mqtt_client_connect */
525 /* _nxd_mqtt_client_publish */
526 /* _nxd_mqtt_client_subscribe */
527 /* _nxd_mqtt_client_unsubscribe */
528 /* _nxd_mqtt_client_send_simple_message */
529 /* */
530 /* RELEASE HISTORY */
531 /* */
532 /* DATE NAME DESCRIPTION */
533 /* */
534 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
535 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
536 /* resulting in version 6.1 */
537 /* 07-29-2022 Spencer McDonough Modified comment(s), and */
538 /* improved internal logic, */
539 /* resulting in version 6.1.12 */
540 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
541 /* mqtt over websocket, */
542 /* resulting in version 6.2.0 */
543 /* */
544 /**************************************************************************/
_nxd_mqtt_packet_allocate(NXD_MQTT_CLIENT * client_ptr,NX_PACKET ** packet_ptr,ULONG wait_option)545 static UINT _nxd_mqtt_packet_allocate(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, ULONG wait_option)
546 {
547 UINT status = NXD_MQTT_SUCCESS;
548
549 #ifdef NXD_MQTT_OVER_WEBSOCKET
550 if (client_ptr -> nxd_mqtt_client_use_websocket)
551 {
552
553 /* Use WebSocket packet allocate since it is able to count for WebSocket-related header space */
554 status = nx_websocket_client_packet_allocate(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, wait_option);
555 }
556 else
557 #endif /* NXD_MQTT_OVER_WEBSOCKET */
558 #ifdef NX_SECURE_ENABLE
559 if (client_ptr -> nxd_mqtt_client_use_tls)
560 {
561 /* Use TLS packet allocate. The TLS packet allocate is able to count for
562 TLS-related header space including crypto initial vector area. */
563 status = nx_secure_tls_packet_allocate(&client_ptr -> nxd_mqtt_tls_session, client_ptr -> nxd_mqtt_client_packet_pool_ptr,
564 packet_ptr, wait_option);
565 }
566 /* Allocate a packet */
567 else
568 {
569 #endif /* NX_SECURE_ENABLE */
570 if (client_ptr -> nxd_mqtt_client_socket.nx_tcp_socket_connect_ip.nxd_ip_version == NX_IP_VERSION_V4)
571 {
572 status = nx_packet_allocate(client_ptr -> nxd_mqtt_client_packet_pool_ptr, packet_ptr, NX_IPv4_TCP_PACKET,
573 wait_option);
574 }
575 else
576 {
577 status = nx_packet_allocate(client_ptr -> nxd_mqtt_client_packet_pool_ptr, packet_ptr, NX_IPv6_TCP_PACKET,
578 wait_option);
579 }
580 #ifdef NX_SECURE_ENABLE
581 }
582 #endif /* NX_SECURE_ENABLE */
583
584 if (status != NX_SUCCESS)
585 {
586 return(NXD_MQTT_PACKET_POOL_FAILURE);
587 }
588
589 return(NXD_MQTT_SUCCESS);
590 }
591
592 /**************************************************************************/
593 /* */
594 /* FUNCTION RELEASE */
595 /* */
596 /* _nxd_mqtt_packet_send PORTABLE C */
597 /* 6.2.0 */
598 /* AUTHOR */
599 /* */
600 /* Yuxin Zhou, Microsoft Corporation */
601 /* */
602 /* DESCRIPTION */
603 /* */
604 /* This function sends out a packet. */
605 /* */
606 /* */
607 /* INPUT */
608 /* */
609 /* client_ptr Pointer to MQTT Client */
610 /* packet_ptr Pointer to packet */
611 /* wait_option Timeout value */
612 /* */
613 /* OUTPUT */
614 /* */
615 /* status Completion status */
616 /* */
617 /* CALLS */
618 /* */
619 /* nx_websocket_client_send */
620 /* nx_secure_tls_session_send */
621 /* nx_tcp_socket_send */
622 /* */
623 /* CALLED BY */
624 /* */
625 /* */
626 /* RELEASE HISTORY */
627 /* */
628 /* DATE NAME DESCRIPTION */
629 /* */
630 /* 10-31-2022 Bo Chen Initial Version 6.2.0 */
631 /* */
632 /**************************************************************************/
/* Send packet_ptr over the transport the client is using: WebSocket (as a
   final binary frame), TLS session, or the plain TCP socket.

   client_ptr   MQTT client.
   packet_ptr   Packet to send.
   wait_option  Wait option handed to the underlying send call.

   Returns the status of the underlying send call unchanged. */
static UINT _nxd_mqtt_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UINT wait_option)
{

#ifdef NXD_MQTT_OVER_WEBSOCKET
    if (client_ptr -> nxd_mqtt_client_use_websocket)
    {
        return(nx_websocket_client_send(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, NX_WEBSOCKET_OPCODE_BINARY_FRAME, NX_TRUE, wait_option));
    }
#endif /* NXD_MQTT_OVER_WEBSOCKET */

#ifdef NX_SECURE_ENABLE
    if (client_ptr -> nxd_mqtt_client_use_tls)
    {
        return(nx_secure_tls_session_send(&(client_ptr -> nxd_mqtt_tls_session), packet_ptr, wait_option));
    }
#endif /* NX_SECURE_ENABLE */

    /* Neither WebSocket nor TLS: use the TCP socket directly. */
    return(nx_tcp_socket_send(&client_ptr -> nxd_mqtt_client_socket, packet_ptr, wait_option));
}
658
659 /**************************************************************************/
660 /* */
661 /* FUNCTION RELEASE */
662 /* */
663 /* _nxd_mqtt_packet_receive PORTABLE C */
664 /* 6.2.0 */
665 /* AUTHOR */
666 /* */
667 /* Yuxin Zhou, Microsoft Corporation */
668 /* */
669 /* DESCRIPTION */
670 /* */
671 /* This function receives a packet. */
672 /* */
673 /* */
674 /* INPUT */
675 /* */
676 /* client_ptr Pointer to MQTT Client */
677 /* packet_ptr Pointer to packet */
678 /* wait_option Timeout value */
679 /* */
680 /* OUTPUT */
681 /* */
682 /* status Completion status */
683 /* */
684 /* CALLS */
685 /* */
686 /* nx_websocket_client_receive */
687 /* nx_secure_tls_session_receive */
688 /* nx_tcp_socket_receive */
689 /* */
690 /* CALLED BY */
691 /* */
692 /* */
693 /* RELEASE HISTORY */
694 /* */
695 /* DATE NAME DESCRIPTION */
696 /* */
697 /* 10-31-2022 Bo Chen Initial Version 6.2.0 */
698 /* */
699 /**************************************************************************/
/* Receive one packet from the transport the client is using: WebSocket, TLS
   session, or the plain TCP socket.

   client_ptr   MQTT client.
   packet_ptr   Receives the packet on success.
   wait_option  Wait option handed to the underlying receive call.

   Returns the status of the underlying receive call.  Over WebSocket, a
   frame that was received successfully but is not a binary frame yields
   NX_INVALID_PACKET; in that case the frame is released here and
   *packet_ptr is set to NX_NULL, so no packet is handed to the caller on
   that failure path. */
static UINT _nxd_mqtt_packet_receive(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, UINT wait_option)
{
    UINT status = NXD_MQTT_SUCCESS;
#ifdef NXD_MQTT_OVER_WEBSOCKET
    UINT op_code = 0;
#endif /* NXD_MQTT_OVER_WEBSOCKET */

#ifdef NXD_MQTT_OVER_WEBSOCKET
    if (client_ptr -> nxd_mqtt_client_use_websocket)
    {
        status = nx_websocket_client_receive(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, &op_code, wait_option);
        if ((status == NX_SUCCESS) && (op_code != NX_WEBSOCKET_OPCODE_BINARY_FRAME))
        {

            /* The receive succeeded, so a packet was handed over, but this
               function is about to report failure.  Release the packet here
               rather than leaving it allocated with no owner. */
            if (*packet_ptr != NX_NULL)
            {
                nx_packet_release(*packet_ptr);
                *packet_ptr = NX_NULL;
            }

            return(NX_INVALID_PACKET);
        }
    }
    else
#endif /* NXD_MQTT_OVER_WEBSOCKET */

#ifdef NX_SECURE_ENABLE
    if (client_ptr -> nxd_mqtt_client_use_tls)
    {
        status = nx_secure_tls_session_receive(&(client_ptr -> nxd_mqtt_tls_session), packet_ptr, wait_option);
    }
    else
#endif /* NX_SECURE_ENABLE */
    {
        status = nx_tcp_socket_receive(&(client_ptr -> nxd_mqtt_client_socket), packet_ptr, wait_option);
    }

    return(status);
}
732
733 /**************************************************************************/
734 /* */
735 /* FUNCTION RELEASE */
736 /* */
737 /* _nxd_mqtt_tcp_establish_notify PORTABLE C */
738 /* 6.1 */
739 /* AUTHOR */
740 /* */
741 /* Yuxin Zhou, Microsoft Corporation */
742 /* */
743 /* DESCRIPTION */
744 /* */
745 /* This internal function is installed as TCP connection establish */
746 /* callback function. */
747 /* */
748 /* INPUT */
749 /* */
750 /* socket_ptr The socket that receives */
751 /* the message. */
752 /* */
753 /* OUTPUT */
754 /* */
755 /* None */
756 /* */
757 /* CALLS */
758 /* */
759 /* tx_event_flags_set */
760 /* */
761 /* CALLED BY */
762 /* */
763 /* _nxd_mqtt_thread_entry */
764 /* _nxd_mqtt_client_event_process */
765 /* */
766 /* RELEASE HISTORY */
767 /* */
768 /* DATE NAME DESCRIPTION */
769 /* */
770 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
771 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
772 /* resulting in version 6.1 */
773 /* */
774 /**************************************************************************/
_nxd_mqtt_tcp_establish_notify(NX_TCP_SOCKET * socket_ptr)775 static VOID _nxd_mqtt_tcp_establish_notify(NX_TCP_SOCKET *socket_ptr)
776 {
777 NXD_MQTT_CLIENT *client_ptr;
778
779 client_ptr = (NXD_MQTT_CLIENT *)socket_ptr -> nx_tcp_socket_reserved_ptr;
780
781 if (&(client_ptr -> nxd_mqtt_client_socket) == socket_ptr)
782 {
783
784 /* Set the event flag. */
785 #ifndef NXD_MQTT_CLOUD_ENABLE
786 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_TCP_ESTABLISH_EVENT, TX_OR);
787 #else
788 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_TCP_ESTABLISH_EVENT);
789 #endif /* NXD_MQTT_CLOUD_ENABLE */
790 }
791 }
792
793 /**************************************************************************/
794 /* */
795 /* FUNCTION RELEASE */
796 /* */
797 /* _nxd_mqtt_receive_callback PORTABLE C */
798 /* 6.1 */
799 /* AUTHOR */
800 /* */
801 /* Yuxin Zhou, Microsoft Corporation */
802 /* */
803 /* DESCRIPTION */
804 /* */
805 /* This internal function is installed as TCP receive callback */
806 /* function. On receiving a TCP message, the callback function */
807 /* sets an event flag to trigger MQTT client to process received */
808 /* message. */
809 /* */
810 /* */
811 /* INPUT */
812 /* */
813 /* socket_ptr The socket that receives */
814 /* the message. */
815 /* */
816 /* OUTPUT */
817 /* */
818 /* None */
819 /* */
820 /* CALLS */
821 /* */
822 /* tx_event_flags_set */
823 /* */
824 /* CALLED BY */
825 /* */
826 /* _nxd_mqtt_thread_entry */
827 /* _nxd_mqtt_client_event_process */
828 /* */
829 /* RELEASE HISTORY */
830 /* */
831 /* DATE NAME DESCRIPTION */
832 /* */
833 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
834 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
835 /* resulting in version 6.1 */
836 /* */
837 /**************************************************************************/
_nxd_mqtt_receive_callback(NX_TCP_SOCKET * socket_ptr)838 static VOID _nxd_mqtt_receive_callback(NX_TCP_SOCKET *socket_ptr)
839 {
840 NXD_MQTT_CLIENT *client_ptr;
841
842 client_ptr = (NXD_MQTT_CLIENT *)socket_ptr -> nx_tcp_socket_reserved_ptr;
843
844 if (&(client_ptr -> nxd_mqtt_client_socket) == socket_ptr)
845 {
846 /* Set the event flag. */
847 #ifndef NXD_MQTT_CLOUD_ENABLE
848 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_PACKET_RECEIVE_EVENT, TX_OR);
849 #else
850 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PACKET_RECEIVE_EVENT);
851 #endif /* NXD_MQTT_CLOUD_ENABLE */
852 }
853 }
854
855 /**************************************************************************/
856 /* */
857 /* FUNCTION RELEASE */
858 /* */
859 /* _nxd_mqtt_copy_transmit_packet PORTABLE C */
860 /* 6.1.8 */
861 /* AUTHOR */
862 /* */
863 /* Yuxin Zhou, Microsoft Corporation */
864 /* */
865 /* DESCRIPTION */
866 /* */
867 /* This internal function saves a transmit packet. */
868 /* A transmit packet is allocated to store QoS 1 and 2 messages. */
869 /* Upon a message being properly acknowledged, the packet will */
870 /* be released. */
871 /* */
872 /* */
873 /* INPUT */
874 /* */
875 /* client_ptr Pointer to MQTT Client */
876 /* packet_ptr Pointer to the MQTT message */
877 /* packet to be saved */
878 /* new_packet_ptr Return a copied packet */
879 /* packet_id Current packet ID */
880 /* set_duplicate_flag Set duplicate flag for fixed */
881 /* header or not */
882 /* wait_option Timeout value */
883 /* */
884 /* OUTPUT */
885 /* */
886 /* status */
887 /* */
888 /* CALLS */
889 /* */
890 /* nx_packet_copy */
891 /* */
892 /* CALLED BY */
893 /* */
894 /* _nxd_mqtt_client_sub_unsub */
895 /* _nxd_mqtt_process_publish */
896 /* */
897 /* RELEASE HISTORY */
898 /* */
899 /* DATE NAME DESCRIPTION */
900 /* */
901 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
902 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
903 /* resulting in version 6.1 */
904 /* 08-02-2021 Yuxin Zhou Modified comment(s), */
905 /* supported maximum transmit */
906 /* queue depth, */
907 /* resulting in version 6.1.8 */
908 /* */
909 /**************************************************************************/
/* Make a copy of an outgoing MQTT message so it can be kept on the transmit
   queue, and tag the copy with its packet identifier.

   client_ptr          MQTT client; supplies the packet pool.
   packet_ptr          Message to copy.
   new_packet_ptr      Receives the copy.
   packet_id           Packet identifier stored with the copy.
   set_duplicate_flag  Non-zero to OR MQTT_PUBLISH_DUP_FLAG into the first
                       byte of the copied message.
   wait_option         Wait option handed to nx_packet_copy.

   Returns NXD_MQTT_SUCCESS, NXD_MQTT_PACKET_POOL_FAILURE when the copy
   fails, or NX_TX_QUEUE_DEPTH when NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH is
   defined and the queue is already at that depth. */
static UINT _nxd_mqtt_copy_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET **new_packet_ptr,
                                           USHORT packet_id, UCHAR set_duplicate_flag, UINT wait_option)
{
    NX_PACKET *copy;

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
    /* Transmit queue depth limit reached: refuse to queue more packets. */
    if (client_ptr -> message_transmit_queue_depth >= NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH)
    {
        return(NX_TX_QUEUE_DEPTH);
    }
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */

    if (nx_packet_copy(packet_ptr, new_packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option))
    {

        /* No packet available to hold the copy. */
        return(NXD_MQTT_PACKET_POOL_FAILURE);
    }

    copy = *new_packet_ptr;

    /* Record the packet identifier at nx_packet_data_start of the copy.
       NOTE(review): presumably this lands in header room ahead of the
       message, which starts at nx_packet_prepend_ptr - confirm. */
    *((USHORT *)copy -> nx_packet_data_start) = packet_id;

    if (set_duplicate_flag)
    {

        /* Mark the copy as a duplicate in the first byte of its fixed header. */
        *(copy -> nx_packet_prepend_ptr) = (*(copy -> nx_packet_prepend_ptr)) | MQTT_PUBLISH_DUP_FLAG;
    }

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
    /* One more packet is accounted against the queue depth limit. */
    client_ptr -> message_transmit_queue_depth++;
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */

    return(NXD_MQTT_SUCCESS);
}
950
951 /**************************************************************************/
952 /* */
953 /* FUNCTION RELEASE */
954 /* */
955 /* _nxd_mqtt_release_transmit_packet PORTABLE C */
956 /* 6.1.8 */
957 /* AUTHOR */
958 /* */
959 /* Yuxin Zhou, Microsoft Corporation */
960 /* */
961 /* DESCRIPTION */
962 /* */
963 /* This internal function releases a transmit packet. */
964 /* A transmit packet is allocated to store QoS 1 and 2 messages. */
965 /* Upon a message being properly acknowledged, the packet can */
966 /* be released. */
967 /* */
968 /* */
969 /* INPUT */
970 /* */
971 /* client_ptr Pointer to MQTT Client */
972 /* packet_ptr Pointer to the MQTT message */
973 /* packet to be removed */
974 /* previous_packet_ptr Pointer to the previous packet*/
975 /* or NULL if none exists */
976 /* */
977 /* OUTPUT */
978 /* */
979 /* None */
980 /* */
981 /* CALLS */
982 /* */
983 /* None */
984 /* */
985 /* CALLED BY */
986 /* */
987 /* _nxd_mqtt_thread_entry */
988 /* _nxd_mqtt_client_event_process */
989 /* */
990 /* RELEASE HISTORY */
991 /* */
992 /* DATE NAME DESCRIPTION */
993 /* */
994 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
995 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
996 /* resulting in version 6.1 */
997 /* 08-02-2021 Yuxin Zhou Modified comment(s), */
998 /* supported maximum transmit */
999 /* queue depth, */
1000 /* resulting in version 6.1.8 */
1001 /* */
1002 /**************************************************************************/
_nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,NX_PACKET * previous_packet_ptr)1003 static VOID _nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr)
1004 {
1005
1006 if (previous_packet_ptr)
1007 {
1008 previous_packet_ptr -> nx_packet_queue_next = packet_ptr -> nx_packet_queue_next;
1009 }
1010 else
1011 {
1012 client_ptr -> message_transmit_queue_head = packet_ptr -> nx_packet_queue_next;
1013 }
1014
1015 if (packet_ptr == client_ptr -> message_transmit_queue_tail)
1016 {
1017 client_ptr -> message_transmit_queue_tail = previous_packet_ptr;
1018 }
1019 nx_packet_release(packet_ptr);
1020
1021 #ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
1022 client_ptr -> message_transmit_queue_depth--;
1023 #endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */
1024 }
1025
1026 /**************************************************************************/
1027 /* */
1028 /* FUNCTION RELEASE */
1029 /* */
1030 /* _nxd_mqtt_release_receive_packet PORTABLE C */
1031 /* 6.1 */
1032 /* AUTHOR */
1033 /* */
1034 /* Yuxin Zhou, Microsoft Corporation */
1035 /* */
1036 /* DESCRIPTION */
1037 /* */
1038 /* This internal function releases a receive packet. */
1039 /* A receive packet is allocated to store QoS 1 and 2 messages. */
1040 /* Upon a message being properly acknowledged, the packet can */
1041 /* be released. */
1042 /* */
1043 /* */
1044 /* INPUT */
1045 /* */
1046 /* client_ptr Pointer to MQTT Client */
1047 /* packet_ptr Pointer to the MQTT message */
1048 /* packet to be removed */
1049 /* previous_packet_ptr Pointer to the previous packet*/
1050 /* or NULL if none exists */
1051 /* */
1052 /* OUTPUT */
1053 /* */
1054 /* None */
1055 /* */
1056 /* CALLS */
1057 /* */
1058 /* None */
1059 /* */
1060 /* CALLED BY */
1061 /* */
1062 /* _nxd_mqtt_thread_entry */
1063 /* _nxd_mqtt_client_event_process */
1064 /* */
1065 /* RELEASE HISTORY */
1066 /* */
1067 /* DATE NAME DESCRIPTION */
1068 /* */
1069 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1070 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1071 /* resulting in version 6.1 */
1072 /* */
1073 /**************************************************************************/
_nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,NX_PACKET * previous_packet_ptr)1074 static VOID _nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr)
1075 {
1076
1077 if (previous_packet_ptr)
1078 {
1079 previous_packet_ptr -> nx_packet_queue_next = packet_ptr -> nx_packet_queue_next;
1080 }
1081 else
1082 {
1083 client_ptr -> message_receive_queue_head = packet_ptr -> nx_packet_queue_next;
1084 }
1085
1086 if (packet_ptr == client_ptr -> message_receive_queue_tail)
1087 {
1088 client_ptr -> message_receive_queue_tail = previous_packet_ptr;
1089 }
1090
1091 client_ptr -> message_receive_queue_depth--;
1092 }
1093
1094 /**************************************************************************/
1095 /* */
1096 /* FUNCTION RELEASE */
1097 /* */
1098 /* _nxd_mqtt_process_connack PORTABLE C */
1099 /* 6.1 */
1100 /* AUTHOR */
1101 /* */
1102 /* Yuxin Zhou, Microsoft Corporation */
1103 /* */
1104 /* DESCRIPTION */
1105 /* */
1106 /* This internal function processes a CONNACK message from the broker. */
1107 /* */
1108 /* */
1109 /* INPUT */
1110 /* */
1111 /* client_ptr Pointer to MQTT Client */
1112 /* packet_ptr Pointer to the packet */
1113 /* wait_option Timeout value */
1114 /* */
1115 /* OUTPUT */
1116 /* */
1117 /* status */
1118 /* */
1119 /* CALLS */
1120 /* */
1121 /* [nxd_mqtt_connect_notify] User supplied connect */
1122 /* callback function */
1123 /* tx_mutex_get */
1124 /* tx_mutex_put */
1125 /* _nxd_mqtt_client_retransmit_message */
1126 /* _nxd_mqtt_client_connection_end */
1127 /* */
1128 /* */
1129 /* CALLED BY */
1130 /* */
1131 /* _nxd_mqtt_packet_receive_process */
1132 /* */
1133 /* RELEASE HISTORY */
1134 /* */
1135 /* DATE NAME DESCRIPTION */
1136 /* */
1137 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1138 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1139 /* resulting in version 6.1 */
1140 /* */
1141 /**************************************************************************/
_nxd_mqtt_process_connack(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,ULONG wait_option)1142 static UINT _nxd_mqtt_process_connack(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, ULONG wait_option)
1143 {
1144
1145 UINT ret = NXD_MQTT_COMMUNICATION_FAILURE;
1146 MQTT_PACKET_CONNACK *connack_packet_ptr = (MQTT_PACKET_CONNACK *)(packet_ptr -> nx_packet_prepend_ptr);
1147
1148
1149 /* Check the length. */
1150 if ((packet_ptr -> nx_packet_length != sizeof(MQTT_PACKET_CONNACK)) ||
1151 (connack_packet_ptr -> mqtt_connack_packet_header >> 4 != MQTT_CONTROL_PACKET_TYPE_CONNACK))
1152 {
1153 /* Invalid packet length. Free the packet and process error. */
1154 ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1155 }
1156 else
1157 {
1158
1159 /* Check remaining length. */
1160 if (connack_packet_ptr -> mqtt_connack_packet_remaining_length != 2)
1161 {
1162 ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1163 }
1164 /* Follow MQTT-3.2.2-1 rule. */
1165 else if ((client_ptr -> nxd_mqtt_clean_session) && (connack_packet_ptr -> mqtt_connack_packet_ack_flags & MQTT_CONNACK_CONNECT_FLAGS_SP))
1166 {
1167
1168 /* Client requested clean session, and server responded with Session Present. This is a violation. */
1169 ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1170 }
1171 else if (connack_packet_ptr -> mqtt_connack_packet_return_code > MQTT_CONNACK_CONNECT_RETURN_CODE_NOT_AUTHORIZED)
1172 {
1173 ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1174 }
1175 else if (connack_packet_ptr -> mqtt_connack_packet_return_code > 0)
1176 {
1177
1178 /* Pass the server return code to the application. */
1179 ret = (UINT)(NXD_MQTT_ERROR_CONNECT_RETURN_CODE + connack_packet_ptr -> mqtt_connack_packet_return_code);
1180 }
1181 else
1182 {
1183 ret = NXD_MQTT_SUCCESS;
1184
1185 /* Obtain mutex before we modify client control block. */
1186 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
1187
1188 client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_CONNECTED;
1189
1190 /* Initialize the packet identification field. */
1191 client_ptr -> nxd_mqtt_client_packet_identifier = NXD_MQTT_INITIAL_PACKET_ID_VALUE;
1192
1193 /* Prevent packet identifier from being zero. MQTT-2.3.1-1 */
1194 if(client_ptr -> nxd_mqtt_client_packet_identifier == 0)
1195 client_ptr -> nxd_mqtt_client_packet_identifier = 1;
1196
1197 /* Release mutex */
1198 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1199 }
1200 }
1201
1202 /* Check callback function. */
1203 if (client_ptr -> nxd_mqtt_connect_notify)
1204 {
1205 client_ptr -> nxd_mqtt_connect_notify(client_ptr, ret, client_ptr -> nxd_mqtt_connect_context);
1206 }
1207
1208 if (ret == NXD_MQTT_SUCCESS)
1209 {
1210
1211 /* If client doesn't start with Clean Session, and there are un-acked PUBLISH messages,
1212 we shall re-publish these messages. */
1213 if ((client_ptr -> nxd_mqtt_clean_session != NX_TRUE) && (client_ptr -> message_transmit_queue_head))
1214 {
1215
1216 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
1217
1218 /* There are messages from the previous session that has not been acknowledged. */
1219 ret = _nxd_mqtt_client_retransmit_message(client_ptr, wait_option);
1220
1221 /* Release mutex */
1222 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1223 }
1224 }
1225 else
1226 {
1227
1228 /* End connection. */
1229 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
1230 }
1231
1232 return(ret);
1233 }
1234
1235 /**************************************************************************/
1236 /* */
1237 /* FUNCTION RELEASE */
1238 /* */
1239 /* _nxd_mqtt_process_publish_packet PORTABLE C */
1240 /* 6.1 */
1241 /* AUTHOR */
1242 /* */
1243 /* Yuxin Zhou, Microsoft Corporation */
1244 /* */
1245 /* DESCRIPTION */
1246 /* */
1247 /* This internal function processes a packet and parses topic and */
1248 /* message. */
1249 /* */
1250 /* */
1251 /* INPUT */
1252 /* */
1253 /* packet_ptr Pointer to the packet */
1254 /* topic_offset_ptr Return topic offset */
1255 /* topic_length_ptr Return topic length */
1256 /* message_offset_ptr Return message offset */
1257 /* message_length_ptr Return message length */
1258 /* */
1259 /* OUTPUT */
1260 /* */
1261 /* Status */
1262 /* */
1263 /* CALLS */
1264 /* */
1265 /* _nxd_mqtt_read_remaining_length */
1266 /* nx_packet_data_extract_offset */
1267 /* */
1268 /* */
1269 /* CALLED BY */
1270 /* */
1271 /* _nxd_mqtt_process_publish */
1272 /* */
1273 /* RELEASE HISTORY */
1274 /* */
1275 /* DATE NAME DESCRIPTION */
1276 /* */
1277 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1278 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1279 /* resulting in version 6.1 */
1280 /* */
1281 /**************************************************************************/
_nxd_mqtt_process_publish_packet(NX_PACKET * packet_ptr,ULONG * topic_offset_ptr,USHORT * topic_length_ptr,ULONG * message_offset_ptr,ULONG * message_length_ptr)1282 UINT _nxd_mqtt_process_publish_packet(NX_PACKET *packet_ptr, ULONG *topic_offset_ptr, USHORT *topic_length_ptr,
1283 ULONG *message_offset_ptr, ULONG *message_length_ptr)
1284 {
1285 UCHAR QoS;
1286 UINT remaining_length = 0;
1287 UINT topic_length;
1288 ULONG offset;
1289 UCHAR bytes[2];
1290 ULONG bytes_copied;
1291
1292
1293 QoS = (UCHAR)((*(packet_ptr -> nx_packet_prepend_ptr) & MQTT_PUBLISH_QOS_LEVEL_FIELD) >> 1);
1294
1295 if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1296 {
1297 return(NXD_MQTT_INVALID_PACKET);
1298 }
1299
1300 if (remaining_length < 2)
1301 {
1302 return(NXD_MQTT_INVALID_PACKET);
1303 }
1304
1305 /* Get topic length fields. */
1306 if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1307 (bytes_copied != sizeof(bytes)))
1308 {
1309 return(NXD_MQTT_INVALID_PACKET);
1310 }
1311
1312 topic_length = (UINT)(*(bytes) << 8) | (*(bytes + 1));
1313
1314 if (topic_length > remaining_length - 2u)
1315 {
1316 return(NXD_MQTT_INVALID_PACKET);
1317 }
1318
1319 *topic_offset_ptr = offset + 2;
1320 *topic_length_ptr = (USHORT)topic_length;
1321
1322 remaining_length = remaining_length - topic_length - 2;
1323 if ((QoS == 1) || (QoS == 2))
1324 {
1325 offset += 2 + 2 + topic_length;
1326
1327 if (remaining_length < 2)
1328 {
1329 return(NXD_MQTT_INVALID_PACKET);
1330 }
1331 remaining_length = remaining_length - 2;
1332 }
1333 else
1334 {
1335 offset += 2 + topic_length;
1336 }
1337
1338 *message_offset_ptr = offset;
1339 *message_length_ptr = (ULONG)remaining_length;
1340
1341 /* Return */
1342 return(NXD_MQTT_SUCCESS);
1343 }
1344 /**************************************************************************/
1345 /* */
1346 /* FUNCTION RELEASE */
1347 /* */
1348 /* _nxd_mqtt_process_publish PORTABLE C */
1349 /* 6.2.0 */
1350 /* AUTHOR */
1351 /* */
1352 /* Yuxin Zhou, Microsoft Corporation */
1353 /* */
1354 /* DESCRIPTION */
1355 /* */
1356 /* This internal function processes a publish message from the broker. */
1357 /* */
1358 /* */
1359 /* INPUT */
1360 /* */
1361 /* client_ptr Pointer to MQTT Client */
1362 /* packet_ptr Pointer to the packet */
1363 /* */
1364 /* OUTPUT */
1365 /* */
1366 /* NX_TRUE - packet is consumed */
1367 /* NX_FALSE - packet is not consumed */
1368 /* */
1369 /* CALLS */
1370 /* */
1371 /* [receive_notify] User supplied receive */
1372 /* callback function */
1373 /* _nxd_mqtt_packet_allocate */
1374 /* _nxd_mqtt_packet_send */
1375 /* nx_packet_release */
1376 /* nx_secure_tls_session_send */
1377 /* _nxd_mqtt_process_publish_packet */
1378 /* _nxd_mqtt_copy_transmit_packet */
1379 /* */
1380 /* */
1381 /* CALLED BY */
1382 /* */
1383 /* _nxd_mqtt_packet_receive_process */
1384 /* */
1385 /* RELEASE HISTORY */
1386 /* */
1387 /* DATE NAME DESCRIPTION */
1388 /* */
1389 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1390 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1391 /* resulting in version 6.1 */
1392 /* 10-31-2022 Bo Chen Modified comment(s), improved */
1393 /* the logic of sending packet,*/
1394 /* resulting in version 6.2.0 */
1395 /* */
1396 /**************************************************************************/
_nxd_mqtt_process_publish(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1397 static UINT _nxd_mqtt_process_publish(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1398 {
1399 MQTT_PACKET_PUBLISH_RESPONSE *pubresp_ptr;
1400 UINT status;
1401 USHORT packet_id = 0;
1402 UCHAR QoS;
1403 UINT enqueue_message = 0;
1404 NX_PACKET *transmit_packet_ptr;
1405 UINT remaining_length = 0;
1406 UINT packet_consumed = NX_FALSE;
1407 UCHAR fixed_header;
1408 USHORT transmit_packet_id;
1409 UINT topic_length;
1410 ULONG offset;
1411 UCHAR bytes[2];
1412 ULONG bytes_copied;
1413
1414 QoS = (UCHAR)((*(packet_ptr -> nx_packet_prepend_ptr) & MQTT_PUBLISH_QOS_LEVEL_FIELD) >> 1);
1415
1416 if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1417 {
1418 return(NX_FALSE);
1419 }
1420
1421 if (remaining_length < 2)
1422 {
1423 return(NXD_MQTT_INVALID_PACKET);
1424 }
1425
1426 /* Get topic length fields. */
1427 if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1428 (bytes_copied != sizeof(bytes)))
1429 {
1430 return(NXD_MQTT_INVALID_PACKET);
1431 }
1432
1433 topic_length = (UINT)(*(bytes) << 8) | (*(bytes + 1));
1434
1435 if (topic_length > remaining_length - 2u)
1436 {
1437 return(NXD_MQTT_INVALID_PACKET);
1438 }
1439
1440 if (QoS == 0)
1441 {
1442 enqueue_message = 1;
1443 }
1444 else
1445 {
1446 /* QoS 1 or QoS 2 messages. */
1447 /* Get packet id fields. */
1448 if (nx_packet_data_extract_offset(packet_ptr, offset + 2 + topic_length, &bytes, sizeof(bytes), &bytes_copied))
1449 {
1450 return(NXD_MQTT_INVALID_PACKET);
1451 }
1452
1453 packet_id = (USHORT)(((*bytes) << 8) | (*(bytes + 1)));
1454
1455 /* Look for an existing transmit packets with the same packet id */
1456 transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1457
1458 while (transmit_packet_ptr)
1459 {
1460 fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1461 transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1462 if ((transmit_packet_id == packet_id) &&
1463 ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_PUBREC << 4)))
1464 {
1465
1466 /* Found a packet containing the packet_id */
1467 break;
1468 }
1469 transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1470 }
1471
1472 if (transmit_packet_ptr)
1473 {
1474 /* This published data is already in our system. No need to deliver this message to the application. */
1475 enqueue_message = 0;
1476 }
1477 else
1478 {
1479 enqueue_message = 1;
1480 }
1481 }
1482
1483 if (enqueue_message)
1484 {
1485 if (packet_ptr -> nx_packet_length > (offset + remaining_length))
1486 {
1487
1488 /* This packet contains multiple messages. */
1489 if (nx_packet_copy(packet_ptr, &packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_NO_WAIT))
1490 {
1491
1492 /* No packet is available. */
1493 return(NX_FALSE);
1494 }
1495 }
1496 else
1497 {
1498 packet_consumed = NX_TRUE;
1499 }
1500
1501 /* Increment the queue depth counter. */
1502 client_ptr -> message_receive_queue_depth++;
1503
1504 if (client_ptr -> message_receive_queue_head == NX_NULL)
1505 {
1506 client_ptr -> message_receive_queue_head = packet_ptr;
1507 }
1508 else
1509 {
1510 client_ptr -> message_receive_queue_tail -> nx_packet_queue_next = packet_ptr;
1511 }
1512 client_ptr -> message_receive_queue_tail = packet_ptr;
1513
1514 /* Invoke the user-defined receive notify function if it is set. */
1515 if (client_ptr -> nxd_mqtt_client_receive_notify)
1516 {
1517 (*(client_ptr -> nxd_mqtt_client_receive_notify))(client_ptr, client_ptr -> message_receive_queue_depth);
1518 }
1519 }
1520
1521 /* If the message QoS level is 0, we are done. */
1522 if (QoS == 0)
1523 {
1524 /* Return */
1525 return(packet_consumed);
1526 }
1527
1528 /* Send out proper ACKs for QoS 1 and 2 messages. */
1529 /* Allocate a new packet so we can send out a response. */
1530 status = _nxd_mqtt_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
1531 if (status)
1532 {
1533 /* Packet allocation fails. */
1534 return(packet_consumed);
1535 }
1536
1537 /* Fill in the packet ID */
1538 pubresp_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)(packet_ptr -> nx_packet_prepend_ptr);
1539 pubresp_ptr -> mqtt_publish_response_packet_remaining_length = 2;
1540 pubresp_ptr -> mqtt_publish_response_packet_packet_identifier_msb = (UCHAR)(packet_id >> 8);
1541 pubresp_ptr -> mqtt_publish_response_packet_packet_identifier_lsb = (UCHAR)(packet_id & 0xFF);
1542
1543 if (QoS == 1)
1544 {
1545
1546 pubresp_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBACK << 4;
1547 }
1548 else
1549 {
1550 pubresp_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBREC << 4;
1551 }
1552
1553 packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + sizeof(MQTT_PACKET_PUBLISH_RESPONSE);
1554 packet_ptr -> nx_packet_length = sizeof(MQTT_PACKET_PUBLISH_RESPONSE);
1555
1556 if (QoS == 2)
1557 {
1558
1559 /* Copy packet for checking duplicate publish packet. */
1560 if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &transmit_packet_ptr,
1561 packet_id, NX_FALSE, NX_WAIT_FOREVER))
1562 {
1563
1564 /* Release the packet. */
1565 nx_packet_release(packet_ptr);
1566 return(packet_consumed);
1567 }
1568 if (client_ptr -> message_transmit_queue_head == NX_NULL)
1569 {
1570 client_ptr -> message_transmit_queue_head = transmit_packet_ptr;
1571 }
1572 else
1573 {
1574 client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = transmit_packet_ptr;
1575 }
1576 client_ptr -> message_transmit_queue_tail = transmit_packet_ptr;
1577 }
1578
1579 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1580
1581 /* Send packet to server. */
1582 status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER);
1583
1584 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
1585 if (status)
1586 {
1587
1588 /* Release the packet. */
1589 nx_packet_release(packet_ptr);
1590 }
1591 else
1592 {
1593 /* Update the timeout value. */
1594 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
1595 }
1596
1597 /* Return */
1598 return(packet_consumed);
1599 }
1600
1601 /**************************************************************************/
1602 /* */
1603 /* FUNCTION RELEASE */
1604 /* */
1605 /* _nxd_mqtt_process_publish_response PORTABLE C */
1606 /* 6.2.0 */
1607 /* AUTHOR */
1608 /* */
1609 /* Yuxin Zhou, Microsoft Corporation */
1610 /* */
1611 /* DESCRIPTION */
1612 /* */
1613 /* This internal function processes publish response messages. */
1614 /* Publish Response messages are: PUBACK, PUBREC, PUBREL */
1615 /* */
1616 /* INPUT */
1617 /* */
1618 /* client_ptr Pointer to MQTT Client */
1619 /* packet_ptr Pointer to the packet */
1620 /* */
1621 /* OUTPUT */
1622 /* */
1623 /* Status */
1624 /* */
1625 /* CALLS */
1626 /* */
1627 /* [nxd_mqtt_client_receive_notify] User supplied publish */
1628 /* callback function */
1629 /* _nxd_mqtt_release_transmit_packet */
1630 /* _nxd_mqtt_packet_send */
1631 /* */
1632 /* */
1633 /* CALLED BY */
1634 /* */
1635 /* _nxd_mqtt_packet_receive_process */
1636 /* */
1637 /* RELEASE HISTORY */
1638 /* */
1639 /* DATE NAME DESCRIPTION */
1640 /* */
1641 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1642 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
1643 /* added ack receive notify, */
1644 /* resulting in version 6.1 */
1645 /* 10-31-2022 Bo Chen Modified comment(s), improved */
1646 /* the logic of sending packet,*/
1647 /* resulting in version 6.2.0 */
1648 /* */
1649 /**************************************************************************/
_nxd_mqtt_process_publish_response(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1650 static UINT _nxd_mqtt_process_publish_response(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1651 {
1652 MQTT_PACKET_PUBLISH_RESPONSE *response_ptr;
1653 USHORT packet_id;
1654 NX_PACKET *previous_packet_ptr;
1655 NX_PACKET *transmit_packet_ptr;
1656 NX_PACKET *response_packet;
1657 UINT ret;
1658 UCHAR fixed_header;
1659 USHORT transmit_packet_id;
1660
1661 response_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)(packet_ptr -> nx_packet_prepend_ptr);
1662
1663 /* Validate the packet. */
1664 if (response_ptr -> mqtt_publish_response_packet_remaining_length != 2)
1665 {
1666 /* Invalid remaining_length value. Return 1 so the caller can release
1667 the packet. */
1668
1669 return(1);
1670 }
1671
1672 packet_id = (USHORT)((response_ptr -> mqtt_publish_response_packet_packet_identifier_msb << 8) |
1673 (response_ptr -> mqtt_publish_response_packet_packet_identifier_lsb));
1674
1675 /* Search all the outstanding transmitted packets for a match. */
1676 previous_packet_ptr = NX_NULL;
1677 transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1678 while (transmit_packet_ptr)
1679 {
1680 fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1681 transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1682 if (transmit_packet_id == packet_id)
1683 {
1684
1685 /* Found the matching packet id */
1686 if (((response_ptr -> mqtt_publish_response_packet_header) >> 4) == MQTT_CONTROL_PACKET_TYPE_PUBACK)
1687 {
1688
1689 /* PUBACK is the response to a PUBLISH packet with QoS Level 1*/
1690 /* Therefore we verify that packet contains PUBLISH packet with QoS level 1*/
1691 if ((fixed_header & 0xF6) == ((MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4) | MQTT_PUBLISH_QOS_LEVEL_1))
1692 {
1693
1694 /* Check ack notify function. */
1695 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1696 {
1697
1698 /* Call notify function. Note: user routine should not release the packet. */
1699 client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_PUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1700 }
1701
1702 /* QoS Level1 message receives an ACK. */
1703 /* This message can be released. */
1704 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1705
1706 /* Return with value 1, so the caller will release packet_ptr */
1707 return(1);
1708 }
1709 }
1710 else if (((response_ptr -> mqtt_publish_response_packet_header) >> 4) == MQTT_CONTROL_PACKET_TYPE_PUBREL)
1711 {
1712
1713 /* QoS 2 publish Release received, part 2. */
1714 /* Therefore we verify that packet contains PUBLISH packet with QoS level 2*/
1715 if ((fixed_header & 0xF6) == (MQTT_CONTROL_PACKET_TYPE_PUBREC << 4))
1716 {
1717
1718 /* QoS Level2 message receives an ACK. */
1719 /* This message can be released. */
1720 /* Send PUBCOMP */
1721
1722 /* Allocate a packet to send the response. */
1723 ret = _nxd_mqtt_packet_allocate(client_ptr, &response_packet, NX_WAIT_FOREVER);
1724 if (ret)
1725 {
1726 return(1);
1727 }
1728
1729 if (4u > ((ULONG)(response_packet -> nx_packet_data_end) - (ULONG)(response_packet -> nx_packet_append_ptr)))
1730 {
1731 nx_packet_release(response_packet);
1732
1733 /* Packet buffer is too small to hold the message. */
1734 return(NX_SIZE_ERROR);
1735 }
1736
1737 response_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)response_packet -> nx_packet_prepend_ptr;
1738
1739 response_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBCOMP << 4;
1740 response_ptr -> mqtt_publish_response_packet_remaining_length = 2;
1741
1742 /* Fill in packet ID */
1743 response_packet -> nx_packet_prepend_ptr[3] = packet_ptr -> nx_packet_prepend_ptr[3];
1744 response_packet -> nx_packet_prepend_ptr[4] = packet_ptr -> nx_packet_prepend_ptr[4];
1745 response_packet -> nx_packet_append_ptr = response_packet -> nx_packet_prepend_ptr + 4;
1746 response_packet -> nx_packet_length = 4;
1747
1748 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1749
1750 /* Send packet to server. */
1751 ret = _nxd_mqtt_packet_send(client_ptr, response_packet, NX_WAIT_FOREVER);
1752
1753 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
1754
1755 /* Update the timeout value. */
1756 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
1757
1758 if (ret)
1759 {
1760 nx_packet_release(response_packet);
1761 }
1762
1763 /* Check ack notify function. */
1764 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1765 {
1766
1767 /* Call notify function. Note: user routine should not release the packet. */
1768 client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_PUBREL, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1769 }
1770
1771 /* This packet can be released. */
1772 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1773
1774 /* Return with value 1, so the caller will release packet_ptr */
1775 return(1);
1776 }
1777 }
1778 }
1779
1780 /* Move on to the next packet */
1781 previous_packet_ptr = transmit_packet_ptr;
1782 transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1783 }
1784
1785 /* nothing is found. Return 1 to release the packet.*/
1786 return(1);
1787 }
1788
1789 /**************************************************************************/
1790 /* */
1791 /* FUNCTION RELEASE */
1792 /* */
1793 /* _nxd_mqtt_process_sub_unsub_ack PORTABLE C */
1794 /* 6.1 */
1795 /* AUTHOR */
1796 /* */
1797 /* Yuxin Zhou, Microsoft Corporation */
1798 /* */
1799 /* DESCRIPTION */
1800 /* */
1801 /* This internal function processes an ACK message for a subscribe */
1802 /* or unsubscribe request. */
1803 /* */
1804 /* INPUT */
1805 /* */
1806 /* client_ptr Pointer to MQTT Client */
1807 /* packet_ptr Pointer to the packet */
1808 /* */
1809 /* OUTPUT */
1810 /* */
1811 /* Status */
1812 /* */
1813 /* CALLS */
1814 /* */
1815 /* [nxd_mqtt_client_receive_notify] User supplied publish */
1816 /* callback function */
1817 /* _nxd_mqtt_release_transmit_packet */
1818 /* Release the memory block */
1819 /* _nxd_mqtt_read_remaining_length Skip the remaining length */
1820 /* field */
1821 /* */
1822 /* CALLED BY */
1823 /* */
1824 /* _nxd_mqtt_packet_receive_process */
1825 /* */
1826 /* RELEASE HISTORY */
1827 /* */
1828 /* DATE NAME DESCRIPTION */
1829 /* */
1830 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1831 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
1832 /* added ack receive notify, */
1833 /* resulting in version 6.1 */
1834 /* */
1835 /**************************************************************************/
_nxd_mqtt_process_sub_unsub_ack(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1836 static UINT _nxd_mqtt_process_sub_unsub_ack(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1837 {
1838
1839 USHORT packet_id;
1840 NX_PACKET *previous_packet_ptr;
1841 NX_PACKET *transmit_packet_ptr;
1842 UCHAR response_header;
1843 UCHAR fixed_header;
1844 USHORT transmit_packet_id;
1845 UINT remaining_length;
1846 ULONG offset;
1847 UCHAR bytes[2];
1848 ULONG bytes_copied;
1849
1850
1851 response_header = *(packet_ptr -> nx_packet_prepend_ptr);
1852
1853 if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1854 {
1855
1856 /* Unable to process the sub/unsub ack. Simply return and release the packet. */
1857 return(NXD_MQTT_INVALID_PACKET);
1858 }
1859
1860 /* Get packet id fields. */
1861 if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1862 (bytes_copied != sizeof(bytes)))
1863 {
1864 return(NXD_MQTT_INVALID_PACKET);
1865 }
1866
1867 packet_id = (USHORT)(((*bytes) << 8) | (*(bytes + 1)));
1868
1869 /* Search all the outstanding transmitted packets for a match. */
1870 previous_packet_ptr = NX_NULL;
1871 transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1872 while (transmit_packet_ptr)
1873 {
1874 fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1875 transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1876 if (transmit_packet_id == packet_id)
1877 {
1878
1879 /* Found the matching packet id */
1880 if (((response_header >> 4) == MQTT_CONTROL_PACKET_TYPE_SUBACK) &&
1881 ((fixed_header >> 4) == MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE))
1882 {
1883 /* Validate the packet. */
1884 if (remaining_length != 3)
1885 {
1886 /* Invalid remaining_length value. */
1887 return(1);
1888 }
1889
1890 /* Check ack notify function. */
1891 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1892 {
1893
1894 /* Call notify function. Note: user routine should not release the packet. */
1895 client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_SUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1896 }
1897
1898 /* Release the transmit packet. */
1899 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1900
1901 return(1);
1902 }
1903 else if (((response_header >> 4) == MQTT_CONTROL_PACKET_TYPE_UNSUBACK) &&
1904 ((fixed_header >> 4) == MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE))
1905 {
1906 /* Validate the packet. */
1907 if (remaining_length != 2)
1908 {
1909 /* Invalid remaining_length value. */
1910 return(1);
1911 }
1912
1913 /* Check ack notify function. */
1914 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1915 {
1916
1917 /* Call notify function. Note: user routine should not release the packet. */
1918 client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_UNSUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1919 }
1920
1921 /* Unsubscribe succeeded. */
1922 /* Release the transmit packet. */
1923 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1924
1925 return(1);
1926 }
1927 }
1928
1929 /* Move on to the next packet */
1930 previous_packet_ptr = transmit_packet_ptr;
1931 transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1932 }
1933 return(1);
1934 }
1935
1936
1937 /**************************************************************************/
1938 /* */
1939 /* FUNCTION RELEASE */
1940 /* */
1941 /* _nxd_mqtt_process_pingresp PORTABLE C */
1942 /* 6.1 */
1943 /* AUTHOR */
1944 /* */
1945 /* Yuxin Zhou, Microsoft Corporation */
1946 /* */
1947 /* DESCRIPTION */
1948 /* */
1949 /* This internal function processes a PINGRESP message. */
1950 /* */
1951 /* INPUT */
1952 /* */
1953 /* client_ptr Pointer to MQTT Client */
1954 /* packet_ptr Pointer to the packet */
1955 /* */
1956 /* OUTPUT */
1957 /* */
1958 /* Status */
1959 /* */
1960 /* CALLS */
1961 /* */
1962 /* None */
1963 /* callback function */
1964 /* */
1965 /* CALLED BY */
1966 /* */
1967 /* _nxd_mqtt_packet_receive_process */
1968 /* */
1969 /* RELEASE HISTORY */
1970 /* */
1971 /* DATE NAME DESCRIPTION */
1972 /* */
1973 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
1974 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
1975 /* resulting in version 6.1 */
1976 /* */
1977 /**************************************************************************/
_nxd_mqtt_process_pingresp(NXD_MQTT_CLIENT * client_ptr)1978 static VOID _nxd_mqtt_process_pingresp(NXD_MQTT_CLIENT *client_ptr)
1979 {
1980
1981
1982 /* If there is an outstanding ping, mark it as responded. */
1983 if (client_ptr -> nxd_mqtt_ping_not_responded == NX_TRUE)
1984 {
1985 client_ptr -> nxd_mqtt_ping_not_responded = NX_FALSE;
1986
1987 client_ptr -> nxd_mqtt_ping_sent_time = 0;
1988 }
1989
1990 return;
1991 }
1992
1993
1994 /**************************************************************************/
1995 /* */
1996 /* FUNCTION RELEASE */
1997 /* */
1998 /* _nxd_mqtt_process_disconnect PORTABLE C */
1999 /* 6.1 */
2000 /* AUTHOR */
2001 /* */
2002 /* Yuxin Zhou, Microsoft Corporation */
2003 /* */
2004 /* DESCRIPTION */
2005 /* */
2006 /* This internal function processes a DISCONNECT message. */
2007 /* */
2008 /* INPUT */
2009 /* */
2010 /* client_ptr Pointer to MQTT Client */
2011 /* packet_ptr Pointer to the packet */
2012 /* */
2013 /* OUTPUT */
2014 /* */
2015 /* None */
2016 /* */
2017 /* CALLS */
2018 /* */
2019 /* nx_secure_tls_session_send */
2020 /* nx_tcp_socket_disconnect */
2021 /* nx_tcp_client_socket_unbind */
2022 /* _nxd_mqtt_release_transmit_packet */
2023 /* _nxd_mqtt_release_receive_packet */
2024 /* _nxd_mqtt_client_connection_end */
2025 /* */
2026 /* CALLED BY */
2027 /* */
2028 /* _nxd_mqtt_packet_receive_process */
2029 /* */
2030 /* RELEASE HISTORY */
2031 /* */
2032 /* DATE NAME DESCRIPTION */
2033 /* */
2034 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2035 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2036 /* resulting in version 6.1 */
2037 /* */
2038 /**************************************************************************/
_nxd_mqtt_process_disconnect(NXD_MQTT_CLIENT * client_ptr)2039 static VOID _nxd_mqtt_process_disconnect(NXD_MQTT_CLIENT *client_ptr)
2040 {
2041 NX_PACKET *previous = NX_NULL;
2042 NX_PACKET *current;
2043 NX_PACKET *next;
2044 UINT disconnect_callback = NX_FALSE;
2045 UINT status;
2046 UCHAR fixed_header;
2047
2048 if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
2049 {
2050 /* State changes from CONNECTED TO IDLE. Call disconnect notify callback
2051 if the function is set. */
2052 disconnect_callback = NX_TRUE;
2053 }
2054 else if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTING)
2055 {
2056
2057 /* If state isn't CONNECTED or CONNECTING, just return. */
2058 return;
2059 }
2060
2061 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2062
2063 /* End connection. */
2064 _nxd_mqtt_client_connection_end(client_ptr, NXD_MQTT_SOCKET_TIMEOUT);
2065
2066 status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
2067
2068 /* Free up sub/unsub packets on the transmit queue. */
2069 current = client_ptr -> message_transmit_queue_head;
2070
2071 while (current)
2072 {
2073 next = current -> nx_packet_queue_next;
2074 fixed_header = *(current -> nx_packet_prepend_ptr);
2075
2076 if (((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4)) ||
2077 ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE << 4)))
2078 {
2079 _nxd_mqtt_release_transmit_packet(client_ptr, current, previous);
2080 }
2081 else
2082 {
2083 previous = current;
2084 }
2085 current = next;
2086 }
2087
2088 /* If a callback notification is defined, call it now. */
2089 if ((disconnect_callback == NX_TRUE) && (client_ptr -> nxd_mqtt_disconnect_notify))
2090 {
2091 client_ptr -> nxd_mqtt_disconnect_notify(client_ptr);
2092 }
2093
2094 /* If a connect callback notification is defined and is still in connecting stage, call it now. */
2095 if ((disconnect_callback == NX_FALSE) && (client_ptr -> nxd_mqtt_connect_notify))
2096 {
2097 client_ptr -> nxd_mqtt_connect_notify(client_ptr, NXD_MQTT_CONNECT_FAILURE, client_ptr -> nxd_mqtt_connect_context);
2098 }
2099
2100 if (status == TX_SUCCESS)
2101 {
2102 /* Remove all the packets in the receive queue. */
2103 while (client_ptr -> message_receive_queue_head)
2104 {
2105 _nxd_mqtt_release_receive_packet(client_ptr, client_ptr -> message_receive_queue_head, NX_NULL);
2106 }
2107 client_ptr -> message_receive_queue_depth = 0;
2108
2109 /* Clear the MQTT_PACKET_RECEIVE_EVENT */
2110 #ifndef NXD_MQTT_CLOUD_ENABLE
2111 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, ~MQTT_PACKET_RECEIVE_EVENT, TX_AND);
2112 #else
2113 nx_cloud_module_event_clear(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PACKET_RECEIVE_EVENT);
2114 #endif /* NXD_MQTT_CLOUD_ENABLE */
2115 }
2116
2117 /* Clear flags if keep alive is enabled. */
2118 if (client_ptr -> nxd_mqtt_keepalive)
2119 {
2120 client_ptr -> nxd_mqtt_ping_not_responded = 0;
2121 client_ptr -> nxd_mqtt_ping_sent_time = 0;
2122 }
2123
2124 /* Clean up the information when disconnecting. */
2125 client_ptr -> nxd_mqtt_client_username = NX_NULL;
2126 client_ptr -> nxd_mqtt_client_password = NX_NULL;
2127 client_ptr -> nxd_mqtt_client_will_topic = NX_NULL;
2128 client_ptr -> nxd_mqtt_client_will_message = NX_NULL;
2129 client_ptr -> nxd_mqtt_client_will_qos_retain = 0;
2130
2131 /* Release current processing packet. */
2132 if (client_ptr -> nxd_mqtt_client_processing_packet)
2133 {
2134 nx_packet_release(client_ptr -> nxd_mqtt_client_processing_packet);
2135 client_ptr -> nxd_mqtt_client_processing_packet = NX_NULL;
2136 }
2137
2138 return;
2139 }
2140
2141
2142 /**************************************************************************/
2143 /* */
2144 /* FUNCTION RELEASE */
2145 /* */
2146 /* _nxd_mqtt_packet_receive_process PORTABLE C */
2147 /* 6.2.0 */
2148 /* AUTHOR */
2149 /* */
2150 /* Yuxin Zhou, Microsoft Corporation */
2151 /* */
2152 /* DESCRIPTION */
2153 /* */
/*    This internal function processes MQTT messages.                     */
/*    NOTE: MQTT Mutex is NOT obtained on entering this function.         */
/*    Therefore it shouldn't hold the mutex when it exits this function.  */
2157 /* */
2158 /* INPUT */
2159 /* */
2160 /* client_ptr Pointer to MQTT Client */
2161 /* */
2162 /* OUTPUT */
2163 /* */
2164 /* None */
2165 /* */
2166 /* CALLS */
2167 /* */
2168 /* nx_secure_tls_session_receive */
2169 /* nx_tcp_socket_receive */
2170 /* _nxd_mqtt_process_publish */
2171 /* _nxd_mqtt_process_publish_response */
2172 /* _nxd_mqtt_process_sub_unsub_ack */
2173 /* _nxd_mqtt_process_pingresp */
2174 /* _nxd_mqtt_process_disconnect */
2175 /* nx_packet_release */
2176 /* */
2177 /* CALLED BY */
2178 /* */
2179 /* _nxd_mqtt_client_event_process */
2180 /* */
2181 /* RELEASE HISTORY */
2182 /* */
2183 /* DATE NAME DESCRIPTION */
2184 /* */
2185 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2186 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2187 /* resulting in version 6.1 */
2188 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
2189 /* mqtt over websocket, */
2190 /* resulting in version 6.2.0 */
2191 /* */
2192 /**************************************************************************/
_nxd_mqtt_packet_receive_process(NXD_MQTT_CLIENT * client_ptr)2193 static VOID _nxd_mqtt_packet_receive_process(NXD_MQTT_CLIENT *client_ptr)
2194 {
2195 NX_PACKET *packet_ptr;
2196 NX_PACKET *previous_packet_ptr;
2197 UINT status;
2198 UCHAR packet_type;
2199 UINT remaining_length;
2200 UINT packet_consumed;
2201 ULONG offset;
2202 ULONG bytes_copied;
2203 ULONG packet_length;
2204
2205 for (;;)
2206 {
2207
2208 /* Release the mutex. */
2209 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2210
2211 /* Make a receive call. */
2212 status = _nxd_mqtt_packet_receive(client_ptr, &packet_ptr, NX_NO_WAIT);
2213
2214 if (status != NX_SUCCESS)
2215 {
2216 if ((status != NX_NO_PACKET) && (status != NX_CONTINUE))
2217 {
2218
2219 /* Network issue. Close the MQTT session. */
2220 #ifndef NXD_MQTT_CLOUD_ENABLE
2221 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_NETWORK_DISCONNECT_EVENT, TX_OR);
2222 #else
2223 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_NETWORK_DISCONNECT_EVENT);
2224 #endif /* NXD_MQTT_CLOUD_ENABLE */
2225 }
2226
2227 break;
2228 }
2229
2230 /* Obtain the mutex. */
2231 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
2232
2233 /* Is there a packet waiting for processing? */
2234 if (client_ptr -> nxd_mqtt_client_processing_packet)
2235 {
2236
2237 /* Yes. Link received packet to existing one. */
2238 if (client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last)
2239 {
2240 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last -> nx_packet_next = packet_ptr;
2241 }
2242 else
2243 {
2244 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_next = packet_ptr;
2245 }
2246 if (packet_ptr -> nx_packet_last)
2247 {
2248 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last = packet_ptr -> nx_packet_last;
2249 }
2250 else
2251 {
2252 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last = packet_ptr;
2253 }
2254 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_length += packet_ptr -> nx_packet_length;
2255
2256 /* Start to process existing packet. */
2257 packet_ptr = client_ptr -> nxd_mqtt_client_processing_packet;
2258 client_ptr -> nxd_mqtt_client_processing_packet = NX_NULL;
2259 }
2260
2261 /* Check notify function. */
2262 if (client_ptr -> nxd_mqtt_packet_receive_notify)
2263 {
2264
2265 /* Call notify function. Return NX_TRUE if the packet has been consumed. */
2266 if (client_ptr -> nxd_mqtt_packet_receive_notify(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_packet_receive_context) == NX_TRUE)
2267 {
2268 continue;
2269 }
2270 }
2271
2272 packet_consumed = NX_FALSE;
2273 while (packet_ptr)
2274 {
2275 /* Parse the incoming packet. */
2276 status = _nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset);
2277 if (status == NXD_MQTT_PARTIAL_PACKET)
2278 {
2279
2280 /* We only have partial MQTT message.
2281 * Put it to waiting list for more packets. */
2282 client_ptr -> nxd_mqtt_client_processing_packet = packet_ptr;
2283 packet_consumed = NX_TRUE;
2284 break;
2285 }
2286 else if (status)
2287 {
2288
2289 /* Invalid packet. */
2290 break;
2291 }
2292
2293 /* Get packet type. */
2294 if (nx_packet_data_extract_offset(packet_ptr, 0, &packet_type, 1, &bytes_copied))
2295 {
2296
2297 /* Unable to read packet type. */
2298 break;
2299 }
2300
2301 /* Right shift 4 bits to get the packet type. */
2302 packet_type = packet_type >> 4;
2303
2304 /* Process based on packet type. */
2305 switch (packet_type)
2306 {
2307 case MQTT_CONTROL_PACKET_TYPE_CONNECT:
2308 /* Client does not accept connections. Nothing needs to be done. */
2309 break;
2310 case MQTT_CONTROL_PACKET_TYPE_CONNACK:
2311 _nxd_mqtt_process_connack(client_ptr, packet_ptr, NX_NO_WAIT);
2312 break;
2313
2314 case MQTT_CONTROL_PACKET_TYPE_PUBLISH:
2315 packet_consumed = _nxd_mqtt_process_publish(client_ptr, packet_ptr);
2316 break;
2317
2318 case MQTT_CONTROL_PACKET_TYPE_PUBACK:
2319 case MQTT_CONTROL_PACKET_TYPE_PUBREL:
2320 _nxd_mqtt_process_publish_response(client_ptr, packet_ptr);
2321 break;
2322
2323 case MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE:
2324 case MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE:
2325 /* Client should not process subscribe or unsubscribe message. */
2326 break;
2327
2328 case MQTT_CONTROL_PACKET_TYPE_SUBACK:
2329 case MQTT_CONTROL_PACKET_TYPE_UNSUBACK:
2330 _nxd_mqtt_process_sub_unsub_ack(client_ptr, packet_ptr);
2331 break;
2332
2333
2334 case MQTT_CONTROL_PACKET_TYPE_PINGREQ:
2335 /* Client is not supposed to receive ping req. Ignore it. */
2336 break;
2337
2338 case MQTT_CONTROL_PACKET_TYPE_PINGRESP:
2339 _nxd_mqtt_process_pingresp(client_ptr);
2340 break;
2341
2342 case MQTT_CONTROL_PACKET_TYPE_DISCONNECT:
2343 _nxd_mqtt_process_disconnect(client_ptr);
2344 break;
2345
2346 /* Publisher sender message type for QoS 2. Not supported. */
2347 case MQTT_CONTROL_PACKET_TYPE_PUBREC:
2348 case MQTT_CONTROL_PACKET_TYPE_PUBCOMP:
2349 default:
2350 /* Unknown type. */
2351 break;
2352 }
2353
2354 if (packet_consumed)
2355 {
2356 break;
2357 }
2358
2359 /* Trim current packet. */
2360 offset += remaining_length;
2361 packet_length = packet_ptr -> nx_packet_length;
2362 if (packet_length > offset)
2363 {
2364
2365 /* Multiple MQTT message in one packet. */
2366 packet_length = packet_ptr -> nx_packet_length - offset;
2367 while ((ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr) <= offset)
2368 {
2369 offset -= (ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr);
2370
2371 /* Current packet can be released. */
2372 previous_packet_ptr = packet_ptr;
2373 packet_ptr = packet_ptr -> nx_packet_next;
2374 previous_packet_ptr -> nx_packet_next = NX_NULL;
2375 nx_packet_release(previous_packet_ptr);
2376 if (packet_ptr == NX_NULL)
2377 {
2378
2379 /* Invalid packet. */
2380 break;
2381 }
2382 }
2383
2384 if (packet_ptr)
2385 {
2386
2387 /* Adjust current packet. */
2388 packet_ptr -> nx_packet_prepend_ptr = packet_ptr -> nx_packet_prepend_ptr + offset;
2389 packet_ptr -> nx_packet_length = packet_length;
2390 }
2391 }
2392 else
2393 {
2394
2395 /* All messages in current packet is processed. */
2396 break;
2397 }
2398 }
2399
2400 if (!packet_consumed)
2401 {
2402 nx_packet_release(packet_ptr);
2403 }
2404 }
2405
2406 /* No more data in the receive queue. Return. */
2407
2408 return;
2409 }
2410
2411
2412 /**************************************************************************/
2413 /* */
2414 /* FUNCTION RELEASE */
2415 /* */
2416 /* _nxd_mqtt_tcp_establish_process PORTABLE C */
2417 /* 6.2.0 */
2418 /* AUTHOR */
2419 /* */
2420 /* Yuxin Zhou, Microsoft Corporation */
2421 /* */
2422 /* DESCRIPTION */
2423 /* */
2424 /* This function processes MQTT TCP connection establish event. */
2425 /* */
2426 /* INPUT */
2427 /* */
2428 /* client_ptr Pointer to MQTT Client */
2429 /* */
2430 /* OUTPUT */
2431 /* */
2432 /* None */
2433 /* */
2434 /* CALLS */
2435 /* */
2436 /* nx_secure_tls_session_start */
2437 /* _nxd_mqtt_client_connection_end */
2438 /* _nxd_mqtt_client_connect_packet_send */
2439 /* */
2440 /* CALLED BY */
2441 /* */
2442 /* _nxd_mqtt_client_event_process */
2443 /* */
2444 /* RELEASE HISTORY */
2445 /* */
2446 /* DATE NAME DESCRIPTION */
2447 /* */
2448 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2449 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2450 /* resulting in version 6.1 */
2451 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
2452 /* mqtt over websocket, */
2453 /* resulting in version 6.2.0 */
2454 /* */
2455 /**************************************************************************/
_nxd_mqtt_tcp_establish_process(NXD_MQTT_CLIENT * client_ptr)2456 static VOID _nxd_mqtt_tcp_establish_process(NXD_MQTT_CLIENT *client_ptr)
2457 {
2458 UINT status;
2459
2460
2461 /* TCP connection is established. */
2462
2463 /* If TLS is enabled, start TLS */
2464 #ifdef NX_SECURE_ENABLE
2465 if (client_ptr -> nxd_mqtt_client_use_tls)
2466 {
2467 status = nx_secure_tls_session_start(&(client_ptr -> nxd_mqtt_tls_session), &(client_ptr -> nxd_mqtt_client_socket), NX_NO_WAIT);
2468
2469 if (status != NX_CONTINUE)
2470 {
2471
2472 /* End connection. */
2473 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2474
2475 /* Check callback function. */
2476 if (client_ptr -> nxd_mqtt_connect_notify)
2477 {
2478 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2479 }
2480
2481 return;
2482 }
2483
2484 /* TLS in progress. */
2485 client_ptr -> nxd_mqtt_tls_in_progress = NX_TRUE;
2486
2487 return;
2488 }
2489 #endif /* NX_SECURE_ENABLE */
2490
2491 #ifdef NXD_MQTT_OVER_WEBSOCKET
2492
2493 /* If using websocket, start websocket connection. */
2494 if (client_ptr -> nxd_mqtt_client_use_websocket)
2495 {
2496 status = nx_websocket_client_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_client_socket),
2497 client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
2498 client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
2499 (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
2500 NX_NO_WAIT);
2501
2502 if (status != NX_IN_PROGRESS)
2503 {
2504
2505 /* End connection. */
2506 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2507
2508 /* Check callback function. */
2509 if (client_ptr -> nxd_mqtt_connect_notify)
2510 {
2511 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2512 }
2513
2514 return;
2515 }
2516
2517 return;
2518 }
2519 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2520
2521 /* Start to send MQTT connect packet. */
2522 status = _nxd_mqtt_client_connect_packet_send(client_ptr, NX_NO_WAIT);
2523
2524 /* Check status. */
2525 if (status)
2526 {
2527
2528 /* End connection. */
2529 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2530
2531 /* Check callback function. */
2532 if (client_ptr -> nxd_mqtt_connect_notify)
2533 {
2534 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2535 }
2536 }
2537 }
2538
2539
2540 #ifdef NX_SECURE_ENABLE
2541 /**************************************************************************/
2542 /* */
2543 /* FUNCTION RELEASE */
2544 /* */
2545 /* _nxd_mqtt_tls_establish_process PORTABLE C */
2546 /* 6.2.0 */
2547 /* AUTHOR */
2548 /* */
2549 /* Yuxin Zhou, Microsoft Corporation */
2550 /* */
2551 /* DESCRIPTION */
2552 /* */
2553 /* This function processes TLS connection establish event. */
2554 /* */
2555 /* INPUT */
2556 /* */
2557 /* client_ptr Pointer to MQTT Client */
2558 /* */
2559 /* OUTPUT */
2560 /* */
2561 /* None */
2562 /* */
2563 /* CALLS */
2564 /* */
2565 /* _nx_secure_tls_handshake_process */
2566 /* _nxd_mqtt_client_connect_packet_send */
2567 /* _nxd_mqtt_client_connection_end */
2568 /* */
2569 /* CALLED BY */
2570 /* */
2571 /* _nxd_mqtt_client_event_process */
2572 /* */
2573 /* RELEASE HISTORY */
2574 /* */
2575 /* DATE NAME DESCRIPTION */
2576 /* */
2577 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2578 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2579 /* resulting in version 6.1 */
2580 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
2581 /* mqtt over websocket, */
2582 /* resulting in version 6.2.0 */
2583 /* */
2584 /**************************************************************************/
_nxd_mqtt_tls_establish_process(NXD_MQTT_CLIENT * client_ptr)2585 static VOID _nxd_mqtt_tls_establish_process(NXD_MQTT_CLIENT *client_ptr)
2586 {
2587 UINT status;
2588
2589
2590 /* Directly call handshake process for async mode. */
2591 status = _nx_secure_tls_handshake_process(&(client_ptr -> nxd_mqtt_tls_session), NX_NO_WAIT);
2592 if (status == NX_SUCCESS)
2593 {
2594
2595 /* TLS session established. */
2596 client_ptr -> nxd_mqtt_tls_in_progress = NX_FALSE;
2597
2598 #ifdef NXD_MQTT_OVER_WEBSOCKET
2599
2600 /* If using websocket, start websocket connection. */
2601 if (client_ptr -> nxd_mqtt_client_use_websocket)
2602 {
2603 status = nx_websocket_client_secure_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_tls_session),
2604 client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
2605 client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
2606 (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
2607 NX_NO_WAIT);
2608
2609 if (status == NX_IN_PROGRESS)
2610 {
2611 return;
2612 }
2613 }
2614 else
2615 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2616 {
2617
2618 /* Start to send MQTT connect packet. */
2619 status = _nxd_mqtt_client_connect_packet_send(client_ptr, NX_NO_WAIT);
2620 }
2621 }
2622 else if (status == NX_CONTINUE)
2623 {
2624 return;
2625 }
2626
2627 /* Check status. */
2628 if (status)
2629 {
2630
2631 /* End connection. */
2632 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2633
2634 /* Check callback function. */
2635 if (client_ptr -> nxd_mqtt_connect_notify)
2636 {
2637 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2638 }
2639 }
2640
2641 return;
2642 }
2643 #endif /* NX_SECURE_ENABLE */
2644
2645 /**************************************************************************/
2646 /* */
2647 /* FUNCTION RELEASE */
2648 /* */
2649 /* _nxd_mqtt_client_append_message PORTABLE C */
2650 /* 6.1 */
2651 /* AUTHOR */
2652 /* */
2653 /* Yuxin Zhou, Microsoft Corporation */
2654 /* */
2655 /* DESCRIPTION */
2656 /* */
2657 /* This function writes the message length and message in the outgoing */
2658 /* MQTT packet. */
2659 /* */
2660 /* INPUT */
2661 /* */
2662 /* client_ptr Pointer to MQTT Client */
2663 /* packet_ptr Outgoing MQTT packet */
2664 /* message Pointer to the message */
2665 /* length Length of the message */
2666 /* wait_option Wait option */
2667 /* */
2668 /* OUTPUT */
2669 /* */
2670 /* status */
2671 /* */
2672 /* CALLS */
2673 /* */
2674 /* nx_packet_data_append Append packet data */
2675 /* */
2676 /* CALLED BY */
2677 /* */
2678 /* _nxd_mqtt_client_sub_unsub */
2679 /* _nxd_mqtt_client_connect */
2680 /* _nxd_mqtt_client_publish */
2681 /* */
2682 /* RELEASE HISTORY */
2683 /* */
2684 /* DATE NAME DESCRIPTION */
2685 /* */
2686 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2687 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2688 /* resulting in version 6.1 */
2689 /* */
2690 /**************************************************************************/
/* Appends a length-prefixed field to an outgoing MQTT packet: a two-byte
   big-endian length followed by `length` bytes of `message`.

   Returns NXD_MQTT_INVALID_PARAMETER when `length` does not fit in the
   two-byte prefix, otherwise the status of nx_packet_data_append (zero on
   success).  When `length` is zero only the prefix is written and `message`
   is not read. */
UINT _nxd_mqtt_client_append_message(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, CHAR *message, UINT length, ULONG wait_option)
{
    UINT  ret;
    UCHAR len[2];

    /* The prefix is 16 bits wide.  A larger length would be truncated in the
       prefix while the whole message was still appended, which would corrupt
       the packet, so reject it up front. */
    if (length > 0xFFFF)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Most significant byte first. */
    len[0] = (UCHAR)((length >> 8) & 0xFF);
    len[1] = (UCHAR)(length & 0xFF);

    /* Append message length field. */
    ret = nx_packet_data_append(packet_ptr, len, 2, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);

    if (ret)
    {
        return(ret);
    }

    if (length)
    {

        /* Copy the message bytes into the packet. */
        ret = nx_packet_data_append(packet_ptr, message, length,
                                    client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
    }

    return(ret);
}
2716
2717
2718 /**************************************************************************/
2719 /* */
2720 /* FUNCTION RELEASE */
2721 /* */
2722 /* _nxd_mqtt_client_connection_end PORTABLE C */
2723 /* 6.2.0 */
2724 /* AUTHOR */
2725 /* */
2726 /* Yuxin Zhou, Microsoft Corporation */
2727 /* */
2728 /* DESCRIPTION */
2729 /* */
2730 /* This function is used to end the MQTT connection. */
2731 /* */
2732 /* INPUT */
2733 /* */
2734 /* client_ptr Pointer to MQTT Client */
2735 /* wait_option Wait option */
2736 /* */
2737 /* OUTPUT */
2738 /* */
2739 /* None */
2740 /* */
2741 /* CALLS */
2742 /* */
2743 /* nx_secure_tls_session_end End TLS session */
2744 /* nx_secure_tls_session_delete Delete TLS session */
2745 /* nx_tcp_socket_disconnect Close TCP connection */
2746 /* nx_tcp_client_socket_unbind Unbind TCP socket */
2747 /* tx_timer_delete Delete timer */
2748 /* */
2749 /* CALLED BY */
2750 /* */
2751 /* _nxd_mqtt_client_connect */
2752 /* _nxd_mqtt_process_disconnect */
2753 /* */
2754 /* RELEASE HISTORY */
2755 /* */
2756 /* DATE NAME DESCRIPTION */
2757 /* */
2758 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2759 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2760 /* resulting in version 6.1 */
2761 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
2762 /* mqtt over websocket, */
2763 /* resulting in version 6.2.0 */
2764 /* */
2765 /**************************************************************************/
_nxd_mqtt_client_connection_end(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)2766 VOID _nxd_mqtt_client_connection_end(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
2767 {
2768
2769 /* Obtain the mutex. */
2770 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
2771
2772 /* Mark the session as terminated. */
2773 client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;
2774
2775 /* Release the mutex. */
2776 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2777
2778 #ifdef NXD_MQTT_OVER_WEBSOCKET
2779 if (client_ptr -> nxd_mqtt_client_use_websocket)
2780 {
2781 nx_websocket_client_disconnect(&(client_ptr -> nxd_mqtt_client_websocket), wait_option);
2782 }
2783 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2784
2785 #ifdef NX_SECURE_ENABLE
2786 if (client_ptr -> nxd_mqtt_client_use_tls)
2787 {
2788 nx_secure_tls_session_end(&(client_ptr -> nxd_mqtt_tls_session), wait_option);
2789 nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
2790 }
2791 #endif
2792 nx_tcp_socket_disconnect(&(client_ptr -> nxd_mqtt_client_socket), wait_option);
2793 nx_tcp_client_socket_unbind(&(client_ptr -> nxd_mqtt_client_socket));
2794
2795 /* Disable timer if timer has been started. */
2796 if (client_ptr -> nxd_mqtt_keepalive)
2797 {
2798 tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
2799 }
2800 }
2801
2802
2803 static UINT _nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT *client_ptr, UCHAR header_value);
2804
2805
2806 /* MQTT internal function */
2807
2808 /**************************************************************************/
2809 /* */
2810 /* FUNCTION RELEASE */
2811 /* */
2812 /* _nxd_mqtt_periodic_timer_entry PORTABLE C */
2813 /* 6.1 */
2814 /* AUTHOR */
2815 /* */
2816 /* Yuxin Zhou, Microsoft Corporation */
2817 /* */
2818 /* DESCRIPTION */
2819 /* */
/*    This internal function is the entry function of the MQTT client     */
/*    periodic timer.  It posts an event to the MQTT client when an       */
/*    outstanding ping has timed out, or when a keep-alive ping is due.   */
/*                                                                        */
/*                                                                        */
/*  INPUT                                                                 */
/*                                                                        */
/*    client                                Pointer to MQTT Client, passed*/
/*                                            as a ULONG                  */
/*                                                                        */
/*  OUTPUT                                                                */
/*                                                                        */
/*    None                                                                */
/*                                                                        */
/*  CALLS                                                                 */
/*                                                                        */
/*    tx_time_get                                                         */
/*    tx_event_flags_set                                                  */
/*    nx_cloud_module_event_set                                           */
/*                                                                        */
/*  CALLED BY                                                             */
/*                                                                        */
/*    MQTT client periodic timer                                          */
2841 /* */
2842 /* RELEASE HISTORY */
2843 /* */
2844 /* DATE NAME DESCRIPTION */
2845 /* */
2846 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2847 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
2848 /* resulting in version 6.1 */
2849 /* */
2850 /**************************************************************************/
_nxd_mqtt_periodic_timer_entry(ULONG client)2851 static VOID _nxd_mqtt_periodic_timer_entry(ULONG client)
2852 {
2853 /* Check if it is time to send out a ping message. */
2854 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)client;
2855
2856 /* If an outstanding ping response has not been received, and the client exceeds the time waiting for ping response,
2857 the client shall disconnect from the server. */
2858 if (client_ptr -> nxd_mqtt_ping_not_responded)
2859 {
2860 /* If current time is greater than the ping timeout */
2861 if ((tx_time_get() - client_ptr -> nxd_mqtt_ping_sent_time) >= client_ptr -> nxd_mqtt_ping_timeout)
2862 {
2863 /* Ping timed out. Need to terminate the connection. */
2864 #ifndef NXD_MQTT_CLOUD_ENABLE
2865 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_PING_TIMEOUT_EVENT, TX_OR);
2866 #else
2867 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PING_TIMEOUT_EVENT);
2868 #endif /* NXD_MQTT_CLOUD_ENABLE */
2869
2870 return;
2871 }
2872 }
2873
2874 /* About to timeout? */
2875 if ((client_ptr -> nxd_mqtt_timeout - tx_time_get()) <= client_ptr -> nxd_mqtt_timer_value)
2876 {
2877 /* Set the flag so the MQTT thread can send the ping. */
2878 #ifndef NXD_MQTT_CLOUD_ENABLE
2879 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_TIMEOUT_EVENT, TX_OR);
2880 #else
2881 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_TIMEOUT_EVENT);
2882 #endif /* NXD_MQTT_CLOUD_ENABLE */
2883 }
2884
2885 /* If keepalive is not enabled, just return. */
2886 return;
2887 }
2888
2889
2890
2891 /**************************************************************************/
2892 /* */
2893 /* FUNCTION RELEASE */
2894 /* */
2895 /* _nxd_mqtt_client_event_process PORTABLE C */
2896 /* 6.1 */
2897 /* AUTHOR */
2898 /* */
2899 /* Yuxin Zhou, Microsoft Corporation */
2900 /* */
2901 /* DESCRIPTION */
2902 /* */
/*    This internal function processes the events posted to the MQTT      */
/*    client (timeout, TCP establish, packet receive, ping timeout,       */
/*    network disconnect and delete).                                     */
2905 /* */
2906 /* INPUT */
2907 /* */
2908 /* mqtt_client Pointer to MQTT Client */
2909 /* */
2910 /* OUTPUT */
2911 /* */
2912 /* None */
2913 /* */
2914 /* CALLS */
2915 /* */
2916 /* _nxd_mqtt_release_transmit_packet */
2917 /* _nxd_mqtt_release_receive_packet */
2918 /* _nxd_mqtt_send_simple_message */
2919 /* _nxd_mqtt_process_disconnect */
2920 /* _nxd_mqtt_packet_receive_process */
2921 /* tx_timer_delete */
2922 /* tx_event_flags_delete */
2923 /* nx_tcp_socket_delete */
2924 /* tx_timer_delete */
2925 /* tx_mutex_delete */
2926 /* */
2927 /* CALLED BY */
2928 /* */
/*    _nxd_mqtt_client_create                                             */
/*    _nxd_mqtt_thread_entry                                              */
2930 /* */
2931 /* RELEASE HISTORY */
2932 /* */
2933 /* DATE NAME DESCRIPTION */
2934 /* */
2935 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
2936 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
2937 /* corrected mqtt client state,*/
2938 /* resulting in version 6.1 */
2939 /* */
2940 /**************************************************************************/
/* Handles the MQTT client events contained in module_own_events, in a fixed
   order: timeout (send PINGREQ), TCP establish, packet receive, ping timeout,
   network disconnect and finally delete.  common_events is not used.  The
   client mutex is obtained on entry and released before returning. */
static VOID _nxd_mqtt_client_event_process(VOID *mqtt_client, ULONG common_events, ULONG module_own_events)
{
    NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)mqtt_client;

    /* Obtain the mutex. */
    tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);

    /* Common events are not processed here. */
    NX_PARAMETER_NOT_USED(common_events);

    /* Keep-alive: a PINGREQ is sent only while the client is connected. */
    if ((module_own_events & MQTT_TIMEOUT_EVENT) &&
        (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED))
    {
        _nxd_mqtt_send_simple_message(client_ptr, MQTT_CONTROL_PACKET_TYPE_PINGREQ);
    }

    if (module_own_events & MQTT_TCP_ESTABLISH_EVENT)
    {
        _nxd_mqtt_tcp_establish_process(client_ptr);
    }

    if (module_own_events & MQTT_PACKET_RECEIVE_EVENT)
    {
#ifdef NX_SECURE_ENABLE

        /* While the asynchronous TLS handshake is in progress, received data
           belongs to the handshake. */
        if (client_ptr -> nxd_mqtt_tls_in_progress)
        {
            _nxd_mqtt_tls_establish_process(client_ptr);
        }
        else
#endif /* NX_SECURE_ENABLE */
        {
            _nxd_mqtt_packet_receive_process(client_ptr);
        }
    }

    /* The server/broker did not respond to our ping request. */
    if (module_own_events & MQTT_PING_TIMEOUT_EVENT)
    {
        _nxd_mqtt_process_disconnect(client_ptr);
    }

    /* The network connection was lost; go through the disconnect path. */
    if (module_own_events & MQTT_NETWORK_DISCONNECT_EVENT)
    {
        _nxd_mqtt_process_disconnect(client_ptr);
    }

    /* Without a delete request the work is done. */
    if ((module_own_events & MQTT_DELETE_EVENT) == 0)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return;
    }

    /* Delete requested: disconnect from the server first if connected. */
    if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        _nxd_mqtt_process_disconnect(client_ptr);
    }

    /* Delete the timer unless its ID shows it is already gone. */
    if ((client_ptr -> nxd_mqtt_timer).tx_timer_id != 0)
    {
        tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
    }

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Same for the event flags group. */
    if ((client_ptr -> nxd_mqtt_events).tx_event_flags_group_id != 0)
    {
        tx_event_flags_delete(&client_ptr -> nxd_mqtt_events);
    }
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Release everything still on the receive queue. */
    while (client_ptr -> message_receive_queue_head)
    {
        _nxd_mqtt_release_receive_packet(client_ptr, client_ptr -> message_receive_queue_head, NX_NULL);
    }
    client_ptr -> message_receive_queue_depth = 0;

    /* Release everything still on the transmit queue. */
    while (client_ptr -> message_transmit_queue_head)
    {
        _nxd_mqtt_release_transmit_packet(client_ptr, client_ptr -> message_transmit_queue_head, NX_NULL);
    }

    /* Release mutex */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Delete the mutex. */
    tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Deleting the socket, (the socket ID is cleared); this signals it is ok to delete this thread. */
    nx_tcp_socket_delete(&client_ptr -> nxd_mqtt_client_socket);
}
3041
3042
3043 #ifndef NXD_MQTT_CLOUD_ENABLE
3044 /**************************************************************************/
3045 /* */
3046 /* FUNCTION RELEASE */
3047 /* */
3048 /* _nxd_mqtt_thread_entry PORTABLE C */
3049 /* 6.1 */
3050 /* AUTHOR */
3051 /* */
3052 /* Yuxin Zhou, Microsoft Corporation */
3053 /* */
3054 /* DESCRIPTION */
3055 /* */
3056 /* This internal function serves as the entry point for the MQTT */
3057 /* client thread. */
3058 /* */
3059 /* INPUT */
3060 /* */
3061 /* mqtt_client Pointer to MQTT Client */
3062 /* */
3063 /* OUTPUT */
3064 /* */
3065 /* None */
3066 /* */
3067 /* CALLS */
3068 /* */
3069 /* tx_event_flags_get */
3070 /* _nxd_mqtt_client_event_process */
3071 /* */
3072 /* CALLED BY */
3073 /* */
3074 /* _nxd_mqtt_client_create */
3075 /* */
3076 /* RELEASE HISTORY */
3077 /* */
3078 /* DATE NAME DESCRIPTION */
3079 /* */
3080 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3081 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3082 /* resulting in version 6.1 */
3083 /* */
3084 /**************************************************************************/
_nxd_mqtt_thread_entry(ULONG mqtt_client)3085 static VOID _nxd_mqtt_thread_entry(ULONG mqtt_client)
3086 {
3087 NXD_MQTT_CLIENT *client_ptr;
3088 ULONG events;
3089
3090 client_ptr = (NXD_MQTT_CLIENT *)mqtt_client;
3091
3092 /* Loop to process events on the MQTT client */
3093 for (;;)
3094 {
3095
3096 tx_event_flags_get(&client_ptr -> nxd_mqtt_events, MQTT_ALL_EVENTS, TX_OR_CLEAR, &events, TX_WAIT_FOREVER);
3097
3098 /* Call the event processing routine. */
3099 _nxd_mqtt_client_event_process(client_ptr, NX_NULL, events);
3100
3101 if (events & MQTT_DELETE_EVENT)
3102 {
3103 break;
3104 }
3105 }
3106 }
3107 #endif /* NXD_MQTT_CLOUD_ENABLE */
3108
3109
3110 /**************************************************************************/
3111 /* */
3112 /* FUNCTION RELEASE */
3113 /* */
3114 /* _mqtt_client_disconnect_callback PORTABLE C */
3115 /* 6.1 */
3116 /* AUTHOR */
3117 /* */
3118 /* Yuxin Zhou, Microsoft Corporation */
3119 /* */
3120 /* DESCRIPTION */
3121 /* */
3122 /* This internal function is passed to MQTT client socket create call. */
3123 /* This callback function notifies MQTT client thread when the TCP */
3124 /* connection is lost. */
3125 /* */
3126 /* */
3127 /* INPUT */
3128 /* */
3129 /* socket_ptr Pointer to TCP socket that */
3130 /* disconnected. */
3131 /* */
3132 /* OUTPUT */
3133 /* */
3134 /* None */
3135 /* */
3136 /* CALLS */
3137 /* */
3138 /* None */
3139 /* */
3140 /* CALLED BY */
3141 /* */
3142 /* TCP socket disconnect callback */
3143 /* */
3144 /* RELEASE HISTORY */
3145 /* */
3146 /* DATE NAME DESCRIPTION */
3147 /* */
3148 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3149 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3150 /* resulting in version 6.1 */
3151 /* */
3152 /**************************************************************************/
_mqtt_client_disconnect_callback(NX_TCP_SOCKET * socket_ptr)3153 static VOID _mqtt_client_disconnect_callback(NX_TCP_SOCKET *socket_ptr)
3154 {
3155 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)(socket_ptr -> nx_tcp_socket_reserved_ptr);
3156
3157 /* Set the MQTT_NETWORK_DISCONNECT event. This event indicates
3158 that the disconnect is initiated from the network. */
3159 #ifndef NXD_MQTT_CLOUD_ENABLE
3160 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_NETWORK_DISCONNECT_EVENT, TX_OR);
3161 #else
3162 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_NETWORK_DISCONNECT_EVENT);
3163 #endif /* NXD_MQTT_CLOUD_ENABLE */
3164
3165 return;
3166 }
3167
3168
3169 /**************************************************************************/
3170 /* */
3171 /* FUNCTION RELEASE */
3172 /* */
3173 /* _nxd_mqtt_client_create PORTABLE C */
3174 /* 6.1 */
3175 /* AUTHOR */
3176 /* */
3177 /* Yuxin Zhou, Microsoft Corporation */
3178 /* */
3179 /* DESCRIPTION */
3180 /* */
3181 /* This function creates an instance for MQTT Client. It initializes */
3182 /* MQTT Client control block, creates a thread, mutex and event queue */
3183 /* for MQTT Client, and creates the TCP socket for MQTT messaging. */
3184 /* */
3185 /* */
3186 /* INPUT */
3187 /* */
3188 /* client_ptr Pointer to MQTT Client */
3189 /* client_id Client ID for this instance */
3190 /* client_id_length Length of Client ID, in bytes */
3191 /* ip_ptr Pointer to IP instance */
3192 /* pool_ptr Pointer to packet pool */
3193 /* stack_ptr Client thread's stack pointer */
3194 /* stack_size Client thread's stack size */
3195 /* mqtt_thread_priority Priority for MQTT thread */
3196 /* memory_ptr Deprecated and not used */
3197 /* memory_size Deprecated and not used */
3198 /* */
3199 /* OUTPUT */
3200 /* */
3201 /* status Completion status */
3202 /* */
3203 /* CALLS */
3204 /* */
3205 /* _nxd_mqtt_client_create_internal Create mqtt client */
3206 /* */
3207 /* CALLED BY */
3208 /* */
3209 /* Application Code */
3210 /* */
3211 /* RELEASE HISTORY */
3212 /* */
3213 /* DATE NAME DESCRIPTION */
3214 /* */
3215 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3216 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
3217 /* corrected mqtt client state,*/
3218 /* resulting in version 6.1 */
3219 /* */
3220 /**************************************************************************/
UINT _nxd_mqtt_client_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
                             CHAR *client_id, UINT client_id_length,
                             NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
                             VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority,
                             VOID *memory_ptr, ULONG memory_size)
{
UINT result;

    /* memory_ptr and memory_size are accepted but never used. */
    NX_PARAMETER_NOT_USED(memory_ptr);
    NX_PARAMETER_NOT_USED(memory_size);

    /* Let the internal routine set up the control block and its resources. */
    result = _nxd_mqtt_client_create_internal(client_ptr, client_name, client_id, client_id_length, ip_ptr,
                                              pool_ptr, stack_ptr, stack_size, mqtt_thread_priority);

    /* Hand any internal failure code straight back to the caller. */
    if (result != 0)
    {
        return(result);
    }

#ifdef NXD_MQTT_CLOUD_ENABLE

    /* Create the cloud helper that runs the MQTT event processing. */
    result = nx_cloud_create(&(client_ptr -> nxd_mqtt_client_cloud), "Cloud Helper", stack_ptr, stack_size, mqtt_thread_priority);

    if (result == NX_SUCCESS)
    {

        /* Remember the cloud helper and use its mutex as the client mutex. */
        client_ptr -> nxd_mqtt_client_cloud_ptr = &(client_ptr -> nxd_mqtt_client_cloud);
        client_ptr -> nxd_mqtt_client_mutex_ptr = &(client_ptr -> nxd_mqtt_client_cloud.nx_cloud_mutex);

        /* Register the MQTT event handler as a module of the cloud helper. */
        result = nx_cloud_module_register(client_ptr -> nxd_mqtt_client_cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module), client_name, NX_CLOUD_MODULE_MQTT_EVENT,
                                          _nxd_mqtt_client_event_process, client_ptr);

        if (result != NX_SUCCESS)
        {

            /* Registration failed: remove the cloud helper created above. */
            nx_cloud_delete(&(client_ptr -> nxd_mqtt_client_cloud));
        }
    }

    /* On either failure, release the socket created by _nxd_mqtt_client_create_internal(). */
    if (result != NX_SUCCESS)
    {
        nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));

        return(NXD_MQTT_INTERNAL_ERROR);
    }

#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Everything is in place; the client starts out idle. */
    client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;

    return(NXD_MQTT_SUCCESS);
}
3294
3295
3296 /**************************************************************************/
3297 /* */
3298 /* FUNCTION RELEASE */
3299 /* */
3300 /* _nxd_mqtt_client_create_internal PORTABLE C */
3301 /* 6.1.12 */
3302 /* AUTHOR */
3303 /* */
3304 /* Yuxin Zhou, Microsoft Corporation */
3305 /* */
3306 /* DESCRIPTION */
3307 /* */
3308 /* This function creates an instance for MQTT Client. It initializes */
3309 /* MQTT Client control block, creates a thread, mutex and event queue */
3310 /* for MQTT Client, and creates the TCP socket for MQTT messaging. */
3311 /* */
3312 /* */
3313 /* INPUT */
3314 /* */
3315 /* client_ptr Pointer to MQTT Client */
3316 /* client_id Client ID for this instance */
3317 /* client_id_length Length of Client ID, in bytes */
3318 /* ip_ptr Pointer to IP instance */
3319 /* pool_ptr Pointer to packet pool */
3320 /* stack_ptr Client thread's stack pointer */
3321 /* stack_size Client thread's stack size */
3322 /* mqtt_thread_priority Priority for MQTT thread */
3323 /* */
3324 /* OUTPUT */
3325 /* */
3326 /* status Completion status */
3327 /* */
3328 /* CALLS */
3329 /* */
3330 /* tx_thread_create Create a thread */
3331 /* tx_mutex_create Create mutex */
3332 /* tx_mutex_get */
3333 /* tx_mutex_put */
3334 /* tx_event_flags_create Create event flags group */
3335 /* */
3336 /* CALLED BY */
3337 /* */
3338 /* _nxd_mqtt_client_create */
3339 /* */
3340 /* RELEASE HISTORY */
3341 /* */
3342 /* DATE NAME DESCRIPTION */
3343 /* */
3344 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3345 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
3346 /* corrected mqtt client state,*/
3347 /* resulting in version 6.1 */
3348 /* 07-29-2022 Spencer McDonough Modified comment(s), and */
3349 /* improved internal logic, */
3350 /* resulting in version 6.1.12 */
3351 /* */
3352 /**************************************************************************/
static UINT _nxd_mqtt_client_create_internal(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
                                             CHAR *client_id, UINT client_id_length,
                                             NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
                                             VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority)
{
UINT status;

#ifdef NXD_MQTT_CLOUD_ENABLE
    /* With the cloud helper enabled no thread is created here. */
    NX_PARAMETER_NOT_USED(stack_ptr);
    NX_PARAMETER_NOT_USED(stack_size);
    NX_PARAMETER_NOT_USED(mqtt_thread_priority);
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Clear the MQTT Client control block. */
    NXD_MQTT_SECURE_MEMSET((void *)client_ptr, 0, sizeof(NXD_MQTT_CLIENT));

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Create MQTT mutex. */
    status = tx_mutex_create(&client_ptr -> nxd_mqtt_protection, client_name, TX_NO_INHERIT);

    /* Determine if an error occurred. */
    if (status != TX_SUCCESS)
    {

        return(NXD_MQTT_INTERNAL_ERROR);
    }
    client_ptr -> nxd_mqtt_client_mutex_ptr = &(client_ptr -> nxd_mqtt_protection);

    /* Now create MQTT client thread. It is created suspended (TX_DONT_START) and only
       resumed at the end, once every resource it depends on exists.  */
    status = tx_thread_create(&(client_ptr -> nxd_mqtt_thread), client_name, _nxd_mqtt_thread_entry,
                              (ULONG)client_ptr, stack_ptr, stack_size, mqtt_thread_priority, mqtt_thread_priority,
                              NXD_MQTT_CLIENT_THREAD_TIME_SLICE, TX_DONT_START);

    /* Determine if an error occurred. */
    if (status != TX_SUCCESS)
    {

        /* Delete the mutex. */
        tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);

        /* Return error code. */
        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Create the event flags group the MQTT thread waits on. */
    status = tx_event_flags_create(&(client_ptr -> nxd_mqtt_events), client_name);

    if (status != TX_SUCCESS)
    {

        /* ThreadX only deletes threads that are terminated or completed, so the
           never-started thread must be terminated before it can be deleted.  */
        tx_thread_terminate(&(client_ptr -> nxd_mqtt_thread));
        tx_thread_delete(&(client_ptr -> nxd_mqtt_thread));

        /* Delete the mutex. */
        tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);

        /* Return error code. */
        return(NXD_MQTT_INTERNAL_ERROR);
    }
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Record the client ID information. The pointers are stored as given, not copied. */
    client_ptr -> nxd_mqtt_client_id = client_id;
    client_ptr -> nxd_mqtt_client_id_length = client_id_length;
    client_ptr -> nxd_mqtt_client_ip_ptr = ip_ptr;
    client_ptr -> nxd_mqtt_client_packet_pool_ptr = pool_ptr;
    client_ptr -> nxd_mqtt_client_name = client_name;

    /* Create the socket, registering the disconnect callback. */
    status = nx_tcp_socket_create(client_ptr -> nxd_mqtt_client_ip_ptr, &(client_ptr -> nxd_mqtt_client_socket), client_ptr -> nxd_mqtt_client_name,
                                  NX_IP_NORMAL, NX_DONT_FRAGMENT, 0x80, NXD_MQTT_CLIENT_SOCKET_WINDOW_SIZE,
                                  NX_NULL, _mqtt_client_disconnect_callback);

    /* Do not report success (or start the thread) without a usable socket. */
    if (status != NX_SUCCESS)
    {
#ifndef NXD_MQTT_CLOUD_ENABLE

        /* Unwind everything created above, in reverse order. */
        tx_event_flags_delete(&(client_ptr -> nxd_mqtt_events));
        tx_thread_terminate(&(client_ptr -> nxd_mqtt_thread));
        tx_thread_delete(&(client_ptr -> nxd_mqtt_thread));
        tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);
#endif /* NXD_MQTT_CLOUD_ENABLE */

        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Record the client_ptr in the socket structure; the disconnect callback reads it back. */
    client_ptr -> nxd_mqtt_client_socket.nx_tcp_socket_reserved_ptr = (VOID *)client_ptr;

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Start MQTT thread. */
    tx_thread_resume(&(client_ptr -> nxd_mqtt_thread));
#endif /* NXD_MQTT_CLOUD_ENABLE */

    return(NXD_MQTT_SUCCESS);
}
3436
3437
3438 /**************************************************************************/
3439 /* */
3440 /* FUNCTION RELEASE */
3441 /* */
3442 /* _nxd_mqtt_client_login_set PORTABLE C */
3443 /* 6.1 */
3444 /* AUTHOR */
3445 /* */
3446 /* Yuxin Zhou, Microsoft Corporation */
3447 /* */
3448 /* DESCRIPTION */
3449 /* */
3450 /* This function sets optional MQTT username and password. Note */
3451 /* if the broker requires username and password, this information */
3452 /* must be set prior to calling nxd_mqtt_client_connect or */
3453 /* nxd_mqtt_client_secure_connect. */
3454 /* */
3455 /* */
3456 /* INPUT */
3457 /* */
3458 /* client_ptr Pointer to MQTT Client */
3459 /* username User name, or NULL if user */
3460 /* name is not required */
3461 /* username_length Length of the user name, or */
3462 /* 0 if user name is NULL */
3463 /* password Password string, or NULL if */
3464 /* password is not required */
3465 /* password_length Length of the password, or */
3466 /* 0 if password is NULL */
3467 /* */
3468 /* OUTPUT */
3469 /* */
3470 /* status Completion status */
3471 /* */
3472 /* CALLS */
3473 /* */
3474 /* tx_mutex_get */
3475 /* */
3476 /* CALLED BY */
3477 /* */
3478 /* Application Code */
3479 /* */
3480 /* RELEASE HISTORY */
3481 /* */
3482 /* DATE NAME DESCRIPTION */
3483 /* */
3484 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3485 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3486 /* resulting in version 6.1 */
3487 /* */
3488 /**************************************************************************/
UINT _nxd_mqtt_client_login_set(NXD_MQTT_CLIENT *client_ptr,
                                CHAR *username, UINT username_length, CHAR *password, UINT password_length)
{

    /* Take the client mutex before touching the control block. */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Store the credentials. Only the pointers are kept (the strings are not
       copied), and each length is narrowed to USHORT.  */
    client_ptr -> nxd_mqtt_client_username_length = (USHORT)username_length;
    client_ptr -> nxd_mqtt_client_username = username;
    client_ptr -> nxd_mqtt_client_password_length = (USHORT)password_length;
    client_ptr -> nxd_mqtt_client_password = password;

    /* Done with the control block. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    return(NX_SUCCESS);
}
3510
3511 /**************************************************************************/
3512 /* */
3513 /* FUNCTION RELEASE */
3514 /* */
3515 /* _nxd_mqtt_client_will_message_set PORTABLE C */
3516 /* 6.1 */
3517 /* AUTHOR */
3518 /* */
3519 /* Yuxin Zhou, Microsoft Corporation */
3520 /* */
3521 /* DESCRIPTION */
3522 /* */
3523 /* This function sets optional MQTT will topic and will message. */
3524 /* Note that if will message is needed, application must set will */
3525 /* message prior to calling nxd_mqtt_client_connect or */
3526 /* nxd_mqtt_client_secure_connect. */
3527 /* */
3528 /* */
3529 /* INPUT */
3530 /* */
3531 /* client_ptr Pointer to MQTT Client */
3532 /* will_topic Will topic string. */
3533 /* will_topic_length Length of the will topic. */
3534 /* will_message Will message string. */
3535 /* will_message_length Length of the will message. */
3536 /* will_retain_flag Whether or not the will */
3537 /* message is to be retained */
3538 /* when it is published. */
3539 /* Valid values are NX_TRUE */
3540 /* NX_FALSE */
3541 /* will_QoS QoS level to be used when */
3542 /* publishing will message. */
3543 /* Valid values are 0, 1, 2 */
3544 /* */
3545 /* OUTPUT */
3546 /* */
3547 /* status Completion status */
3548 /* */
3549 /* CALLS */
3550 /* */
3551 /* tx_mutex_get */
3552 /* */
3553 /* CALLED BY */
3554 /* */
3555 /* Application Code */
3556 /* */
3557 /* RELEASE HISTORY */
3558 /* */
3559 /* DATE NAME DESCRIPTION */
3560 /* */
3561 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3562 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3563 /* resulting in version 6.1 */
3564 /* */
3565 /**************************************************************************/
UINT _nxd_mqtt_client_will_message_set(NXD_MQTT_CLIENT *client_ptr,
                                       const UCHAR *will_topic, UINT will_topic_length, const UCHAR *will_message,
                                       UINT will_message_length, UINT will_retain_flag, UINT will_QoS)
{
UINT  status;
UCHAR retain_bit;

    /* QoS level 2 is rejected before anything is modified. */
    if (will_QoS == 2)
    {
        return(NXD_MQTT_QOS2_NOT_SUPPORTED);
    }

    /* Obtain the mutex. */
    status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

    if (status != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Record the will topic and message. Only the pointers are stored, not copies. */
    client_ptr -> nxd_mqtt_client_will_topic = will_topic;
    client_ptr -> nxd_mqtt_client_will_topic_length = will_topic_length;
    client_ptr -> nxd_mqtt_client_will_message = will_message;
    client_ptr -> nxd_mqtt_client_will_message_length = will_message_length;

    /* Rebuild the combined retain/QoS value from scratch on every call, so that
       a retain flag or QoS level from an earlier call cannot linger.
       Bit 7 (0x80) marks retain; the QoS level is OR-ed into the low bits.  */
    retain_bit = 0;
    if (will_retain_flag == NX_TRUE)
    {
        retain_bit = 0x80;
    }
    client_ptr -> nxd_mqtt_client_will_qos_retain = (UCHAR)(retain_bit | will_QoS);

    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    return(NX_SUCCESS);
}
3600
3601
3602 /**************************************************************************/
3603 /* */
3604 /* FUNCTION RELEASE */
3605 /* */
3606 /* _nxde_mqtt_client_will_message_set PORTABLE C */
3607 /* 6.1 */
3608 /* AUTHOR */
3609 /* */
3610 /* Yuxin Zhou, Microsoft Corporation */
3611 /* */
3612 /* DESCRIPTION */
3613 /* */
3614 /* This function checks for errors in the */
3615 /* nxd_mqtt_client_will_message_set call. */
3616 /* */
3617 /* */
3618 /* INPUT */
3619 /* */
3620 /* client_ptr Pointer to MQTT Client */
3621 /* will_topic Will topic string. */
3622 /* will_topic_length Length of the will topic. */
3623 /* will_message Will message string. */
3624 /* will_message_length Length of the will message. */
3625 /* will_retain_flag Whether or not the will */
3626 /* message is to be retained */
3627 /* when it is published. */
3628 /* Valid values are NX_TRUE */
3629 /* NX_FALSE */
3630 /* will_QoS QoS level to be used when */
3631 /* publishing will message. */
3632 /* Valid values are 0, 1, 2 */
3633 /* */
3634 /* OUTPUT */
3635 /* */
3636 /* status Completion status */
3637 /* */
3638 /* CALLS */
3639 /* */
3640 /* _nxd_mqtt_client_will_message_set */
3641 /* */
3642 /* CALLED BY */
3643 /* */
3644 /* Application Code */
3645 /* */
3646 /* RELEASE HISTORY */
3647 /* */
3648 /* DATE NAME DESCRIPTION */
3649 /* */
3650 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3651 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3652 /* resulting in version 6.1 */
3653 /* */
3654 /**************************************************************************/
UINT _nxde_mqtt_client_will_message_set(NXD_MQTT_CLIENT *client_ptr,
                                        const UCHAR *will_topic, UINT will_topic_length, const UCHAR *will_message,
                                        UINT will_message_length, UINT will_retain_flag, UINT will_QoS)
{

    /* A client control block is required. */
    if (client_ptr == NX_NULL)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Valid will_topic string. The will topic string cannot be NULL or empty. */
    if ((will_topic == NX_NULL) || (will_topic_length == 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* The will message may be empty, but a non-zero length needs a buffer to
       read from (same pointer/length rule as the login credentials).  */
    if ((will_message == NX_NULL) && (will_message_length != 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* The retain flag must be exactly NX_TRUE or NX_FALSE. */
    if ((will_retain_flag != NX_TRUE) && (will_retain_flag != NX_FALSE))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* QoS levels above 2 do not exist. Level 2 itself is passed through and
       answered by the worker with NXD_MQTT_QOS2_NOT_SUPPORTED.  */
    if (will_QoS > 2)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* All checks passed; call the actual service. */
    return(_nxd_mqtt_client_will_message_set(client_ptr, will_topic, will_topic_length, will_message,
                                             will_message_length, will_retain_flag, will_QoS));
}
3684
3685 /**************************************************************************/
3686 /* */
3687 /* FUNCTION RELEASE */
3688 /* */
3689 /* _nxde_mqtt_client_login_set PORTABLE C */
3690 /* 6.1 */
3691 /* AUTHOR */
3692 /* */
3693 /* Yuxin Zhou, Microsoft Corporation */
3694 /* */
3695 /* DESCRIPTION */
3696 /* */
3697 /* This function checks for errors in the nxd_mqtt_client_login_set call. */
3698 /* */
3699 /* */
3700 /* INPUT */
3701 /* */
3702 /* client_ptr Pointer to MQTT Client */
3703 /* username User name, or NULL if user */
3704 /* name is not required */
3705 /* username_length Length of the user name, or */
3706 /* 0 if user name is NULL */
3707 /* password Password string, or NULL if */
3708 /* password is not required */
3709 /* password_length Length of the password, or */
3710 /* 0 if password is NULL */
3711 /* */
3712 /* OUTPUT */
3713 /* */
3714 /* status Completion status */
3715 /* */
3716 /* CALLS */
3717 /* */
3718 /* _nxd_mqtt_client_login_set */
3719 /* */
3720 /* CALLED BY */
3721 /* */
3722 /* Application Code */
3723 /* */
3724 /* RELEASE HISTORY */
3725 /* */
3726 /* DATE NAME DESCRIPTION */
3727 /* */
3728 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3729 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3730 /* resulting in version 6.1 */
3731 /* */
3732 /**************************************************************************/
UINT _nxde_mqtt_client_login_set(NXD_MQTT_CLIENT *client_ptr,
                                 CHAR *username, UINT username_length, CHAR *password, UINT password_length)
{

    /* A client control block is required. */
    if (client_ptr == NX_NULL)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Username and username length don't match,
       or password and password length don't match. */
    if (((username == NX_NULL) && (username_length != 0)) ||
        ((password == NX_NULL) && (password_length != 0)))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* The worker narrows both lengths to USHORT when storing them. Reject any
       length that would not survive that conversion instead of silently
       recording a truncated value.  */
    if (((UINT)((USHORT)username_length) != username_length) ||
        ((UINT)((USHORT)password_length) != password_length))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* All checks passed; call the actual service. */
    return(_nxd_mqtt_client_login_set(client_ptr, username, username_length, password, password_length));
}
3751
3752
3753 /**************************************************************************/
3754 /* */
3755 /* FUNCTION RELEASE */
3756 /* */
3757 /* _nxd_mqtt_client_retransmit_message PORTABLE C */
3758 /* 6.2.0 */
3759 /* AUTHOR */
3760 /* */
3761 /* Yuxin Zhou, Microsoft Corporation */
3762 /* */
3763 /* DESCRIPTION */
3764 /* */
3765 /* This function retransmits QoS1 messages upon reconnection, if the */
3766 /* connection was not established with CLEAN_SESSION set. */
3767 /* */
3768 /* */
3769 /* INPUT */
3770 /* */
3771 /* client_ptr Pointer to MQTT Client */
3772 /* wait_option Timeout value */
3773 /* */
3774 /* OUTPUT */
3775 /* */
3776 /* status Completion status */
3777 /* */
3778 /* CALLS */
3779 /* */
3780 /* tx_mutex_get */
3781 /* tx_mutex_put */
3782 /* _nxd_mqtt_packet_send */
3783 /* nx_packet_release */
3784 /* tx_time_get */
3785 /* nx_packet_copy */
3786 /* */
3787 /* CALLED BY */
3788 /* */
3789 /* Application Code */
3790 /* */
3791 /* RELEASE HISTORY */
3792 /* */
3793 /* DATE NAME DESCRIPTION */
3794 /* */
3795 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3796 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
3797 /* resulting in version 6.1 */
3798 /* 10-31-2022 Bo Chen Modified comment(s), improved */
3799 /* the logic of sending packet,*/
3800 /* resulting in version 6.2.0 */
3801 /* */
3802 /**************************************************************************/
_nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)3803 static UINT _nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
3804 {
3805 NX_PACKET *transmit_packet_ptr;
3806 NX_PACKET *packet_ptr;
3807 UINT status = NXD_MQTT_SUCCESS;
3808 UINT mutex_status;
3809 UCHAR fixed_header;
3810
3811 transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
3812
3813 while (transmit_packet_ptr)
3814 {
3815 fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
3816
3817 if ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4))
3818 {
3819
3820 /* Retransmit publish packet only. */
3821 /* Obtain a NetX Packet. */
3822 status = nx_packet_copy(transmit_packet_ptr, &packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
3823
3824 if (status != NXD_MQTT_SUCCESS)
3825 {
3826 return(NXD_MQTT_PACKET_POOL_FAILURE);
3827 }
3828
3829 /* Release the mutex. */
3830 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3831
3832 /* Send packet to server. */
3833 status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);
3834
3835 if (status)
3836 {
3837 /* Release the packet. */
3838 nx_packet_release(packet_ptr);
3839
3840 status = NXD_MQTT_COMMUNICATION_FAILURE;
3841 }
3842 /* Obtain the mutex. */
3843 mutex_status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, wait_option);
3844
3845 if (mutex_status != TX_SUCCESS)
3846 {
3847 return(NXD_MQTT_MUTEX_FAILURE);
3848 }
3849 if (status)
3850 {
3851 return(status);
3852 }
3853 }
3854 transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
3855 }
3856
3857 /* Update the timeout value. */
3858 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
3859
3860 return(status);
3861 }
3862
3863 /**************************************************************************/
3864 /* */
3865 /* FUNCTION RELEASE */
3866 /* */
3867 /* _nxd_mqtt_client_connect PORTABLE C */
3868 /* 6.2.0 */
3869 /* AUTHOR */
3870 /* */
3871 /* Yuxin Zhou, Microsoft Corporation */
3872 /* */
3873 /* DESCRIPTION */
3874 /* */
3875 /* This function makes an initial connection to the MQTT server. */
3876 /* */
3877 /* */
3878 /* INPUT */
3879 /* */
3880 /* client_ptr Pointer to MQTT Client */
3881 /* server_ip Server IP address structure */
3882 /* server_port Server port number, in host */
3883 /* byte order */
3884 /* keepalive Keepalive flag */
3885 /* clean_session Clean session flag */
3886 /* wait_option Timeout value */
3887 /* */
3888 /* */
3889 /* OUTPUT */
3890 /* */
3891 /* status Completion status */
3892 /* */
3893 /* CALLS */
3894 /* */
3895 /* tx_mutex_get */
3896 /* nx_tcp_socket_create Create TCP socket */
3897 /* nx_tcp_socket_receive_notify */
3898 /* nx_tcp_client_socket_bind */
3899 /* nxd_tcp_client_socket_connect */
3900 /* nx_tcp_client_socket_unbind */
3901 /* tx_mutex_put */
3902 /* nx_secure_tls_session_start */
3903 /* nx_secure_tls_session_send */
3904 /* nx_secure_tls_session_receive */
3905 /* _nxd_mqtt_packet_allocate */
3906 /* _nxd_mqtt_client_set_fixed_header */
3907 /* _nxd_mqtt_client_append_message */
3908 /* nx_tcp_socket_send */
3909 /* nx_packet_release */
3910 /* nx_tcp_socket_receive */
3911 /* tx_event_flags_set */
3912 /* _nxd_mqtt_release_transmit_packet */
3913 /* tx_timer_create */
3914 /* nx_secure_tls_session_receive */
3915 /* _nxd_mqtt_client_connection_end */
3916 /* */
3917 /* CALLED BY */
3918 /* */
3919 /* Application Code */
3920 /* */
3921 /* RELEASE HISTORY */
3922 /* */
3923 /* DATE NAME DESCRIPTION */
3924 /* */
3925 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
3926 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
3927 /* fixed return value when it */
3928 /* is set in CONNACK, corrected*/
3929 /* mqtt client state, */
3930 /* resulting in version 6.1 */
3931 /* 08-02-2021 Yuxin Zhou Modified comment(s), and */
3932 /* corrected the logic for */
3933 /* non-blocking mode, */
3934 /* resulting in version 6.1.8 */
3935 /* 07-29-2022 Spencer McDonough Modified comment(s), and */
3936 /* improved internal logic, */
3937 /* resulting in version 6.1.12 */
3938 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
3939 /* mqtt over websocket, */
3940 /* resulting in version 6.2.0 */
3941 /* */
3942 /**************************************************************************/
_nxd_mqtt_client_connect(NXD_MQTT_CLIENT * client_ptr,NXD_ADDRESS * server_ip,UINT server_port,UINT keepalive,UINT clean_session,ULONG wait_option)3943 UINT _nxd_mqtt_client_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
3944 UINT keepalive, UINT clean_session, ULONG wait_option)
3945 {
3946 NX_PACKET *packet_ptr;
3947 UINT status;
3948 TX_THREAD *thread_ptr;
3949 UINT new_priority;
3950 UINT old_priority;
3951
3952
3953 /* Obtain the mutex. */
3954 status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
3955
3956 if (status != TX_SUCCESS)
3957 {
3958 #ifdef NX_SECURE_ENABLE
3959 if (client_ptr -> nxd_mqtt_client_use_tls)
3960 {
3961 nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
3962 }
3963 #endif /* NX_SECURE_ENABLE */
3964
3965 return(NXD_MQTT_MUTEX_FAILURE);
3966 }
3967
3968 /* Do nothing if the client is already connected. */
3969 if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
3970 {
3971 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3972 return(NXD_MQTT_ALREADY_CONNECTED);
3973 }
3974
3975 /* Check if client is connecting. */
3976 if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTING)
3977 {
3978 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3979 return(NXD_MQTT_CONNECTING);
3980 }
3981
3982 /* Client state must be in IDLE. */
3983 if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_IDLE)
3984 {
3985 #ifdef NX_SECURE_ENABLE
3986 if (client_ptr -> nxd_mqtt_client_use_tls)
3987 {
3988 nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
3989 }
3990 #endif /* NX_SECURE_ENABLE */
3991 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3992 return(NXD_MQTT_INVALID_STATE);
3993 }
3994
3995 #if defined(NX_SECURE_ENABLE) && defined(NXD_MQTT_REQUIRE_TLS)
3996 if (!client_ptr -> nxd_mqtt_client_use_tls)
3997 {
3998
3999 /* NXD_MQTT_REQUIRE_TLS is defined but the application does not use TLS.
4000 This is security violation. Return with failure code. */
4001 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4002 return(NXD_MQTT_CONNECT_FAILURE);
4003 }
4004 #endif /* NX_SECURE_ENABLE && NXD_MQTT_REQUIRE_TLS*/
4005
4006 /* Record the keepalive value, converted to TX timer ticks. */
4007 client_ptr -> nxd_mqtt_keepalive = keepalive * NX_IP_PERIODIC_RATE;
4008 if (keepalive)
4009 {
4010 client_ptr -> nxd_mqtt_timer_value = NXD_MQTT_KEEPALIVE_TIMER_RATE;
4011 client_ptr -> nxd_mqtt_ping_timeout = NXD_MQTT_PING_TIMEOUT_DELAY;
4012
4013 /* Create timer */
4014 tx_timer_create(&(client_ptr -> nxd_mqtt_timer), "MQTT Timer", _nxd_mqtt_periodic_timer_entry, (ULONG)client_ptr,
4015 client_ptr -> nxd_mqtt_timer_value, client_ptr -> nxd_mqtt_timer_value, TX_AUTO_ACTIVATE);
4016 }
4017 else
4018 {
4019 client_ptr -> nxd_mqtt_timer_value = 0;
4020 client_ptr -> nxd_mqtt_ping_timeout = 0;
4021 }
4022
4023 /* Record the clean session flag. */
4024 client_ptr -> nxd_mqtt_clean_session = clean_session;
4025
4026 /* Set TCP connection establish notify for non-blocking mode. */
4027 if (wait_option == 0)
4028 {
4029 nx_tcp_socket_establish_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_tcp_establish_notify);
4030
4031 /* Set the receive callback. */
4032 nx_tcp_socket_receive_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_receive_callback);
4033
4034 #ifdef NXD_MQTT_OVER_WEBSOCKET
4035 if (client_ptr -> nxd_mqtt_client_use_websocket)
4036 {
4037
4038 /* Set the websocket connection callback. */
4039 nx_websocket_client_connection_status_callback_set(&client_ptr -> nxd_mqtt_client_websocket, client_ptr, _nxd_mqtt_client_websocket_connection_status_callback);
4040 }
4041 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4042 }
4043 else
4044 {
4045
4046 /* Clean receive callback. */
4047 client_ptr -> nxd_mqtt_client_socket.nx_tcp_receive_callback = NX_NULL;
4048
4049 #ifdef NXD_MQTT_OVER_WEBSOCKET
4050 if (client_ptr -> nxd_mqtt_client_use_websocket)
4051 {
4052
4053 /* Clean the websocket connection callback. */
4054 nx_websocket_client_connection_status_callback_set(&client_ptr -> nxd_mqtt_client_websocket, NX_NULL, NX_NULL);
4055 }
4056 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4057 }
4058
4059 /* Release mutex */
4060 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4061
4062 /* First attempt to bind the client socket. */
4063 nx_tcp_client_socket_bind(&(client_ptr -> nxd_mqtt_client_socket), NX_ANY_PORT, wait_option);
4064
4065 /* Obtain the mutex. */
4066 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
4067
4068 /* Make state as NXD_MQTT_CLIENT_STATE_CONNECTING. */
4069 client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_CONNECTING;
4070
4071 /* Release mutex. */
4072 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4073
4074 /* Connect to the MQTT server */
4075 status = nxd_tcp_client_socket_connect(&(client_ptr -> nxd_mqtt_client_socket), server_ip, server_port, wait_option);
4076 if ((status != NX_SUCCESS) && (status != NX_IN_PROGRESS))
4077 {
4078
4079 /* Obtain the mutex. */
4080 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
4081
4082 /* Make state as NXD_MQTT_CLIENT_STATE_IDLE. */
4083 client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;
4084
4085 /* Release mutex. */
4086 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4087
4088 #ifdef NX_SECURE_ENABLE
4089 if (client_ptr -> nxd_mqtt_client_use_tls)
4090 {
4091 nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
4092 }
4093 #endif /* NX_SECURE_ENABLE */
4094 nx_tcp_client_socket_unbind(&(client_ptr -> nxd_mqtt_client_socket));
4095 tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
4096 return(NXD_MQTT_CONNECT_FAILURE);
4097 }
4098
4099 /* Just return for non-blocking mode. */
4100 if (wait_option == 0)
4101 {
4102 return(NX_IN_PROGRESS);
4103 }
4104
4105 /* Increase priority to the same of internal thread to avoid out of order packet process. */
4106 #ifndef NXD_MQTT_CLOUD_ENABLE
4107 thread_ptr = &(client_ptr -> nxd_mqtt_thread);
4108 #else
4109 thread_ptr = &(client_ptr -> nxd_mqtt_client_cloud_ptr -> nx_cloud_thread);
4110 #endif /* NXD_MQTT_CLOUD_ENABLE */
4111 tx_thread_info_get(thread_ptr, NX_NULL, NX_NULL, NX_NULL,
4112 &new_priority, NX_NULL, NX_NULL, NX_NULL, NX_NULL);
4113 tx_thread_priority_change(tx_thread_identify(), new_priority, &old_priority);
4114
4115 /* If TLS is enabled, start TLS */
4116 #ifdef NX_SECURE_ENABLE
4117 if (client_ptr -> nxd_mqtt_client_use_tls)
4118 {
4119
4120 status = nx_secure_tls_session_start(&(client_ptr -> nxd_mqtt_tls_session), &(client_ptr -> nxd_mqtt_client_socket), wait_option);
4121
4122 if (status != NX_SUCCESS)
4123 {
4124
4125 /* Revert thread priority. */
4126 tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4127
4128 /* End connection. */
4129 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4130
4131 return(NXD_MQTT_CONNECT_FAILURE);
4132 }
4133 }
4134 #endif /* NX_SECURE_ENABLE */
4135
4136 #ifdef NXD_MQTT_OVER_WEBSOCKET
4137 if (client_ptr -> nxd_mqtt_client_use_websocket)
4138 {
4139 #ifdef NX_SECURE_ENABLE
4140 if (client_ptr -> nxd_mqtt_client_use_tls)
4141 {
4142 status = nx_websocket_client_secure_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_tls_session),
4143 client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
4144 client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
4145 (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
4146 wait_option);
4147 }
4148 else
4149 #endif /* NX_SECURE_ENABLE */
4150 {
4151 status = nx_websocket_client_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_client_socket),
4152 client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
4153 client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
4154 (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
4155 wait_option);
4156 }
4157
4158 if (status != NX_SUCCESS)
4159 {
4160
4161 /* Revert thread priority. */
4162 tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4163
4164 /* End connection. */
4165 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4166
4167 return(NXD_MQTT_CONNECT_FAILURE);
4168 }
4169 }
4170
4171 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4172
4173 /* Start to send connect packet. */
4174 status = _nxd_mqtt_client_connect_packet_send(client_ptr, wait_option);
4175
4176 if (status != NX_SUCCESS)
4177 {
4178
4179 /* Revert thread priority. */
4180 tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4181
4182 /* End connection. */
4183 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4184 return(NXD_MQTT_CONNECT_FAILURE);
4185 }
4186
4187 /* Call a receive. */
4188 status = _nxd_mqtt_packet_receive(client_ptr, &packet_ptr, wait_option);
4189
4190 /* Revert thread priority. */
4191 tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4192
4193 /* Check status. */
4194 if (status)
4195 {
4196
4197 /* End connection. */
4198 _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4199 return(NXD_MQTT_COMMUNICATION_FAILURE);
4200 }
4201
4202 /* Process CONNACK message. */
4203 status = _nxd_mqtt_process_connack(client_ptr, packet_ptr, wait_option);
4204
4205 /* Release the packet. */
4206 nx_packet_release(packet_ptr);
4207
4208 /* Check status. */
4209 if (status == NX_SUCCESS)
4210 {
4211
4212 /* Set the receive callback. */
4213 nx_tcp_socket_receive_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_receive_callback);
4214 }
4215
4216 return(status);
4217 }
4218
4219 /**************************************************************************/
4220 /* */
4221 /* FUNCTION RELEASE */
4222 /* */
4223 /* _nxd_mqtt_client_connect_packet_send PORTABLE C */
4224 /* 6.2.0 */
4225 /* AUTHOR */
4226 /* */
4227 /* Yuxin Zhou, Microsoft Corporation */
4228 /* */
4229 /* DESCRIPTION */
4230 /* */
4231 /* This function sends CONNECT packet to MQTT server. */
4232 /* */
4233 /* */
4234 /* INPUT */
4235 /* */
4236 /* client_ptr Pointer to MQTT Client */
4237 /* keepalive Keepalive flag */
4238 /* clean_session Clean session flag */
4239 /* wait_option Timeout value */
4240 /* */
4241 /* */
4242 /* OUTPUT */
4243 /* */
4244 /* status Completion status */
4245 /* */
4246 /* CALLS */
4247 /* */
4248 /* nx_packet_release */
4249 /* _nxd_mqtt_packet_allocate */
4250 /* _nxd_mqtt_release_transmit_packet */
4251 /* _nxd_mqtt_client_connection_end */
4252 /* _nxd_mqtt_client_set_fixed_header */
4253 /* _nxd_mqtt_client_append_message */
4254 /* _nxd_mqtt_packet_send */
4255 /* */
4256 /* CALLED BY */
4257 /* */
4258 /* _nxd_mqtt_client_connect */
4259 /* _nxd_mqtt_tcp_establish_process */
4260 /* _nxd_mqtt_tls_establish_process */
4261 /* */
4262 /* RELEASE HISTORY */
4263 /* */
4264 /* DATE NAME DESCRIPTION */
4265 /* */
4266 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4267 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4268 /* resulting in version 6.1 */
4269 /* 07-29-2022 Spencer McDonough Modified comment(s), */
4270 /* improved internal logic, */
4271 /* resulting in version 6.1.12 */
4272 /* 10-31-2022 Bo Chen Modified comment(s), improved */
4273 /* the logic of sending packet,*/
4274 /* resulting in version 6.2.0 */
4275 /* */
4276 /**************************************************************************/
_nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)4277 UINT _nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
4278 {
4279 NX_PACKET *packet_ptr;
4280 UINT status;
4281 UINT length = 0;
4282 UCHAR connection_flags = 0;
4283 UINT ret = NXD_MQTT_SUCCESS;
4284 UCHAR temp_data[4];
4285 UINT keepalive = (client_ptr -> nxd_mqtt_keepalive/NX_IP_PERIODIC_RATE);
4286
4287
4288 /* Construct connect flags by taking the connect flag user supplies, or'ing the username and
4289 password bits, if they are supplied. */
4290 if (client_ptr -> nxd_mqtt_client_username)
4291 {
4292 connection_flags |= MQTT_CONNECT_FLAGS_USERNAME;
4293
4294 /* Add the password flag only if username is supplied. */
4295 if (client_ptr -> nxd_mqtt_client_password)
4296 {
4297 connection_flags |= MQTT_CONNECT_FLAGS_PASSWORD;
4298 }
4299 }
4300
4301 if (client_ptr -> nxd_mqtt_client_will_topic)
4302 {
4303 connection_flags = connection_flags | MQTT_CONNECT_FLAGS_WILL_FLAG;
4304
4305
4306 if (client_ptr -> nxd_mqtt_client_will_qos_retain & 0x80)
4307 {
4308 connection_flags = connection_flags | MQTT_CONNECT_FLAGS_WILL_RETAIN;
4309 }
4310
4311 connection_flags = (UCHAR)(connection_flags | ((client_ptr -> nxd_mqtt_client_will_qos_retain & 0x3) << 3));
4312 }
4313
4314 if (client_ptr -> nxd_mqtt_clean_session == NX_TRUE)
4315 {
4316 connection_flags = connection_flags | MQTT_CONNECT_FLAGS_CLEAN_SESSION;
4317
4318 /* Clear any transmit blocks from the previous session. */
4319 while (client_ptr -> message_transmit_queue_head)
4320 {
4321 _nxd_mqtt_release_transmit_packet(client_ptr, client_ptr -> message_transmit_queue_head, NX_NULL);
4322 }
4323 }
4324
4325 /* Set the length of the packet. */
4326 length = 10;
4327
4328 /* Add the size of the client Identifier. */
4329 length += (client_ptr -> nxd_mqtt_client_id_length + 2);
4330
4331 /* Add the will topic, if present. */
4332 if (connection_flags & MQTT_CONNECT_FLAGS_WILL_FLAG)
4333 {
4334 length += (client_ptr -> nxd_mqtt_client_will_topic_length + 2);
4335 length += (client_ptr -> nxd_mqtt_client_will_message_length + 2);
4336 }
4337 if (connection_flags & MQTT_CONNECT_FLAGS_USERNAME)
4338 {
4339 length += (UINT)(client_ptr -> nxd_mqtt_client_username_length + 2);
4340 }
4341 if (connection_flags & MQTT_CONNECT_FLAGS_PASSWORD)
4342 {
4343 length += (UINT)(client_ptr -> nxd_mqtt_client_password_length + 2);
4344 }
4345
4346 /* Check for invalid length. */
4347 if (length > (127 * 127 * 127 * 127))
4348 {
4349 return(NXD_MQTT_INTERNAL_ERROR);
4350 }
4351
4352 status = _nxd_mqtt_packet_allocate(client_ptr, &packet_ptr, wait_option);
4353
4354 if (status)
4355 {
4356 return(status);
4357 }
4358
4359 /* Construct MQTT CONNECT message. */
4360 temp_data[0] = ((MQTT_CONTROL_PACKET_TYPE_CONNECT << 4) & 0xF0);
4361
4362 /* Fill in fixed header. */
4363 ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, temp_data[0], length, wait_option);
4364
4365 if (ret)
4366 {
4367 /* Release the packet. */
4368 nx_packet_release(packet_ptr);
4369 return(NXD_MQTT_PACKET_POOL_FAILURE);
4370 }
4371
4372 /* Fill in protocol name. */
4373 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, "MQTT", 4, wait_option);
4374
4375 if (ret)
4376 {
4377 /* Release the packet. */
4378 nx_packet_release(packet_ptr);
4379 return(NXD_MQTT_PACKET_POOL_FAILURE);
4380 }
4381
4382 /* Fill in protocol level, */
4383 temp_data[0] = MQTT_PROTOCOL_LEVEL;
4384
4385 /* Fill in byte 8: connect flags */
4386 temp_data[1] = connection_flags;
4387
4388 /* Fill in byte 9 and 10: keep alive */
4389 temp_data[2] = (keepalive >> 8) & 0xFF;
4390 temp_data[3] = (keepalive & 0xFF);
4391
4392 ret = nx_packet_data_append(packet_ptr, temp_data, 4, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
4393
4394 if (ret)
4395 {
4396
4397 /* Release the packet. */
4398 nx_packet_release(packet_ptr);
4399
4400 return(NXD_MQTT_PACKET_POOL_FAILURE);
4401 }
4402
4403 /* Fill in payload area, in the order of: client identifier, will topic, will message,
4404 user name, and password. */
4405 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_id,
4406 client_ptr -> nxd_mqtt_client_id_length, wait_option);
4407
4408 /* Next fill will topic and will message if the will flag is set. */
4409 if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_WILL_FLAG))
4410 {
4411 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, (CHAR *)client_ptr -> nxd_mqtt_client_will_topic,
4412 client_ptr -> nxd_mqtt_client_will_topic_length, wait_option);
4413
4414 if (!ret)
4415 {
4416 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, (CHAR *)client_ptr -> nxd_mqtt_client_will_message,
4417 client_ptr -> nxd_mqtt_client_will_message_length, wait_option);
4418 }
4419 }
4420
4421 /* Fill username if username flag is set */
4422 if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_USERNAME))
4423 {
4424 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_username,
4425 client_ptr -> nxd_mqtt_client_username_length, wait_option);
4426 }
4427
4428 /* Fill password if password flag is set */
4429 if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_PASSWORD))
4430 {
4431 ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_password,
4432 client_ptr -> nxd_mqtt_client_password_length, wait_option);
4433 }
4434
4435 if (ret)
4436 {
4437
4438 /* Release the packet. */
4439 nx_packet_release(packet_ptr);
4440
4441 return(NXD_MQTT_PACKET_POOL_FAILURE);
4442 }
4443
4444 /* Ready to send the connect message to the server. */
4445 status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);
4446
4447 if (status)
4448 {
4449
4450 /* Release the packet. */
4451 nx_packet_release(packet_ptr);
4452 }
4453
4454 /* Update the timeout value. */
4455 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
4456
4457 return(status);
4458 }
4459
4460 /**************************************************************************/
4461 /* */
4462 /* FUNCTION RELEASE */
4463 /* */
4464 /* _nxd_mqtt_client_secure_connect PORTABLE C */
4465 /* 6.1 */
4466 /* AUTHOR */
4467 /* */
4468 /* Yuxin Zhou, Microsoft Corporation */
4469 /* */
4470 /* DESCRIPTION */
4471 /* */
4472 /* This function makes an initial secure (TLS) connection to */
4473 /* the MQTT server. */
4474 /* */
4475 /* */
4476 /* INPUT */
4477 /* */
4478 /* client_ptr Pointer to MQTT Client */
4479 /* server_ip Server IP address structure */
4480 /* server_port Server port number, in host */
4481 /* byte order */
4482 /* tls_setup User-supplied callback */
4483 /* function to set up TLS */
4484 /* parameters. */
4485 /* username User name, or NULL if user */
4486 /* name is not required */
4487 /* username_length Length of the user name, or */
4488 /* 0 if user name is NULL */
4489 /* password Password string, or NULL if */
4490 /* password is not required */
4491 /* password_length Length of the password, or */
4492 /* 0 if password is NULL */
4493 /* connection_flag Flag passed to the server */
4494 /* timeout Timeout value */
4495 /* */
4496 /* */
4497 /* OUTPUT */
4498 /* */
4499 /* status Completion status */
4500 /* */
4501 /* CALLS */
4502 /* */
4503 /* _nxd_mqtt_client_connect Actual MQTT Client connect */
4504 /* call */
4505 /* */
4506 /* CALLED BY */
4507 /* */
4508 /* Application Code */
4509 /* */
4510 /* RELEASE HISTORY */
4511 /* */
4512 /* DATE NAME DESCRIPTION */
4513 /* */
4514 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4515 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4516 /* resulting in version 6.1 */
4517 /* */
4518 /**************************************************************************/
4519 #ifdef NX_SECURE_ENABLE
/* Run the application's TLS setup callback, flag the client as using TLS and then
   perform the normal MQTT connect.

   tls_setup receives the client pointer together with the client's TLS session,
   certificate and trusted-certificate members.  A non-zero result from tls_setup is
   returned unchanged and no connection is attempted; otherwise the result of
   _nxd_mqtt_client_connect is returned.  */
UINT _nxd_mqtt_client_secure_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                                     UINT (*tls_setup)(NXD_MQTT_CLIENT *client_ptr, NX_SECURE_TLS_SESSION *,
                                                       NX_SECURE_X509_CERT *, NX_SECURE_X509_CERT *),
                                     UINT keepalive, UINT clean_session, ULONG wait_option)
{
    UINT setup_status;

    /* Let the application configure the TLS session and certificates first. */
    setup_status = tls_setup(client_ptr,
                             &(client_ptr -> nxd_mqtt_tls_session),
                             &(client_ptr -> nxd_mqtt_tls_certificate),
                             &(client_ptr -> nxd_mqtt_tls_trusted_certificate));

    if (setup_status != 0)
    {

        /* TLS setup failed; report the callback's own status to the caller. */
        return(setup_status);
    }

    /* Remember that this connection is secure before starting the connect. */
    client_ptr -> nxd_mqtt_client_use_tls = 1;

    /* The regular connect routine does the rest. */
    return(_nxd_mqtt_client_connect(client_ptr, server_ip, server_port, keepalive, clean_session, wait_option));
}
4544
4545 #endif /* NX_SECURE_ENABLE */
4546
4547
4548 /**************************************************************************/
4549 /* */
4550 /* FUNCTION RELEASE */
4551 /* */
4552 /* _nxd_mqtt_client_delete PORTABLE C */
4553 /* 6.2.0 */
4554 /* AUTHOR */
4555 /* */
4556 /* Yuxin Zhou, Microsoft Corporation */
4557 /* */
4558 /* DESCRIPTION */
4559 /* */
4560 /* This function deletes a previously created MQTT client instance. */
4561 /* If the NXD_MQTT_SOCKET_TIMEOUT is set to NX_WAIT_FOREVER, this may */
4562 /* suspend infinitely. This is because the MQTT Client must */
4563 /* disconnect with the server, and if the network link is disabled or */
/*    the server is not responding, this will block this function from    */
4565 /* completing. */
4566 /* */
4567 /* INPUT */
4568 /* */
4569 /* client_ptr Pointer to MQTT Client */
4570 /* */
4571 /* OUTPUT */
4572 /* */
4573 /* status Completion status */
4574 /* */
4575 /* CALLS */
4576 /* */
4577 /* tx_event_flags_set */
4578 /* */
4579 /* CALLED BY */
4580 /* */
4581 /* Application Code */
4582 /* */
4583 /* RELEASE HISTORY */
4584 /* */
4585 /* DATE NAME DESCRIPTION */
4586 /* */
4587 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4588 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4589 /* resulting in version 6.1 */
4590 /* 10-31-2022 Bo Chen Modified comment(s), supported*/
4591 /* mqtt over websocket, */
4592 /* resulting in version 6.2.0 */
4593 /* */
4594 /**************************************************************************/
_nxd_mqtt_client_delete(NXD_MQTT_CLIENT * client_ptr)4595 UINT _nxd_mqtt_client_delete(NXD_MQTT_CLIENT *client_ptr)
4596 {
4597
4598
4599 /* Set the event flag for DELETE. Next time when the MQTT client thread
4600 wakes up, it will perform the deletion process. */
4601 #ifndef NXD_MQTT_CLOUD_ENABLE
4602 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_DELETE_EVENT, TX_OR);
4603 #else
4604 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_DELETE_EVENT);
4605 #endif /* NXD_MQTT_CLOUD_ENABLE */
4606
4607 /* Check if the MQTT client thread has completed. */
4608 while((client_ptr -> nxd_mqtt_client_socket).nx_tcp_socket_id != 0)
4609 {
4610 tx_thread_sleep(NX_IP_PERIODIC_RATE);
4611 }
4612
4613 #ifndef NXD_MQTT_CLOUD_ENABLE
4614 /* Now we can delete the Client instance. */
4615 tx_thread_delete(&(client_ptr -> nxd_mqtt_thread));
4616 #else
4617 /* Deregister mqtt module from cloud helper. */
4618 nx_cloud_module_deregister(client_ptr -> nxd_mqtt_client_cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module));
4619
4620 /* Delete own created cloud. */
4621 if (client_ptr -> nxd_mqtt_client_cloud.nx_cloud_id == NX_CLOUD_ID)
4622 nx_cloud_delete(&(client_ptr -> nxd_mqtt_client_cloud));
4623 #endif /* NXD_MQTT_CLOUD_ENABLE */
4624
4625 #ifdef NXD_MQTT_OVER_WEBSOCKET
4626 if (client_ptr -> nxd_mqtt_client_use_websocket)
4627 {
4628 nx_websocket_client_delete(&client_ptr -> nxd_mqtt_client_websocket);
4629 client_ptr -> nxd_mqtt_client_use_websocket = NX_FALSE;
4630 }
4631 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4632
4633 return(NXD_MQTT_SUCCESS);
4634 }
4635
4636 /**************************************************************************/
4637 /* */
4638 /* FUNCTION RELEASE */
4639 /* */
4640 /* _nxd_mqtt_client_publish_packet_send PORTABLE C */
4641 /* 6.2.0 */
4642 /* AUTHOR */
4643 /* */
4644 /* Yuxin Zhou, Microsoft Corporation */
4645 /* */
4646 /* DESCRIPTION */
4647 /* */
4648 /* This function sends a publish packet to the connected broker. */
4649 /* */
4650 /* */
4651 /* INPUT */
4652 /* */
4653 /* client_ptr Pointer to MQTT Client */
4654 /* packet_ptr Pointer to publish packet */
4655 /* packet_id Current packet ID */
4656 /* QoS Quality of service */
4657 /* wait_option Suspension option */
4658 /* */
4659 /* OUTPUT */
4660 /* */
4661 /* status Completion status */
4662 /* */
4663 /* CALLS */
4664 /* */
4665 /* tx_mutex_get */
4666 /* tx_mutex_put */
4667 /* nx_packet_release */
4668 /* _nxd_mqtt_copy_transmit_packet */
4669 /* _nxd_mqtt_packet_send */
4670 /* */
4671 /* CALLED BY */
4672 /* */
4673 /* _nxd_mqtt_client_publish */
4674 /* */
4675 /* RELEASE HISTORY */
4676 /* */
4677 /* DATE NAME DESCRIPTION */
4678 /* */
4679 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4680 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4681 /* resulting in version 6.1 */
4682 /* 08-02-2021 Yuxin Zhou Modified comment(s), */
4683 /* supported maximum transmit */
4684 /* queue depth, */
4685 /* resulting in version 6.1.8 */
4686 /* 10-31-2022 Bo Chen Modified comment(s), improved */
4687 /* the logic of sending packet,*/
4688 /* resulting in version 6.2.0 */
4689 /* */
4690 /**************************************************************************/
/* Send an already built PUBLISH packet.  For any non-zero QoS a copy of the packet is
   first appended to the client's transmit queue so it can be retransmitted.

   Returns NXD_MQTT_PACKET_POOL_FAILURE if the retransmission copy cannot be made,
   NXD_MQTT_MUTEX_FAILURE if the client mutex cannot be obtained,
   NXD_MQTT_COMMUNICATION_FAILURE if _nxd_mqtt_packet_send reports an error, and
   NXD_MQTT_SUCCESS otherwise.  This function never releases packet_ptr itself.  */
UINT _nxd_mqtt_client_publish_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr,
                                          USHORT packet_id, UINT QoS, ULONG wait_option)
{
    NX_PACKET *retransmit_copy = NX_NULL;
    UINT       send_status;

    /* A QoS above zero needs a local copy kept for possible retransmission. */
    if (QoS != 0)
    {
        if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &retransmit_copy,
                                           packet_id, NX_TRUE, wait_option))
        {
            return(NXD_MQTT_PACKET_POOL_FAILURE);
        }
    }

    /* The queue and the timeout are updated under the client mutex. */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        if (QoS != 0)
        {

            /* Undo the copy made above. */
            nx_packet_release(retransmit_copy);

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
            /* Decrease the transmit queue depth. */
            client_ptr -> message_transmit_queue_depth--;
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */
        }

        return(NXD_MQTT_MUTEX_FAILURE);
    }

    if (QoS != 0)
    {

        /* Link the copy at the end of the transmit queue. */
        if (client_ptr -> message_transmit_queue_head != NX_NULL)
        {
            client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = retransmit_copy;
        }
        else
        {
            client_ptr -> message_transmit_queue_head = retransmit_copy;
        }
        client_ptr -> message_transmit_queue_tail = retransmit_copy;
    }

    /* Push the timeout out by one keepalive interval from now. */
    client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;

    /* The mutex is released before the send. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    /* Send the publish packet to the server. */
    send_status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);

    /* Any send error is reported as a communication failure. */
    return(send_status ? NXD_MQTT_COMMUNICATION_FAILURE : NXD_MQTT_SUCCESS);
}
4763
4764 /**************************************************************************/
4765 /* */
4766 /* FUNCTION RELEASE */
4767 /* */
4768 /* _nxd_mqtt_client_publish PORTABLE C */
4769 /* 6.1.12 */
4770 /* AUTHOR */
4771 /* */
4772 /* Yuxin Zhou, Microsoft Corporation */
4773 /* */
4774 /* DESCRIPTION */
4775 /* */
4776 /* This function publishes a message to the connected broker. */
4777 /* */
4778 /* */
4779 /* INPUT */
4780 /* */
4781 /* client_ptr Pointer to MQTT Client */
4782 /* topic_name Name of the topic */
4783 /* topic_name_length Length of the topic name */
4784 /* message Message string */
4785 /* message_length Length of the message, */
4786 /* in bytes */
4787 /* retain The retain flag, whether */
4788 /* or not the broker should */
4789 /* store this message */
4790 /* QoS Expected QoS level */
4791 /* wait_option Suspension option */
4792 /* */
4793 /* OUTPUT */
4794 /* */
4795 /* status Completion status */
4796 /* */
4797 /* CALLS */
4798 /* */
4799 /* tx_mutex_get */
4800 /* _nxd_mqtt_packet_allocate */
4801 /* _nxd_mqtt_client_set_fixed_header */
4802 /* _nxd_mqtt_client_append_message */
4803 /* tx_mutex_put */
4804 /* nx_packet_release */
4805 /* _nxd_mqtt_client_publish_packet_send */
4806 /* */
4807 /* CALLED BY */
4808 /* */
4809 /* Application Code */
4810 /* */
4811 /* RELEASE HISTORY */
4812 /* */
4813 /* DATE NAME DESCRIPTION */
4814 /* */
4815 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
4816 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
4817 /* resulting in version 6.1 */
4818 /* 07-29-2022 Spencer McDonough Modified comment(s), */
4819 /* improved internal logic, */
4820 /* resulting in version 6.1.12 */
4821 /* */
4822 /**************************************************************************/
/* Build a PUBLISH packet for the given topic and message and send it to the broker.

   Only QoS 0 and QoS 1 are supported.  For QoS 1 the client's current packet
   identifier is written into the packet and the stored identifier is advanced,
   skipping zero.

   Returns NXD_MQTT_QOS2_NOT_SUPPORTED for QoS 2, NXD_MQTT_NOT_CONNECTED if the client
   is not in the connected state, NXD_MQTT_PACKET_POOL_FAILURE if no packet could be
   allocated, NXD_MQTT_INTERNAL_ERROR if a field could not be written into the packet,
   NXD_MQTT_MUTEX_FAILURE if the client mutex could not be obtained, and otherwise the
   result of _nxd_mqtt_client_publish_packet_send.  The allocated packet is released
   on every failure path.  */
UINT _nxd_mqtt_client_publish(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length,
                              CHAR *message, UINT message_length, UINT retain, UINT QoS, ULONG wait_option)
{

    NX_PACKET *packet_ptr;
    UINT       status;
    UINT       length = 0;
    UCHAR      flags;
    USHORT     packet_id = 0;
    UINT       ret = NXD_MQTT_SUCCESS;

    if (QoS == 2)
    {
        return(NXD_MQTT_QOS2_NOT_SUPPORTED);
    }


    /* Do nothing if the client is not connected. */
    if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        return(NXD_MQTT_NOT_CONNECTED);
    }

    status = _nxd_mqtt_packet_allocate(client_ptr, &packet_ptr, wait_option);

    if (status != NXD_MQTT_SUCCESS)
    {
        return(NXD_MQTT_PACKET_POOL_FAILURE);
    }

    /* First header byte: PUBLISH control packet type in the upper nibble, QoS shifted
       left by one, and the retain flag if requested. */
    flags = (UCHAR)((MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4) | (QoS << 1));

    if (retain)
    {
        flags = flags | MQTT_PUBLISH_RETAIN;
    }


    /* Compute the remaining length. */

    /* Topic name plus its 2-byte length prefix. */
    length = topic_name_length + 2;

    /* Count packet ID for QoS 1 or QoS 2 message. */
    if ((QoS == 1) || (QoS == 2))
    {
        length += 2;
    }

    /* Count message. */
    if ((message != NX_NULL) && (message_length != 0))
    {
        length += message_length;
    }

    /* Write out the control header and remaining length field. */
    ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, flags, length, wait_option);

    if (ret)
    {

        /* Release the packet. */
        nx_packet_release(packet_ptr);

        return(NXD_MQTT_INTERNAL_ERROR);
    }


    /* Write out topic */
    ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, topic_name, topic_name_length, wait_option);

    if (ret)
    {

        /* Release the packet. */
        nx_packet_release(packet_ptr);

        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Append Packet Identifier for QoS level 1 or 2  MQTT 3.3.2.2 */
    if ((QoS == 1) || (QoS == 2))
    {
        UCHAR identifier[2];

        /* The packet identifier is read and advanced under the client mutex. */
        status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

        if (status != TX_SUCCESS)
        {

            /* Release the packet; returning without doing so would leak it from the pool. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_MUTEX_FAILURE);
        }

        /* The identifier is written most significant byte first. */
        packet_id = (USHORT)client_ptr -> nxd_mqtt_client_packet_identifier;
        identifier[0] = (UCHAR)(client_ptr -> nxd_mqtt_client_packet_identifier >> 8);
        identifier[1] = (client_ptr -> nxd_mqtt_client_packet_identifier & 0xFF);

        /* Update packet id. */
        client_ptr -> nxd_mqtt_client_packet_identifier = (client_ptr -> nxd_mqtt_client_packet_identifier + 1) & 0xFFFF;

        /* Prevent packet identifier from being zero. MQTT-2.3.1-1 */
        if (client_ptr -> nxd_mqtt_client_packet_identifier == 0)
        {
            client_ptr -> nxd_mqtt_client_packet_identifier = 1;
        }

        /* Release the mutex. */
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

        ret = nx_packet_data_append(packet_ptr, identifier, 2,
                                    client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);

        if (ret)
        {

            /* Release the packet. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_INTERNAL_ERROR);
        }
    }

    /* Append message. */
    if ((message != NX_NULL) && (message_length) != 0)
    {

        /* Use nx_packet_data_append to move user-supplied message data into the packet.
           nx_packet_data_append uses chained packet if the additional storage space is
           needed. */
        ret = nx_packet_data_append(packet_ptr, message, message_length,
                                    client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
        if (ret)
        {

            /* Unable to obtain a new packet to store the message.  Release the packet. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_INTERNAL_ERROR);
        }
    }

    /* Send publish packet. */
    ret = _nxd_mqtt_client_publish_packet_send(client_ptr, packet_ptr, packet_id, QoS, wait_option);

    if (ret)
    {

        /* The send routine leaves the packet with us on failure.  Release the packet. */
        nx_packet_release(packet_ptr);
    }
    return(ret);
}
4975
4976 /**************************************************************************/
4977 /* */
4978 /* FUNCTION RELEASE */
4979 /* */
4980 /* _nxd_mqtt_client_subscribe PORTABLE C */
4981 /* 6.1.2 */
4982 /* AUTHOR */
4983 /* */
4984 /* Yuxin Zhou, Microsoft Corporation */
4985 /* */
4986 /* DESCRIPTION */
4987 /* */
4988 /* This function sends a subscribe message to the broker. */
4989 /* */
4990 /* */
4991 /* INPUT */
4992 /* */
4993 /* client_ptr Pointer to MQTT Client */
4994 /* topic_name Pointer to the topic string */
4995 /* to subscribe to */
4996 /* topic_name_length Length of the topic string */
4997 /* in bytes */
4998 /* QoS Expected QoS level */
4999 /* */
5000 /* OUTPUT */
5001 /* */
5002 /* status Completion status */
5003 /* */
5004 /* CALLS */
5005 /* */
5006 /* _nxd_mqtt_client_sub_unsub The actual routine that */
5007 /* performs the sub/unsub */
5008 /* action. */
5009 /* */
5010 /* CALLED BY */
5011 /* */
5012 /* Application Code */
5013 /* */
5014 /* RELEASE HISTORY */
5015 /* */
5016 /* DATE NAME DESCRIPTION */
5017 /* */
5018 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5019 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5020 /* resulting in version 6.1 */
5021 /* 11-09-2020 Yuxin Zhou Modified comment(s), and */
5022 /* added packet id parameter, */
5023 /* resulting in version 6.1.2 */
5024 /* */
5025 /**************************************************************************/
UINT _nxd_mqtt_client_subscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length, UINT QoS)
{
    UINT status;

    if (QoS != 2)
    {

        /* Hand off to the shared subscribe/unsubscribe routine.  The header value is the
           SUBSCRIBE control packet type in the upper nibble with 0x02 in the lower nibble.
           NX_NULL is passed as the fifth argument (presumably the optional packet-id
           output; confirm against _nxd_mqtt_client_sub_unsub). */
        status = _nxd_mqtt_client_sub_unsub(client_ptr, (MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4) | 0x02,
                                            topic_name, topic_name_length, NX_NULL, QoS);
    }
    else
    {

        /* QoS 2 subscriptions are rejected before anything is sent. */
        status = NXD_MQTT_QOS2_NOT_SUPPORTED;
    }

    return(status);
}
5037
5038
5039
5040
5041 /**************************************************************************/
5042 /* */
5043 /* FUNCTION RELEASE */
5044 /* */
5045 /* _nxd_mqtt_client_unsubscribe PORTABLE C */
5046 /* 6.1.2 */
5047 /* AUTHOR */
5048 /* */
5049 /* Yuxin Zhou, Microsoft Corporation */
5050 /* */
5051 /* DESCRIPTION */
5052 /* */
5053 /* This function unsubscribes a topic from the broker. */
5054 /* */
5055 /* */
5056 /* INPUT */
5057 /* */
5058 /* client_ptr Pointer to MQTT Client */
5059 /* topic_name Pointer to the topic string */
/*                                            to unsubscribe from         */
5061 /* topic_name_length Length of the topic string */
5062 /* in bytes */
5063 /* */
5064 /* OUTPUT */
5065 /* */
5066 /* status Completion status */
5067 /* */
5068 /* CALLS */
5069 /* */
5070 /* _nxd_mqtt_client_sub_unsub */
5071 /* */
5072 /* CALLED BY */
5073 /* */
5074 /* Application Code */
5075 /* */
5076 /* RELEASE HISTORY */
5077 /* */
5078 /* DATE NAME DESCRIPTION */
5079 /* */
5080 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5081 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5082 /* resulting in version 6.1 */
5083 /* 11-09-2020 Yuxin Zhou Modified comment(s), and */
5084 /* added packet id parameter, */
5085 /* resulting in version 6.1.2 */
5086 /* */
5087 /**************************************************************************/
UINT _nxd_mqtt_client_unsubscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length)
{

UINT status;

    /* Delegate to the shared sub/unsub routine.  The first byte carries the
       UNSUBSCRIBE control type in the upper nibble and 0x02 in the lower
       nibble; NX_NULL and a QoS of 0 are passed for the last two arguments.  */
    status = _nxd_mqtt_client_sub_unsub(client_ptr, (MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE << 4) | 0x02,
                                        topic_name, topic_name_length, NX_NULL, 0);

    return(status);
}
5093
5094 /**************************************************************************/
5095 /* */
5096 /* FUNCTION RELEASE */
5097 /* */
5098 /* _nxd_mqtt_send_simple_message PORTABLE C */
5099 /* 6.2.0 */
5100 /* AUTHOR */
5101 /* */
5102 /* Yuxin Zhou, Microsoft Corporation */
5103 /* */
5104 /* DESCRIPTION */
5105 /* */
5106 /* This internal function handles the transmission of PINGREQ or */
5107 /* DISCONNECT message. */
5108 /* */
5109 /* */
5110 /* INPUT */
5111 /* */
5112 /* client_ptr Pointer to MQTT Client */
5113 /* header_value Value to be programmed into */
5114 /* MQTT header. */
5115 /* */
5116 /* OUTPUT */
5117 /* */
5118 /* status Completion status */
5119 /* */
5120 /* CALLS */
5121 /* */
5122 /* tx_mutex_get */
5123 /* _nxd_mqtt_packet_allocate */
5124 /* tx_mutex_put */
5125 /* _nxd_mqtt_packet_send */
5126 /* nx_packet_release */
5127 /* */
5128 /* CALLED BY */
5129 /* */
5130 /* _nxd_mqtt_client_ping */
5131 /* _nxd_mqtt_client_disconnect */
5132 /* */
5133 /* RELEASE HISTORY */
5134 /* */
5135 /* DATE NAME DESCRIPTION */
5136 /* */
5137 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5138 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5139 /* resulting in version 6.1 */
5140 /* 07-29-2022 Spencer McDonough Modified comment(s), */
5141 /* improved internal logic, */
5142 /* resulting in version 6.1.12 */
5143 /* 10-31-2022 Bo Chen Modified comment(s), improved */
5144 /* the logic of sending packet,*/
5145 /* resulting in version 6.2.0 */
5146 /* */
5147 /**************************************************************************/
_nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT * client_ptr,UCHAR header_value)5148 static UINT _nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT *client_ptr, UCHAR header_value)
5149 {
5150
5151 NX_PACKET *packet_ptr;
5152 UINT status;
5153 UINT status_mutex;
5154 UCHAR *byte;
5155
5156 status = _nxd_mqtt_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
5157 if (status)
5158 {
5159 return(NXD_MQTT_INTERNAL_ERROR);
5160 }
5161
5162 if (2u > ((ULONG)(packet_ptr -> nx_packet_data_end) - (ULONG)(packet_ptr -> nx_packet_append_ptr)))
5163 {
5164 nx_packet_release(packet_ptr);
5165
5166 /* Packet buffer is too small to hold the message. */
5167 return(NX_SIZE_ERROR);
5168 }
5169
5170 byte = packet_ptr -> nx_packet_prepend_ptr;
5171
5172 *byte = (UCHAR)(header_value << 4);
5173 *(byte + 1) = 0;
5174
5175 packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + 2;
5176 packet_ptr -> nx_packet_length = 2;
5177
5178
5179 /* Release MQTT protection before making NetX/TLS calls. */
5180 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5181
5182 /* Send packet to server. */
5183 status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER);
5184
5185 status_mutex = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
5186 if (status)
5187 {
5188
5189 /* Release the packet. */
5190 nx_packet_release(packet_ptr);
5191
5192 status = NXD_MQTT_COMMUNICATION_FAILURE;
5193 }
5194 if (status_mutex)
5195 {
5196 return(NXD_MQTT_MUTEX_FAILURE);
5197 }
5198
5199 if (header_value == MQTT_CONTROL_PACKET_TYPE_PINGREQ)
5200 {
5201 /* Do not update the ping sent time if the outstanding ping has not been responded yet */
5202 if (client_ptr -> nxd_mqtt_ping_not_responded != 1)
5203 {
5204 /* Record the time we send out the PINGREG */
5205 client_ptr -> nxd_mqtt_ping_sent_time = tx_time_get();
5206 client_ptr -> nxd_mqtt_ping_not_responded = 1;
5207 }
5208 }
5209
5210 /* Update the timeout value. */
5211 client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
5212
5213 return(status);
5214 }
5215
5216
5217 /**************************************************************************/
5218 /* */
5219 /* FUNCTION RELEASE */
5220 /* */
5221 /* _nxd_mqtt_client_disconnect PORTABLE C */
5222 /* 6.1 */
5223 /* AUTHOR */
5224 /* */
5225 /* Yuxin Zhou, Microsoft Corporation */
5226 /* */
5227 /* DESCRIPTION */
5228 /* */
5229 /* This function disconnects the MQTT client from a server. */
5230 /* */
5231 /* */
5232 /* INPUT */
5233 /* */
5234 /* client_ptr Pointer to MQTT Client */
5235 /* */
5236 /* OUTPUT */
5237 /* */
5238 /* status Completion status */
5239 /* */
5240 /* CALLS */
5241 /* */
5242 /* _nxd_mqtt_send_simple_message */
5243 /* _nxd_mqtt_process_disconnect */
5244 /* */
5245 /* CALLED BY */
5246 /* */
5247 /* Application Code */
5248 /* */
5249 /* RELEASE HISTORY */
5250 /* */
5251 /* DATE NAME DESCRIPTION */
5252 /* */
5253 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5254 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5255 /* resulting in version 6.1 */
5256 /* */
5257 /**************************************************************************/
_nxd_mqtt_client_disconnect(NXD_MQTT_CLIENT * client_ptr)5258 UINT _nxd_mqtt_client_disconnect(NXD_MQTT_CLIENT *client_ptr)
5259 {
5260 UINT status;
5261
5262 /* Obtain the mutex. */
5263 status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
5264 if (status != TX_SUCCESS)
5265 {
5266 /* Disable timer if timer has been started. */
5267 if (client_ptr -> nxd_mqtt_keepalive)
5268 {
5269 tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
5270 }
5271
5272 return(NXD_MQTT_MUTEX_FAILURE);
5273 }
5274
5275 /* Let the server know we are ending the MQTT session. */
5276 _nxd_mqtt_send_simple_message(client_ptr, MQTT_CONTROL_PACKET_TYPE_DISCONNECT);
5277
5278 /* Call the disconnect routine to disconnect the socket,
5279 release transmit packets, release received packets,
5280 and delete the client timer. */
5281 _nxd_mqtt_process_disconnect(client_ptr);
5282
5283 /* Release the mutex. */
5284 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5285
5286 return(status);
5287 }
5288
5289 /**************************************************************************/
5290 /* */
5291 /* FUNCTION RELEASE */
5292 /* */
5293 /* _nxd_mqtt_client_receive_notify_set PORTABLE C */
5294 /* 6.1 */
5295 /* AUTHOR */
5296 /* */
5297 /* Yuxin Zhou, Microsoft Corporation */
5298 /* */
5299 /* DESCRIPTION */
5300 /* */
5301 /* This function installs the MQTT client publish notify callback */
5302 /* function. */
5303 /* */
5304 /* */
5305 /* INPUT */
5306 /* */
5307 /* client_ptr Pointer to MQTT Client */
5308 /* mqtt_client_receive_notify User-supplied callback */
5309 /* function, which is invoked */
5310 /* upon receiving a publish */
5311 /* message. */
5312 /* */
5313 /* OUTPUT */
5314 /* */
5315 /* status Completion status */
5316 /* */
5317 /* CALLS */
5318 /* */
5319 /* tx_mutex_get */
5320 /* tx_mutex_put */
5321 /* */
5322 /* CALLED BY */
5323 /* */
5324 /* Application Code */
5325 /* */
5326 /* RELEASE HISTORY */
5327 /* */
5328 /* DATE NAME DESCRIPTION */
5329 /* */
5330 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5331 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5332 /* resulting in version 6.1 */
5333 /* */
5334 /**************************************************************************/
_nxd_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* receive_notify)(NXD_MQTT_CLIENT * client_ptr,UINT message_count))5335 UINT _nxd_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT *client_ptr,
5336 VOID (*receive_notify)(NXD_MQTT_CLIENT *client_ptr, UINT message_count))
5337 {
5338
5339 tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
5340
5341 client_ptr -> nxd_mqtt_client_receive_notify = receive_notify;
5342
5343 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5344
5345 return(NXD_MQTT_SUCCESS);
5346 }
5347
5348 /**************************************************************************/
5349 /* */
5350 /* FUNCTION RELEASE */
5351 /* */
5352 /* _nxd_mqtt_client_message_get PORTABLE C */
5353 /* 6.1 */
5354 /* AUTHOR */
5355 /* */
5356 /* Yuxin Zhou, Microsoft Corporation */
5357 /* */
5358 /* DESCRIPTION */
5359 /* */
5360 /* This function retrieves a published MQTT message. */
5361 /* */
5362 /* */
5363 /* INPUT */
5364 /* */
5365 /* client_ptr Pointer to MQTT Client */
5366 /* topic_buffer Pointer to the topic buffer */
5367 /* where topic is copied to */
5368 /* topic_buffer_size Size of the topic buffer. */
5369 /* actual_topic_length Number of bytes copied into */
5370 /* topic_buffer */
5371 /* message_buffer Pointer to the buffer where */
5372 /* message is copied to */
5373 /* message_buffer_size Size of the message_buffer */
5374 /* actual_message_length Number of bytes copied into */
5375 /* the message buffer. */
5376 /* */
5377 /* OUTPUT */
5378 /* */
5379 /* status Completion status */
5380 /* */
5381 /* CALLS */
5382 /* */
/*    _nxd_mqtt_process_publish_packet      Get topic and message         */
/*                                            offsets and lengths         */
/*    nx_packet_data_extract_offset         Copy data out of the packet   */
/*    nx_packet_release                     Release the packet            */
5387 /* tx_mutex_get */
5388 /* tx_mutex_put */
5389 /* */
5390 /* CALLED BY */
5391 /* */
5392 /* Application Code */
5393 /* */
5394 /* RELEASE HISTORY */
5395 /* */
5396 /* DATE NAME DESCRIPTION */
5397 /* */
5398 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5399 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
5400 /* fixed uninitialized value, */
5401 /* resulting in version 6.1 */
5402 /* */
5403 /**************************************************************************/
UINT _nxd_mqtt_client_message_get(NXD_MQTT_CLIENT *client_ptr, UCHAR *topic_buffer, UINT topic_buffer_size, UINT *actual_topic_length,
                                  UCHAR *message_buffer, UINT message_buffer_size, UINT *actual_message_length)
{

/* Retrieve the oldest usable message from the client's receive queue.

   client_ptr              MQTT client whose receive queue is read.
   topic_buffer            Destination for the topic bytes.
   topic_buffer_size       Capacity of topic_buffer, in bytes.
   actual_topic_length     Receives the number of topic bytes copied;
                             may be NX_NULL if the caller does not need it.
   message_buffer          Destination for the message bytes.
   message_buffer_size     Capacity of message_buffer, in bytes.
   actual_message_length   Receives the number of message bytes copied;
                             may be NX_NULL if the caller does not need it.

   Returns NXD_MQTT_SUCCESS when a message was copied out,
   NXD_MQTT_INSUFFICIENT_BUFFER_SPACE when the message at the head of the
   queue does not fit (it is left queued), or NXD_MQTT_NO_MESSAGE when the
   queue holds no usable message.  Queued packets that fail
   _nxd_mqtt_process_publish_packet are dropped and released.  */

UINT       status;
NX_PACKET *packet_ptr;
ULONG      topic_offset;
USHORT     topic_length;
ULONG      message_offset;
ULONG      message_length;
ULONG      topic_bytes_copied;
ULONG      message_bytes_copied;

    tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
    while (client_ptr -> message_receive_queue_depth)
    {
        packet_ptr = client_ptr -> message_receive_queue_head;
        status = _nxd_mqtt_process_publish_packet(packet_ptr, &topic_offset, &topic_length, &message_offset, &message_length);
        if (status == NXD_MQTT_SUCCESS)
        {
            if ((topic_buffer_size < topic_length) ||
                (message_buffer_size < message_length))
            {

                /* The caller's buffers are too small.  Leave the packet at
                   the head of the queue so the call can be retried.  */
                tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
                return(NXD_MQTT_INSUFFICIENT_BUFFER_SPACE);
            }
        }

        /* Unlink the packet from the head of the receive queue.  */
        client_ptr -> message_receive_queue_head = packet_ptr -> nx_packet_queue_next;
        if (client_ptr -> message_receive_queue_tail == packet_ptr)
        {
            client_ptr -> message_receive_queue_tail = NX_NULL;
        }
        client_ptr -> message_receive_queue_depth--;

        if (status == NXD_MQTT_SUCCESS)
        {

            /* Collect the copied byte counts in real ULONG variables.  The
               extract service writes through a ULONG pointer, so handing it
               the caller's UINT storage would overrun that storage wherever
               ULONG is wider than UINT.  Start from zero so a failed
               extract reports no bytes.  */
            topic_bytes_copied = 0;
            message_bytes_copied = 0;
            nx_packet_data_extract_offset(packet_ptr, topic_offset, topic_buffer,
                                          topic_length, &topic_bytes_copied);
            nx_packet_data_extract_offset(packet_ptr, message_offset, message_buffer,
                                          message_length, &message_bytes_copied);
            nx_packet_release(packet_ptr);

            /* Report the lengths only where the caller asked for them.  */
            if (actual_topic_length != NX_NULL)
            {
                *actual_topic_length = (UINT)topic_bytes_copied;
            }
            if (actual_message_length != NX_NULL)
            {
                *actual_message_length = (UINT)message_bytes_copied;
            }

            tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
            return(NXD_MQTT_SUCCESS);
        }

        /* The packet could not be processed; discard it and try the next one.  */
        nx_packet_release(packet_ptr);
    }
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
    return(NXD_MQTT_NO_MESSAGE);
}
5457
5458 /**************************************************************************/
5459 /* */
5460 /* FUNCTION RELEASE */
5461 /* */
5462 /* _nxde_mqtt_client_create PORTABLE C */
5463 /* 6.1 */
5464 /* AUTHOR */
5465 /* */
5466 /* Yuxin Zhou, Microsoft Corporation */
5467 /* */
5468 /* DESCRIPTION */
5469 /* */
/*    This function checks for errors in nxd_mqtt_client_create call.     */
5471 /* */
5472 /* */
5473 /* INPUT */
5474 /* */
5475 /* client_ptr Pointer to MQTT Client */
5476 /* client_name Name string used in by the */
5477 /* client */
5478 /* client_id Client ID used by the client */
5479 /* client_id_length Length of Client ID, in bytes */
5480 /* ip_ptr Pointer to IP instance */
5481 /* pool_ptr Pointer to packet pool */
5482 /* stack_ptr Client thread's stack pointer */
5483 /* stack_size Client thread's stack size */
5484 /* mqtt_thread_priority Priority for MQTT thread */
5485 /* memory_ptr Deprecated and not used */
5486 /* memory_size Deprecated and not used */
5487 /* */
5488 /* OUTPUT */
5489 /* */
5490 /* status Completion status */
5491 /* */
5492 /* CALLS */
5493 /* */
5494 /* _nxd_mqtt_client_create Actual client create call */
5495 /* */
5496 /* CALLED BY */
5497 /* */
5498 /* Application Code */
5499 /* */
5500 /* RELEASE HISTORY */
5501 /* */
5502 /* DATE NAME DESCRIPTION */
5503 /* */
5504 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5505 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5506 /* resulting in version 6.1 */
5507 /* */
5508 /**************************************************************************/
UINT _nxde_mqtt_client_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name, CHAR *client_id, UINT client_id_length,
                              NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
                              VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority,
                              VOID *memory_ptr, ULONG memory_size)
{

    /* Every failed check below yields NX_PTR_ERROR.  */

    /* The client control block is required.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The IP instance is required and must carry the NX_IP_ID marker.  */
    if ((ip_ptr == NX_NULL) || (ip_ptr -> nx_ip_id != NX_IP_ID))
    {
        return(NX_PTR_ERROR);
    }

    /* A non-empty thread stack is required.  */
    if ((stack_ptr == NX_NULL) || (stack_size == 0))
    {
        return(NX_PTR_ERROR);
    }

    /* The packet pool is required.  */
    if (pool_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Inputs look sane; call the actual create service.  */
    return(_nxd_mqtt_client_create(client_ptr, client_name, client_id, client_id_length, ip_ptr,
                                   pool_ptr, stack_ptr, stack_size, mqtt_thread_priority,
                                   memory_ptr, memory_size));
}
5527
5528
5529 /**************************************************************************/
5530 /* */
5531 /* FUNCTION RELEASE */
5532 /* */
5533 /* _nxde_mqtt_client_connect PORTABLE C */
5534 /* 6.1 */
5535 /* AUTHOR */
5536 /* */
5537 /* Yuxin Zhou, Microsoft Corporation */
5538 /* */
5539 /* DESCRIPTION */
5540 /* */
/*    This function checks for errors in MQTT client connect call.        */
5542 /* */
5543 /* */
5544 /* INPUT */
5545 /* */
5546 /* client_ptr Pointer to MQTT Client */
5547 /* server_ip Server IP address structure */
5548 /* server_port Server port number, in host */
5549 /* byte order */
5550 /* keepalive The MQTT keepalive timer */
5551 /* clean_session Clean session flag */
5552 /* wait_option Suspension option */
5553 /* */
5554 /* */
5555 /* OUTPUT */
5556 /* */
5557 /* status Completion status */
5558 /* */
5559 /* CALLS */
5560 /* */
5561 /* _nxd_mqtt_client_connect Actual MQTT Client connect */
5562 /* call */
5563 /* */
5564 /* CALLED BY */
5565 /* */
5566 /* Application Code */
5567 /* */
5568 /* RELEASE HISTORY */
5569 /* */
5570 /* DATE NAME DESCRIPTION */
5571 /* */
5572 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5573 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
5574 /* corrected mqtt client state,*/
5575 /* resulting in version 6.1 */
5576 /* */
5577 /**************************************************************************/
UINT _nxde_mqtt_client_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                               UINT keepalive, UINT clean_session, ULONG wait_option)
{

    /* The client control block and the server address are both required.  */
    if ((client_ptr == NX_NULL) || (server_ip == NX_NULL))
    {
        return(NX_PTR_ERROR);
    }

    /* Only IP version values 4 and 6 are accepted.  */
    if (!((server_ip -> nxd_ip_version == 4) || (server_ip -> nxd_ip_version == 6)))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Port zero is not a valid server port.  */
    if (server_port == 0)
    {
        return(NX_INVALID_PORT);
    }

    /* Inputs look sane; call the actual connect service.  */
    return(_nxd_mqtt_client_connect(client_ptr, server_ip, server_port, keepalive, clean_session, wait_option));
}
5609
5610 /**************************************************************************/
5611 /* */
5612 /* FUNCTION RELEASE */
5613 /* */
5614 /* _nxde_mqtt_client_secure_connect PORTABLE C */
5615 /* 6.1 */
5616 /* AUTHOR */
5617 /* */
5618 /* Yuxin Zhou, Microsoft Corporation */
5619 /* */
5620 /* DESCRIPTION */
5621 /* */
5622 /* This function checks for errors in MQTT client TLS secure connect. */
5623 /* */
5624 /* */
5625 /* INPUT */
5626 /* */
5627 /* client_ptr Pointer to MQTT Client */
5628 /* server_ip Server IP address structure */
5629 /* server_port Server port number, in host */
5630 /* byte order */
5631 /* tls_setup User-supplied callback */
5632 /* function to set up TLS */
5633 /* parameters. */
/*    keepalive                             The MQTT keepalive timer      */
/*    clean_session                         Clean session flag            */
5635 /* wait_option Suspension option */
5636 /* */
5637 /* */
5638 /* OUTPUT */
5639 /* */
5640 /* status Completion status */
5641 /* */
5642 /* CALLS */
5643 /* */
/*    _nxd_mqtt_client_secure_connect       Actual MQTT Client secure     */
/*                                            connect call                */
5646 /* */
5647 /* CALLED BY */
5648 /* */
5649 /* Application Code */
5650 /* */
5651 /* RELEASE HISTORY */
5652 /* */
5653 /* DATE NAME DESCRIPTION */
5654 /* */
5655 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5656 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
5657 /* corrected mqtt client state,*/
5658 /* resulting in version 6.1 */
5659 /* */
5660 /**************************************************************************/
5661 #ifdef NX_SECURE_ENABLE
5662
UINT _nxde_mqtt_client_secure_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                                      UINT (*tls_setup)(NXD_MQTT_CLIENT *client_ptr, NX_SECURE_TLS_SESSION *,
                                                        NX_SECURE_X509_CERT *, NX_SECURE_X509_CERT *),
                                      UINT keepalive, UINT clean_session, ULONG wait_option)
{

/* Error-checking front end for the TLS secure connect service.

   client_ptr      MQTT client control block; must not be NX_NULL.
   server_ip       Server address; must not be NX_NULL and must carry
                     IP version 4 or 6.
   server_port     Server port; must be non-zero.
   tls_setup       Caller-supplied TLS setup callback, passed through.
                     NOTE(review): it is not validated here - confirm
                     whether NX_NULL is acceptable to the actual service.
   keepalive       Passed through unchanged.
   clean_session   Passed through unchanged.
   wait_option     Passed through unchanged.

   Returns NX_PTR_ERROR, NXD_MQTT_INVALID_PARAMETER or NX_INVALID_PORT when
   a check fails; otherwise the status of _nxd_mqtt_client_secure_connect.  */

UINT status;

    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    if (server_ip == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Test for IP version flag.  The non-secure connect checker rejects
       anything other than 4 or 6; apply the same rule here, because this
       module compiles with NX_DISABLE_ERROR_CHECKING and therefore cannot
       rely on the NetX services it calls to catch a bad address.  */
    if ((server_ip -> nxd_ip_version != 4) && (server_ip -> nxd_ip_version != 6))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    if (server_port == 0)
    {
        return(NX_INVALID_PORT);
    }

    /* Inputs look sane; call the actual secure connect service.  */
    status = _nxd_mqtt_client_secure_connect(client_ptr, server_ip, server_port, tls_setup,
                                             keepalive, clean_session, wait_option);

    return(status);
}
5691 #endif /* NX_SECURE_ENABLE */
5692
5693
5694 /**************************************************************************/
5695 /* */
5696 /* FUNCTION RELEASE */
5697 /* */
5698 /* _nxde_mqtt_client_delete PORTABLE C */
5699 /* 6.1 */
5700 /* AUTHOR */
5701 /* */
5702 /* Yuxin Zhou, Microsoft Corporation */
5703 /* */
5704 /* DESCRIPTION */
5705 /* */
5706 /* This function checks for errors in MQTT client delete call. */
5707 /* */
5708 /* */
5709 /* INPUT */
5710 /* */
5711 /* client_ptr Pointer to MQTT Client */
5712 /* */
5713 /* OUTPUT */
5714 /* */
5715 /* status Completion status */
5716 /* */
5717 /* CALLS */
5718 /* */
5719 /* _nxd_mqtt_client_delete Actual MQTT Client delete */
5720 /* call. */
5721 /* */
5722 /* CALLED BY */
5723 /* */
5724 /* Application Code */
5725 /* */
5726 /* RELEASE HISTORY */
5727 /* */
5728 /* DATE NAME DESCRIPTION */
5729 /* */
5730 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5731 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5732 /* resulting in version 6.1 */
5733 /* */
5734 /**************************************************************************/
_nxde_mqtt_client_delete(NXD_MQTT_CLIENT * client_ptr)5735 UINT _nxde_mqtt_client_delete(NXD_MQTT_CLIENT *client_ptr)
5736 {
5737
5738 UINT status;
5739
5740 if (client_ptr == NX_NULL)
5741 {
5742 return(NX_PTR_ERROR);
5743 }
5744
5745 status = _nxd_mqtt_client_delete(client_ptr);
5746
5747 return(status);
5748 }
5749
5750
5751
5752 /**************************************************************************/
5753 /* */
5754 /* FUNCTION RELEASE */
5755 /* */
5756 /* _nxde_mqtt_client_publish PORTABLE C */
5757 /* 6.1 */
5758 /* AUTHOR */
5759 /* */
5760 /* Yuxin Zhou, Microsoft Corporation */
5761 /* */
5762 /* DESCRIPTION */
5763 /* */
5764 /* This function performs error checking to the publish service. */
5765 /* */
5766 /* */
5767 /* INPUT */
5768 /* */
5769 /* client_ptr Pointer to MQTT Client */
5770 /* topic_name Name of the topic */
5771 /* topic_name_length Length of the topic name */
5772 /* message Message string */
5773 /* message_length Length of the message, */
5774 /* in bytes */
5775 /* retain The retain flag, whether */
5776 /* or not the broker should */
5777 /* store this message */
5778 /* QoS Expected QoS level */
5779 /* wait_option Suspension option */
5780 /* */
5781 /* OUTPUT */
5782 /* */
5783 /* status Completion status */
5784 /* */
5785 /* CALLS */
5786 /* */
5787 /* */
5788 /* CALLED BY */
5789 /* */
5790 /* Application Code */
5791 /* */
5792 /* RELEASE HISTORY */
5793 /* */
5794 /* DATE NAME DESCRIPTION */
5795 /* */
5796 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5797 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5798 /* resulting in version 6.1 */
5799 /* */
5800 /**************************************************************************/
UINT _nxde_mqtt_client_publish(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length,
                               CHAR *message, UINT message_length, UINT retain, UINT QoS, ULONG wait_option)
{

/* Error-checking front end for the publish service.

   client_ptr          MQTT client control block; must not be NX_NULL.
   topic_name          Topic string; must not be NX_NULL.
   topic_name_length   Length of the topic string; must be non-zero.
   message             Message bytes; may be NX_NULL.
   message_length      Length of the message; must be non-zero when
                         message is supplied.
   retain              Passed through unchanged.
   QoS                 Requested QoS level; values above 2 are rejected.
   wait_option         Passed through unchanged.

   Returns NX_PTR_ERROR or NXD_MQTT_INVALID_PARAMETER when a check fails;
   otherwise the status of _nxd_mqtt_client_publish.  */

    /* Validate client_ptr */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Validate topic_name */
    if ((topic_name == NX_NULL) || (topic_name_length == 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Validate message length. */
    if (message && (message_length == 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Validate QoS value.  MQTT defines QoS levels 0, 1 and 2 only; the
       value 3 is reserved and must not be put on the wire.  This matches
       the range check used by the subscribe checker.  */
    if (QoS > 2)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    return(_nxd_mqtt_client_publish(client_ptr, topic_name, topic_name_length, message, message_length, retain, QoS, wait_option));
}
5830
5831
5832 /**************************************************************************/
5833 /* */
5834 /* FUNCTION RELEASE */
5835 /* */
5836 /* _nxde_mqtt_client_subscribe PORTABLE C */
5837 /* 6.1 */
5838 /* AUTHOR */
5839 /* */
5840 /* Yuxin Zhou, Microsoft Corporation */
5841 /* */
5842 /* DESCRIPTION */
5843 /* */
5844 /* This function performs error checking to the subscribe service. */
5845 /* */
5846 /* */
5847 /* INPUT */
5848 /* */
5849 /* client_ptr Pointer to MQTT Client */
5850 /* topic_name Pointer to the topic string */
5851 /* to subscribe to */
5852 /* topic_name_length Length of the topic string */
5853 /* in bytes */
/*    QoS                                   Expected QoS level            */
/*                                                                        */
/*  OUTPUT                                                                */
/*                                                                        */
/*    status                                Completion status             */
5863 /* */
5864 /* CALLS */
5865 /* */
5866 /* _nxd_mqtt_client_subscribe */
5867 /* */
5868 /* CALLED BY */
5869 /* */
5870 /* Application Code */
5871 /* */
5872 /* RELEASE HISTORY */
5873 /* */
5874 /* DATE NAME DESCRIPTION */
5875 /* */
5876 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5877 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5878 /* resulting in version 6.1 */
5879 /* */
5880 /**************************************************************************/
UINT _nxde_mqtt_client_subscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length, UINT QoS)
{

    /* The client control block is required.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Reject a missing or empty topic string, and any QoS value above 2.  */
    if ((topic_name == NX_NULL) || (topic_name_length == 0) || (QoS > 2))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Inputs look sane; call the actual subscribe service.  */
    return(_nxd_mqtt_client_subscribe(client_ptr, topic_name, topic_name_length, QoS));
}
5905
5906
5907
5908
5909 /**************************************************************************/
5910 /* */
5911 /* FUNCTION RELEASE */
5912 /* */
5913 /* _nxde_mqtt_client_unsubscribe PORTABLE C */
5914 /* 6.1 */
5915 /* AUTHOR */
5916 /* */
5917 /* Yuxin Zhou, Microsoft Corporation */
5918 /* */
5919 /* DESCRIPTION */
5920 /* */
5921 /* This function performs error checking to the unsubscribe service. */
5922 /* */
5923 /* */
5924 /* INPUT */
5925 /* */
5926 /* client_ptr Pointer to MQTT Client */
5927 /* topic_name Pointer to the topic string */
5928 /* to unsubscribe */
5929 /* topic_name_length Length of the topic string */
5930 /* in bytes */
5931 /* */
5932 /* OUTPUT */
5933 /* */
5934 /* status Completion status */
5935 /* */
5936 /* CALLS */
5937 /* */
5938 /* */
5939 /* CALLED BY */
5940 /* */
5941 /* Application Code */
5942 /* */
5943 /* RELEASE HISTORY */
5944 /* */
5945 /* DATE NAME DESCRIPTION */
5946 /* */
5947 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
5948 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
5949 /* resulting in version 6.1 */
5950 /* */
5951 /**************************************************************************/
UINT _nxde_mqtt_client_unsubscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length)
{

UINT status;

    if (client_ptr == NX_NULL)
    {

        /* The client control block is required.  */
        status = NX_PTR_ERROR;
    }
    else if ((topic_name == NX_NULL) || (topic_name_length == 0))
    {

        /* The topic string must be present and non-empty.  */
        status = NXD_MQTT_INVALID_PARAMETER;
    }
    else
    {

        /* Inputs look sane; call the actual unsubscribe service.  */
        status = _nxd_mqtt_client_unsubscribe(client_ptr, topic_name, topic_name_length);
    }

    return(status);
}
5968
5969
5970 /**************************************************************************/
5971 /* */
5972 /* FUNCTION RELEASE */
5973 /* */
5974 /* _nxde_mqtt_client_disconnect PORTABLE C */
5975 /* 6.1 */
5976 /* AUTHOR */
5977 /* */
5978 /* Yuxin Zhou, Microsoft Corporation */
5979 /* */
5980 /* DESCRIPTION */
5981 /* */
5982 /* This function checks for errors in MQTT client disconnect call. */
5983 /* */
5984 /* */
5985 /* INPUT */
5986 /* */
5987 /* client_ptr Pointer to MQTT Client */
5988 /* */
5989 /* OUTPUT */
5990 /* */
5991 /* status Completion status */
5992 /* */
5993 /* CALLS */
5994 /* */
5995 /* _nxd_mqtt_client_disconnect Actual MQTT Client disconnect */
5996 /* call */
5997 /* */
5998 /* CALLED BY */
5999 /* */
6000 /* Application Code */
6001 /* */
6002 /* RELEASE HISTORY */
6003 /* */
6004 /* DATE NAME DESCRIPTION */
6005 /* */
6006 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6007 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6008 /* resulting in version 6.1 */
6009 /* */
6010 /**************************************************************************/
_nxde_mqtt_client_disconnect(NXD_MQTT_CLIENT * client_ptr)6011 UINT _nxde_mqtt_client_disconnect(NXD_MQTT_CLIENT *client_ptr)
6012 {
6013 /* Validate client_ptr */
6014 if (client_ptr == NX_NULL)
6015 {
6016 return(NX_PTR_ERROR);
6017 }
6018
6019 return(_nxd_mqtt_client_disconnect(client_ptr));
6020 }
6021
6022
6023 /**************************************************************************/
6024 /* */
6025 /* FUNCTION RELEASE */
6026 /* */
6027 /* _nxde_mqtt_client_message_get PORTABLE C */
6028 /* 6.1 */
6029 /* AUTHOR */
6030 /* */
6031 /* Yuxin Zhou, Microsoft Corporation */
6032 /* */
6033 /* DESCRIPTION */
6034 /* */
6035 /* This function checks for errors in MQTT client message get call. */
6036 /* */
6037 /* */
6038 /* INPUT */
6039 /* */
6040 /* client_ptr Pointer to MQTT Client */
6041 /* topic_buffer Pointer to the topic buffer */
6042 /* where topic is copied to */
6043 /* topic_buffer_size Size of the topic buffer. */
6044 /* actual_topic_length Number of bytes copied into */
6045 /* topic_buffer */
6046 /* message_buffer Pointer to the buffer where */
6047 /* message is copied to */
6048 /* message_buffer_size Size of the message_buffer */
6049 /* actual_message_length Number of bytes copied into */
6050 /* the message buffer. */
6051 /* */
6052 /* OUTPUT */
6053 /* */
6054 /* status Completion status */
6055 /* */
6056 /* CALLS */
6057 /* */
6058 /* _nxd_mqtt_client_message_get */
6059 /* */
6060 /* */
6061 /* CALLED BY */
6062 /* */
6063 /* Application Code */
6064 /* */
6065 /* RELEASE HISTORY */
6066 /* */
6067 /* DATE NAME DESCRIPTION */
6068 /* */
6069 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6070 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6071 /* resulting in version 6.1 */
6072 /* */
6073 /**************************************************************************/
UINT _nxde_mqtt_client_message_get(NXD_MQTT_CLIENT *client_ptr, UCHAR *topic_buffer, UINT topic_buffer_size, UINT *actual_topic_length,
                                   UCHAR *message_buffer, UINT message_buffer_size, UINT *actual_message_length)
{

UINT status;

    if (client_ptr == NX_NULL)
    {

        /* The client control block is required.  */
        status = NX_PTR_ERROR;
    }
    else if ((message_buffer == NX_NULL) || (topic_buffer == NX_NULL))
    {

        /* Both destination buffers are required.
           NOTE(review): actual_topic_length and actual_message_length are
           not validated here; they are passed through as supplied.  */
        status = NXD_MQTT_INVALID_PARAMETER;
    }
    else
    {

        /* Inputs look sane; call the actual message get service.  */
        status = _nxd_mqtt_client_message_get(client_ptr, topic_buffer, topic_buffer_size, actual_topic_length,
                                              message_buffer, message_buffer_size, actual_message_length);
    }

    return(status);
}
6096
6097
6098 /**************************************************************************/
6099 /* */
6100 /* FUNCTION RELEASE */
6101 /* */
6102 /* _nxde_mqtt_client_receive_notify_set PORTABLE C */
6103 /* 6.1 */
6104 /* AUTHOR */
6105 /* */
6106 /* Yuxin Zhou, Microsoft Corporation */
6107 /* */
6108 /* DESCRIPTION */
6109 /* */
/*    This function checks for errors in MQTT client receive notify set.  */
6111 /* */
6112 /* */
6113 /* INPUT */
6114 /* */
6115 /* client_ptr Pointer to MQTT Client */
6116 /* receive_notify User-supplied callback */
6117 /* function, which is invoked */
6118 /* upon receiving a publish */
6119 /* message. */
6120 /* */
6121 /* OUTPUT */
6122 /* */
6123 /* status Completion status */
6124 /* */
6125 /* CALLS */
6126 /* */
6127 /* _nxd_mqtt_client_receive_notify_set */
6128 /* */
6129 /* */
6130 /* CALLED BY */
6131 /* */
6132 /* Application Code */
6133 /* */
6134 /* RELEASE HISTORY */
6135 /* */
6136 /* DATE NAME DESCRIPTION */
6137 /* */
6138 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6139 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6140 /* resulting in version 6.1 */
6141 /* */
6142 /**************************************************************************/
_nxde_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* receive_notify)(NXD_MQTT_CLIENT * client_ptr,UINT message_count))6143 UINT _nxde_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT *client_ptr,
6144 VOID (*receive_notify)(NXD_MQTT_CLIENT *client_ptr, UINT message_count))
6145 {
6146 /* Validate client_ptr */
6147 if (client_ptr == NX_NULL)
6148 {
6149 return(NX_PTR_ERROR);
6150 }
6151
6152 if (receive_notify == NX_NULL)
6153 {
6154 return(NX_PTR_ERROR);
6155 }
6156
6157 return(_nxd_mqtt_client_receive_notify_set(client_ptr, receive_notify));
6158 }
6159
6160
6161
6162 /**************************************************************************/
6163 /* */
6164 /* FUNCTION RELEASE */
6165 /* */
6166 /* _nxd_mqtt_client_disconnect_notify_set PORTABLE C */
6167 /* 6.1 */
6168 /* AUTHOR */
6169 /* */
6170 /* Yuxin Zhou, Microsoft Corporation */
6171 /* */
6172 /* DESCRIPTION */
6173 /* */
6174 /* This function sets the notify function for the disconnect event. */
6175 /* */
6176 /* INPUT */
6177 /* */
6178 /* client_ptr Pointer to MQTT Client */
6179 /* disconnect_notify The notify function to be */
6180 /* used when the client is */
6181 /* disconnected from the */
6182 /* server. */
6183 /* */
6184 /* OUTPUT */
6185 /* */
6186 /* status Completion status */
6187 /* */
6188 /* CALLS */
6189 /* */
6190 /* None */
6191 /* */
6192 /* CALLED BY */
6193 /* */
6194 /* Application Code */
6195 /* */
6196 /* RELEASE HISTORY */
6197 /* */
6198 /* DATE NAME DESCRIPTION */
6199 /* */
6200 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6201 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6202 /* resulting in version 6.1 */
6203 /* */
6204 /**************************************************************************/
_nxd_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* disconnect_notify)(NXD_MQTT_CLIENT *))6205 UINT _nxd_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT *client_ptr, VOID (*disconnect_notify)(NXD_MQTT_CLIENT *))
6206 {
6207
6208 client_ptr -> nxd_mqtt_disconnect_notify = disconnect_notify;
6209
6210 return(NXD_MQTT_SUCCESS);
6211 }
6212
6213
6214 /**************************************************************************/
6215 /* */
6216 /* FUNCTION RELEASE */
6217 /* */
6218 /* _nxde_mqtt_client_disconnect_notify_set PORTABLE C */
6219 /* 6.1 */
6220 /* AUTHOR */
6221 /* */
6222 /* Yuxin Zhou, Microsoft Corporation */
6223 /* */
6224 /* DESCRIPTION */
6225 /* */
6226 /* This function checks for errors in setting MQTT client disconnect */
6227 /* callback function. */
6228 /* */
6229 /* INPUT */
6230 /* */
6231 /* client_ptr Pointer to MQTT Client */
/*    disconnect_notify                     The callback function to be   */
6233 /* used when an on-going */
6234 /* connection is disconnected. */
6235 /* */
6236 /* OUTPUT */
6237 /* */
6238 /* status Completion status */
6239 /* */
6240 /* CALLS */
6241 /* */
6242 /* _nxd_mqtt_client_disconnect_notify_set */
6243 /* */
6244 /* CALLED BY */
6245 /* */
6246 /* Application Code */
6247 /* */
6248 /* RELEASE HISTORY */
6249 /* */
6250 /* DATE NAME DESCRIPTION */
6251 /* */
6252 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6253 /* 09-30-2020 Yuxin Zhou Modified comment(s), */
6254 /* resulting in version 6.1 */
6255 /* */
6256 /**************************************************************************/
_nxde_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* disconnect_notify)(NXD_MQTT_CLIENT *))6257 UINT _nxde_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT *client_ptr, VOID (*disconnect_notify)(NXD_MQTT_CLIENT *))
6258 {
6259
6260 /* Validate client_ptr */
6261 if (client_ptr == NX_NULL)
6262 {
6263 return(NX_PTR_ERROR);
6264 }
6265 return(_nxd_mqtt_client_disconnect_notify_set(client_ptr, disconnect_notify));
6266 }
6267
6268
6269 #ifdef NXD_MQTT_CLOUD_ENABLE
6270 /**************************************************************************/
6271 /* */
6272 /* FUNCTION RELEASE */
6273 /* */
6274 /* _nxd_mqtt_client_cloud_create PORTABLE C */
6275 /* 6.1 */
6276 /* AUTHOR */
6277 /* */
6278 /* Yuxin Zhou, Microsoft Corporation */
6279 /* */
6280 /* DESCRIPTION */
6281 /* */
6282 /* This function creates mqtt client running on cloud helper thread. */
6283 /* */
6284 /* */
6285 /* INPUT */
6286 /* */
6287 /* client_ptr Pointer to MQTT Client */
6288 /* client_name Name string used in by the */
6289 /* client */
6290 /* client_id Client ID used by the client */
6291 /* client_id_length Length of Client ID, in bytes */
6292 /* ip_ptr Pointer to IP instance */
6293 /* pool_ptr Pointer to packet pool */
6294 /* cloud_ptr Pointer to Cloud instance */
6295 /* */
6296 /* OUTPUT */
6297 /* */
6298 /* status Completion status */
6299 /* */
6300 /* CALLS */
6301 /* */
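/*    _nxd_mqtt_client_create_internal      Actual client create call     */
/*    nx_cloud_module_register              Register MQTT on cloud helper */
/*    nx_tcp_socket_delete                  Delete socket on failure      */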
6303 /* */
6304 /* CALLED BY */
6305 /* */
6306 /* Application Code */
6307 /* */
6308 /* RELEASE HISTORY */
6309 /* */
6310 /* DATE NAME DESCRIPTION */
6311 /* */
6312 /* 05-19-2020 Yuxin Zhou Initial Version 6.0 */
6313 /* 09-30-2020 Yuxin Zhou Modified comment(s), and */
6314 /* corrected mqtt client state,*/
6315 /* resulting in version 6.1 */
6316 /* */
6317 /**************************************************************************/
UINT _nxd_mqtt_client_cloud_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name, CHAR *client_id, UINT client_id_length,
                                   NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr, NX_CLOUD *cloud_ptr)
{

    UINT status;


    /* Reject invalid input pointers.  The checks run in the same order as the
       arguments are listed, and an ID field is only read after its owning pointer
       is known to be non-NULL.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    if ((ip_ptr == NX_NULL) || (ip_ptr -> nx_ip_id != NX_IP_ID))
    {
        return(NX_PTR_ERROR);
    }

    if (pool_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    if ((cloud_ptr == NX_NULL) || (cloud_ptr -> nx_cloud_id != NX_CLOUD_ID))
    {
        return(NX_PTR_ERROR);
    }

    /* Build the MQTT client itself.  No stack area, stack size or thread priority
       is supplied (NX_NULL, 0, 0).  */
    status = _nxd_mqtt_client_create_internal(client_ptr, client_name, client_id, client_id_length,
                                              ip_ptr, pool_ptr, NX_NULL, 0, 0);

    /* A failure from the internal create is returned to the caller as is.  */
    if (status)
    {
        return(status);
    }

    /* Attach the client to the cloud helper and use the cloud helper's mutex as
       the client mutex.  */
    client_ptr -> nxd_mqtt_client_cloud_ptr = cloud_ptr;
    client_ptr -> nxd_mqtt_client_mutex_ptr = &(cloud_ptr -> nx_cloud_mutex);

    /* Register MQTT as a module on the cloud helper, with the MQTT event process
       routine as the module's handler and the client as its argument.  */
    status = nx_cloud_module_register(cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module),
                                      client_name, NX_CLOUD_MODULE_MQTT_EVENT,
                                      _nxd_mqtt_client_event_process, client_ptr);

    if (status != NX_SUCCESS)
    {

        /* Registration failed: delete the socket created by
           _nxd_mqtt_client_create_internal() and report an internal error rather
           than the registration status.  */
        nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));

        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* The client is fully set up; mark it idle.  */
    client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;

    return(NXD_MQTT_SUCCESS);
}
6369 #endif /* NXD_MQTT_CLOUD_ENABLE */
6370
6371 #ifdef NXD_MQTT_OVER_WEBSOCKET
6372 /**************************************************************************/
6373 /* */
6374 /* FUNCTION RELEASE */
6375 /* */
6376 /* _nxd_mqtt_client_websocket_connection_status_callback */
6377 /* PORTABLE C */
6378 /* 6.2.0 */
6379 /* AUTHOR */
6380 /* */
6381 /* Yuxin Zhou, Microsoft Corporation */
6382 /* */
6383 /* DESCRIPTION */
6384 /* */
6385 /* This function is the websocket connection status callback. */
6386 /* */
6387 /* INPUT */
6388 /* */
6389 /* websocket_client_ptr Pointer to websocket client */
6390 /* context Pointer to MQTT client */
6391 /* status Websocket connection status */
6392 /* */
6393 /* OUTPUT */
6394 /* */
6395 /* None */
6396 /* */
6397 /* CALLS */
6398 /* */
6399 /* _nxd_mqtt_client_connect_packet_send */
6400 /* _nxd_mqtt_client_connection_end */
6401 /* */
6402 /* CALLED BY */
6403 /* */
6404 /* _nxd_mqtt_client_event_process */
6405 /* */
6406 /* RELEASE HISTORY */
6407 /* */
6408 /* DATE NAME DESCRIPTION */
6409 /* */
6410 /* 10-31-2022 Yuxin Zhou Initial Version 6.2.0 */
6411 /* */
6412 /**************************************************************************/
VOID _nxd_mqtt_client_websocket_connection_status_callback(NX_WEBSOCKET_CLIENT *websocket_client_ptr, VOID *context, UINT status)
{
    NXD_MQTT_CLIENT *mqtt_ptr;
    UINT             result;


    /* The websocket client pointer is not needed; the MQTT client arrives through
       the context argument.  */
    NX_PARAMETER_NOT_USED(websocket_client_ptr);

    mqtt_ptr = (NXD_MQTT_CLIENT *)context;
    result = status;

    /* When the reported status is success, send the MQTT connect packet without
       waiting and track the outcome of that send instead.  */
    if (result == NX_SUCCESS)
    {
        result = _nxd_mqtt_client_connect_packet_send(mqtt_ptr, NX_NO_WAIT);
    }

    /* Nothing more to do when no error has occurred.  */
    if (!result)
    {
        return;
    }

    /* Either the reported status or the connect packet send failed: end the
       connection.  */
    _nxd_mqtt_client_connection_end(mqtt_ptr, NX_NO_WAIT);

    /* Report the failing status through the connect notify callback, if one is
       installed.  */
    if (mqtt_ptr -> nxd_mqtt_connect_notify)
    {
        mqtt_ptr -> nxd_mqtt_connect_notify(mqtt_ptr, result, mqtt_ptr -> nxd_mqtt_connect_context);
    }
}
6441
6442 /**************************************************************************/
6443 /* */
6444 /* FUNCTION RELEASE */
6445 /* */
6446 /* _nxd_mqtt_client_websocket_set PORTABLE C */
6447 /* 6.2.0 */
6448 /* AUTHOR */
6449 /* */
6450 /* Yuxin Zhou, Microsoft Corporation */
6451 /* */
6452 /* DESCRIPTION */
6453 /* */
6454 /* This function sets the websocket. */
6455 /* */
6456 /* INPUT */
6457 /* */
6458 /* client_ptr Pointer to MQTT Client */
6459 /* host Host used by the client */
6460 /* host_length Length of host, in bytes */
6461 /* uri_path URI path used by the client */
6462 /* uri_path_length Length of uri path, in bytes */
6463 /* */
6464 /* OUTPUT */
6465 /* */
6466 /* status Completion status */
6467 /* */
6468 /* CALLS */
6469 /* */
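/*    tx_mutex_get                          Obtain the client mutex       */
/*    nx_websocket_client_create            Create WebSocket client       */
/*    tx_mutex_put                          Release the client mutex      */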
6471 /* */
6472 /* CALLED BY */
6473 /* */
6474 /* Application Code */
6475 /* */
6476 /* RELEASE HISTORY */
6477 /* */
6478 /* DATE NAME DESCRIPTION */
6479 /* */
6480 /* 10-31-2022 Yuxin Zhou Initial Version 6.2.0 */
6481 /* */
6482 /**************************************************************************/
UINT _nxd_mqtt_client_websocket_set(NXD_MQTT_CLIENT *client_ptr, UCHAR *host, UINT host_length, UCHAR *uri_path, UINT uri_path_length)
{
    UINT status;

    /* Take the client mutex, waiting as long as necessary.  */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Remember the host and URI path.  Only the pointers and lengths are stored;
       the strings themselves are not copied.  */
    client_ptr -> nxd_mqtt_client_websocket_host = host;
    client_ptr -> nxd_mqtt_client_websocket_host_length = host_length;
    client_ptr -> nxd_mqtt_client_websocket_uri_path = uri_path;
    client_ptr -> nxd_mqtt_client_websocket_uri_path_length = uri_path_length;

    /* Create the WebSocket client on the MQTT client's IP instance and packet
       pool, with an empty name string.  */
    status = nx_websocket_client_create(&client_ptr -> nxd_mqtt_client_websocket, (UCHAR *)"",
                                        client_ptr -> nxd_mqtt_client_ip_ptr,
                                        client_ptr -> nxd_mqtt_client_packet_pool_ptr);

    /* Only a successful create switches the client over to websocket use.  A
       create failure is returned to the caller unchanged.  */
    if (!status)
    {
        client_ptr -> nxd_mqtt_client_use_websocket = NX_TRUE;
        status = NXD_MQTT_SUCCESS;
    }

    /* Release the mutex on both the success and the failure path.  */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    return(status);
}
6518
6519
6520 /**************************************************************************/
6521 /* */
6522 /* FUNCTION RELEASE */
6523 /* */
6524 /* _nxde_mqtt_client_websocket_set PORTABLE C */
6525 /* 6.2.0 */
6526 /* AUTHOR */
6527 /* */
6528 /* Yuxin Zhou, Microsoft Corporation */
6529 /* */
6530 /* DESCRIPTION */
6531 /* */
6532 /* This function checks for errors in setting MQTT client websocket. */
6533 /* */
6534 /* INPUT */
6535 /* */
6536 /* client_ptr Pointer to MQTT Client */
6537 /* host Host used by the client */
6538 /* host_length Length of host, in bytes */
6539 /* uri_path URI path used by the client */
6540 /* uri_path_length Length of uri path, in bytes */
6541 /* */
6542 /* OUTPUT */
6543 /* */
6544 /* status Completion status */
6545 /* */
6546 /* CALLS */
6547 /* */
6548 /* _nxd_mqtt_client_websocket_set */
6549 /* */
6550 /* CALLED BY */
6551 /* */
6552 /* Application Code */
6553 /* */
6554 /* RELEASE HISTORY */
6555 /* */
6556 /* DATE NAME DESCRIPTION */
6557 /* */
6558 /* 10-31-2022 Yuxin Zhou Initial Version 6.2.0 */
6559 /* */
6560 /**************************************************************************/
UINT _nxde_mqtt_client_websocket_set(NXD_MQTT_CLIENT *client_ptr, UCHAR *host, UINT host_length, UCHAR *uri_path, UINT uri_path_length)
{

    /* The client control block is mandatory.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The host must be present and non-empty.  Note that a zero length is also
       reported as a pointer error.  */
    if ((host == NX_NULL) || (host_length == 0))
    {
        return(NX_PTR_ERROR);
    }

    /* The same rule applies to the URI path.  */
    if ((uri_path == NX_NULL) || (uri_path_length == 0))
    {
        return(NX_PTR_ERROR);
    }

    /* Checks passed, call the actual websocket set service.  */
    return(_nxd_mqtt_client_websocket_set(client_ptr, host, host_length, uri_path, uri_path_length));
}
6573 #endif /* NXD_MQTT_OVER_WEBSOCKET */
6574