1 /***************************************************************************
2  * Copyright (c) 2024 Microsoft Corporation
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the MIT License which is available at
6  * https://opensource.org/licenses/MIT.
7  *
8  * SPDX-License-Identifier: MIT
9  **************************************************************************/
10 
11 
12 /**************************************************************************/
13 /**************************************************************************/
14 /**                                                                       */
15 /** NetX Component                                                        */
16 /**                                                                       */
17 /**   MQTT (MQTT)                                                         */
18 /**                                                                       */
19 /**************************************************************************/
20 /**************************************************************************/
21 
22 #define NXD_MQTT_CLIENT_SOURCE_CODE
23 
24 
25 /* Force error checking to be disabled in this module */
26 
27 #ifndef NX_DISABLE_ERROR_CHECKING
28 #define NX_DISABLE_ERROR_CHECKING
29 #endif
30 
31 
32 /* Include necessary system files.  */
33 
34 #include    "tx_api.h"
35 #include    "nx_api.h"
36 #include    "nx_ip.h"
37 #include    "nxd_mqtt_client.h"
38 
39 /* Bring in externals for caller checking code.  */
40 
41 #define MQTT_ALL_EVENTS               ((ULONG)0xFFFFFFFF)
42 #define MQTT_TIMEOUT_EVENT            ((ULONG)0x00000001)
43 #define MQTT_PACKET_RECEIVE_EVENT     ((ULONG)0x00000002)
44 #define MQTT_START_EVENT              ((ULONG)0x00000004)
45 #define MQTT_DELETE_EVENT             ((ULONG)0x00000008)
46 #define MQTT_PING_TIMEOUT_EVENT       ((ULONG)0x00000010)
47 #define MQTT_NETWORK_DISCONNECT_EVENT ((ULONG)0x00000020)
48 #define MQTT_TCP_ESTABLISH_EVENT      ((ULONG)0x00000040)
49 
50 static UINT _nxd_mqtt_client_create_internal(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
51                                              CHAR *client_id, UINT client_id_length,
52                                              NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
53                                              VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority);
54 static UINT _nxd_mqtt_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UINT wait_option);
55 static UINT _nxd_mqtt_packet_receive(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, UINT wait_option);
56 static UINT _nxd_mqtt_copy_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET **new_packet_ptr,
57                                            USHORT packet_id, UCHAR set_duplicate_flag, UINT wait_option);
58 static VOID _nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr);
59 static VOID _nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr);
60 static UINT _nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option);
61 static UINT _nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option);
62 
63 /**************************************************************************/
64 /*                                                                        */
65 /*  FUNCTION                                               RELEASE        */
66 /*                                                                        */
67 /*    _nxd_mqtt_client_set_fixed_header                   PORTABLE C      */
68 /*                                                           6.1          */
69 /*  AUTHOR                                                                */
70 /*                                                                        */
71 /*    Yuxin Zhou, Microsoft Corporation                                   */
72 /*                                                                        */
73 /*  DESCRIPTION                                                           */
74 /*                                                                        */
75 /*    This function writes the fixed header filed in the outgoing         */
76 /*    MQTT packet.                                                        */
77 /*                                                                        */
78 /*    This function follows the logic outlined in 2.2 in MQTT             */
79 /*    specification.                                                      */
80 /*                                                                        */
81 /*  INPUT                                                                 */
82 /*                                                                        */
83 /*    client_ptr                            Pointer to MQTT Client        */
84 /*    packet_ptr                            Outgoing MQTT packet          */
85 /*    control_header                        Control byte                  */
86 /*    length                                Remaining length in bytes     */
87 /*    wait_option                           Wait option                   */
88 /*                                                                        */
89 /*  OUTPUT                                                                */
90 /*                                                                        */
91 /*    status                                                              */
92 /*                                                                        */
93 /*  CALLS                                                                 */
94 /*                                                                        */
95 /*    nx_packet_data_append                 Append packet data            */
96 /*                                                                        */
97 /*  CALLED BY                                                             */
98 /*                                                                        */
99 /*    _nxd_mqtt_client_sub_unsub                                          */
100 /*    _nxd_mqtt_client_connect                                            */
101 /*    _nxd_mqtt_client_publish                                            */
102 /*                                                                        */
103 /*  RELEASE HISTORY                                                       */
104 /*                                                                        */
105 /*    DATE              NAME                      DESCRIPTION             */
106 /*                                                                        */
107 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
108 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
109 /*                                            resulting in version 6.1    */
110 /*                                                                        */
111 /**************************************************************************/
_nxd_mqtt_client_set_fixed_header(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,UCHAR control_header,UINT length,UINT wait_option)112 UINT _nxd_mqtt_client_set_fixed_header(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UCHAR control_header, UINT length, UINT wait_option)
113 {
114 UCHAR  fixed_header[5];
115 UCHAR *byte = fixed_header;
116 UINT   count = 0;
117 UINT   ret;
118 
119     *byte = control_header;
120     byte++;
121 
122     do
123     {
124         if (length & 0xFFFFFF80)
125         {
126             *(byte + count) = (UCHAR)((length & 0x7F) | 0x80);
127         }
128         else
129         {
130             *(byte + count) = length & 0x7F;
131         }
132         length = length >> 7;
133 
134         count++;
135     } while (length != 0);
136 
137     ret = nx_packet_data_append(packet_ptr, fixed_header, count + 1,
138                                 client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
139 
140     return(ret);
141 }
142 
143 
144 
145 /**************************************************************************/
146 /*                                                                        */
147 /*  FUNCTION                                               RELEASE        */
148 /*                                                                        */
149 /*    _nxd_mqtt_read_remaining_length                     PORTABLE C      */
150 /*                                                           6.1          */
151 /*  AUTHOR                                                                */
152 /*                                                                        */
153 /*    Yuxin Zhou, Microsoft Corporation                                   */
154 /*                                                                        */
155 /*  DESCRIPTION                                                           */
156 /*                                                                        */
157 /*    This function parses the remaining length filed in the incoming     */
158 /*    MQTT packet.                                                        */
159 /*                                                                        */
160 /*    This function follows the logic outlined in 2.2.3 in MQTT           */
161 /*    specification                                                       */
162 /*                                                                        */
163 /*  INPUT                                                                 */
164 /*                                                                        */
165 /*    packet_ptr                            Incoming MQTT packet.         */
166 /*    remaining_length                      remaining length in bytes,    */
167 /*                                            this is the return value.   */
168 /*    offset                                Pointer to offset of the      */
169 /*                                            remaining data              */
170 /*                                                                        */
171 /*  OUTPUT                                                                */
172 /*                                                                        */
173 /*    status                                                              */
174 /*                                                                        */
175 /*  CALLS                                                                 */
176 /*                                                                        */
177 /*    None                                                                */
178 /*                                                                        */
179 /*  CALLED BY                                                             */
180 /*                                                                        */
181 /*    _nxd_mqtt_process_publish                                           */
182 /*    _nxd_mqtt_client_message_get                                        */
183 /*    _nxd_mqtt_process_sub_unsub_ack                                     */
184 /*                                                                        */
185 /*  RELEASE HISTORY                                                       */
186 /*                                                                        */
187 /*    DATE              NAME                      DESCRIPTION             */
188 /*                                                                        */
189 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
190 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
191 /*                                            resulting in version 6.1    */
192 /*                                                                        */
193 /**************************************************************************/
_nxd_mqtt_read_remaining_length(NX_PACKET * packet_ptr,UINT * remaining_length,ULONG * offset_ptr)194 UINT _nxd_mqtt_read_remaining_length(NX_PACKET *packet_ptr, UINT *remaining_length, ULONG *offset_ptr)
195 {
196 UINT   value = 0;
197 UCHAR  bytes[4] = {0};
198 UINT   multiplier = 1;
199 UINT   byte_count = 0;
200 ULONG  bytes_copied;
201 
202     if (nx_packet_data_extract_offset(packet_ptr, 1, &bytes, sizeof(bytes), &bytes_copied))
203     {
204 
205         /* Packet is incomplete. */
206         return(NXD_MQTT_PARTIAL_PACKET);
207     }
208 
209     do
210     {
211         if (byte_count >= bytes_copied)
212         {
213             if (byte_count == 4)
214             {
215                 return(NXD_MQTT_INTERNAL_ERROR);
216             }
217             else
218             {
219 
220                 /* Packet is incomplete. */
221                 return(NXD_MQTT_PARTIAL_PACKET);
222             }
223         }
224         value += (((bytes[byte_count]) & 0x7F) * multiplier);
225         multiplier = multiplier << 7;
226     } while ((bytes[byte_count++]) & 0x80);
227 
228     if ((1 + byte_count + value) > packet_ptr -> nx_packet_length)
229     {
230 
231         /* Packet is incomplete. */
232         /* Remaining length is larger than packet size. */
233         return(NXD_MQTT_PARTIAL_PACKET);
234     }
235 
236     *remaining_length = value;
237     *offset_ptr = (1 + byte_count);
238 
239     return(NXD_MQTT_SUCCESS);
240 }
241 
242 
243 
244 /**************************************************************************/
245 /*                                                                        */
246 /*  FUNCTION                                               RELEASE        */
247 /*                                                                        */
248 /*    _nxd_mqtt_client_sub_unsub                          PORTABLE C      */
249 /*                                                           6.3.0        */
250 /*  AUTHOR                                                                */
251 /*                                                                        */
252 /*    Yuxin Zhou, Microsoft Corporation                                   */
253 /*                                                                        */
254 /*  DESCRIPTION                                                           */
255 /*                                                                        */
256 /*    This function sends a subscribe or unsubscribe message to the       */
257 /*    broker.                                                             */
258 /*                                                                        */
259 /*                                                                        */
260 /*  INPUT                                                                 */
261 /*                                                                        */
262 /*    client_ptr                            Pointer to MQTT Client        */
263 /*    op                                    Subscribe or Unsubscribe      */
264 /*    topic_name                            Pointer to the topic string   */
265 /*                                            to subscribe to             */
266 /*    topic_name_length                     Length of the topic string    */
267 /*                                            in bytes                    */
268 /*    packet_id_ptr                         Pointer to packet id that     */
269 /*                                            will be filled with         */
270 /*                                            assigned packet id for      */
271 /*                                            sub/unsub message           */
272 /*    QoS                                   Expected QoS level            */
273 /*                                                                        */
274 /*  OUTPUT                                                                */
275 /*                                                                        */
276 /*    status                                Completion status             */
277 /*                                                                        */
278 /*  CALLS                                                                 */
279 /*                                                                        */
280 /*    tx_mutex_get                                                        */
281 /*    _nxd_mqtt_client_packet_allocate                                    */
282 /*    _nxd_mqtt_client_set_fixed_header                                   */
283 /*    _nxd_mqtt_client_append_message                                     */
284 /*    tx_mutex_put                                                        */
285 /*    _nxd_mqtt_packet_send                                               */
286 /*    nx_packet_release                                                   */
287 /*    _nxd_mqtt_copy_transmit_packet                                      */
288 /*                                                                        */
289 /*  CALLED BY                                                             */
290 /*                                                                        */
291 /*    _nxd_mqtt_client_subscribe                                          */
292 /*    _nxd_mqtt_client_unsubscribe                                        */
293 /*                                                                        */
294 /*  RELEASE HISTORY                                                       */
295 /*                                                                        */
296 /*    DATE              NAME                      DESCRIPTION             */
297 /*                                                                        */
298 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
299 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
300 /*                                            resulting in version 6.1    */
301 /*  11-09-2020     Yuxin Zhou               Modified comment(s), and      */
302 /*                                            added packet id parameter,  */
303 /*                                            resulting in version 6.1.2  */
304 /*  07-29-2022     Spencer McDonough        Modified comment(s),          */
305 /*                                            improved internal logic,    */
306 /*                                            resulting in version 6.1.12 */
307 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
308 /*                                            the logic of sending packet,*/
309 /*                                            resulting in version 6.2.0  */
310 /*  10-31-2023     Haiqing Zhao             Modified comment(s), improved */
311 /*                                            internal logic,             */
312 /*                                            resulting in version 6.3.0  */
313 /*                                                                        */
314 /**************************************************************************/
_nxd_mqtt_client_sub_unsub(NXD_MQTT_CLIENT * client_ptr,UINT op,CHAR * topic_name,UINT topic_name_length,USHORT * packet_id_ptr,UINT QoS)315 UINT _nxd_mqtt_client_sub_unsub(NXD_MQTT_CLIENT *client_ptr, UINT op,
316                                 CHAR *topic_name, UINT topic_name_length,
317                                 USHORT *packet_id_ptr, UINT QoS)
318 {
319 
320 
321 NX_PACKET          *packet_ptr;
322 NX_PACKET          *transmit_packet_ptr;
323 UINT                status;
324 UINT                length = 0;
325 UINT                ret = NXD_MQTT_SUCCESS;
326 UCHAR               temp_data[2];
327 
328     /* Obtain the mutex. */
329     status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
330 
331     if (status != TX_SUCCESS)
332     {
333         return(NXD_MQTT_MUTEX_FAILURE);
334     }
335 
336     /* Do nothing if the client is already connected. */
337     if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTED)
338     {
339         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
340         return(NXD_MQTT_NOT_CONNECTED);
341     }
342 
343     status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
344     if (status)
345     {
346         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
347         return(status);
348     }
349 
350     /* Compute the remaining length field, starting with 2 bytes of packet ID */
351     length = 2;
352 
353     /* Count the topic. */
354     length += (2 + topic_name_length);
355 
356     if (op == ((MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4) | 0x02))
357     {
358         /* Count one byte for QoS */
359         length++;
360     }
361 
362     /* Write out the control header and remaining length field. */
363     ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, (UCHAR )op, length, NX_WAIT_FOREVER);
364 
365     if (ret)
366     {
367 
368         /* Release the mutex. */
369         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
370 
371         /* Release the packet. */
372         nx_packet_release(packet_ptr);
373 
374         return(NXD_MQTT_PACKET_POOL_FAILURE);
375     }
376 
377     temp_data[0] = (UCHAR)(client_ptr -> nxd_mqtt_client_packet_identifier >> 8);
378     temp_data[1] = (client_ptr -> nxd_mqtt_client_packet_identifier &  0xFF);
379 
380     if (packet_id_ptr)
381     {
382         *packet_id_ptr = (USHORT)(client_ptr -> nxd_mqtt_client_packet_identifier & 0xFFFF);
383     }
384 
385     /* Append packet ID. */
386     ret = nx_packet_data_append(packet_ptr, temp_data, 2, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_WAIT_FOREVER);
387 
388     if (ret)
389     {
390 
391         /* Release the mutex. */
392         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
393 
394         /* Release the packet. */
395         nx_packet_release(packet_ptr);
396 
397         return(NXD_MQTT_PACKET_POOL_FAILURE);
398     }
399 
400     /* Append topic name */
401     ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, topic_name, topic_name_length, NX_WAIT_FOREVER);
402 
403     if (ret)
404     {
405 
406         /* Release the mutex. */
407         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
408 
409         /* Release the packet. */
410         nx_packet_release(packet_ptr);
411 
412         return(NXD_MQTT_PACKET_POOL_FAILURE);
413     }
414 
415     if (op == ((MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4) | 0x02))
416     {
417         /* Fill in QoS value. */
418         temp_data[0] = QoS & 0x3;
419 
420         ret = nx_packet_data_append(packet_ptr, temp_data, 1, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_WAIT_FOREVER);
421 
422         if (ret)
423         {
424 
425             /* Release the mutex. */
426             tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
427 
428             /* Release the packet. */
429             nx_packet_release(packet_ptr);
430 
431             return(NXD_MQTT_PACKET_POOL_FAILURE);
432         }
433     }
434 
435     /* Copy packet for retransmission. */
436     if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &transmit_packet_ptr,
437                                        (USHORT)(client_ptr -> nxd_mqtt_client_packet_identifier),
438                                        NX_FALSE, NX_WAIT_FOREVER))
439     {
440         /* Release the mutex. */
441         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
442 
443         /* Release the packet. */
444         nx_packet_release(packet_ptr);
445 
446         return(NXD_MQTT_PACKET_POOL_FAILURE);
447     }
448 
449     if (client_ptr -> message_transmit_queue_head == NX_NULL)
450     {
451         client_ptr -> message_transmit_queue_head = transmit_packet_ptr;
452     }
453     else
454     {
455         client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = transmit_packet_ptr;
456     }
457     client_ptr -> message_transmit_queue_tail = transmit_packet_ptr;
458 
459     client_ptr -> nxd_mqtt_client_packet_identifier = (client_ptr -> nxd_mqtt_client_packet_identifier + 1) & 0xFFFF;
460 
461     /* Prevent packet identifier from being zero. MQTT-2.3.1-1 */
462     if(client_ptr -> nxd_mqtt_client_packet_identifier == 0)
463         client_ptr -> nxd_mqtt_client_packet_identifier = 1;
464 
465     /* Update the timeout value. */
466     client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
467 
468     /* Release the mutex. */
469     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
470 
471     /* Ready to send the connect message to the server. */
472     status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER);
473 
474     if (status)
475     {
476         /* Release the packet. */
477         nx_packet_release(packet_ptr);
478 
479         ret = NXD_MQTT_COMMUNICATION_FAILURE;
480     }
481 
482     return(ret);
483 }
484 
485 
486 /**************************************************************************/
487 /*                                                                        */
488 /*  FUNCTION                                               RELEASE        */
489 /*                                                                        */
490 /*    _nxd_mqtt_client_packet_allocate                    PORTABLE C      */
491 /*                                                           6.3.0        */
492 /*  AUTHOR                                                                */
493 /*                                                                        */
494 /*    Yuxin Zhou, Microsoft Corporation                                   */
495 /*                                                                        */
496 /*  DESCRIPTION                                                           */
497 /*                                                                        */
498 /*    This function allocates a packet for transmitting MQTT message.     */
499 /*    Special care has to be taken for accommodating IPv4/IPv6 header,    */
500 /*    and possibly TLS record if TLS is being used. On failure, the       */
501 /*    TLS mutex is released and the caller can simply return.             */
502 /*                                                                        */
503 /*                                                                        */
504 /*  INPUT                                                                 */
505 /*                                                                        */
506 /*    client_ptr                            Pointer to MQTT Client        */
507 /*    packet_ptr                            Allocated packet to be        */
508 /*                                            returned to the caller.     */
509 /*                                                                        */
510 /*  OUTPUT                                                                */
511 /*                                                                        */
512 /*    status                                Completion status             */
513 /*                                                                        */
514 /*  CALLS                                                                 */
515 /*                                                                        */
516 /*    nx_secure_tls_packet_allocate         Allocate packet for MQTT      */
517 /*                                            over TLS socket             */
518 /*    nx_packet_allocate                    Allocate a packet for MQTT    */
519 /*                                            over regular TCP socket     */
520 /*    tx_mutex_put                          Release a mutex               */
521 /*                                                                        */
522 /*  CALLED BY                                                             */
523 /*                                                                        */
524 /*    _nxd_mqtt_process_publish                                           */
525 /*    _nxd_mqtt_client_connect                                            */
526 /*    _nxd_mqtt_client_publish                                            */
527 /*    _nxd_mqtt_client_subscribe                                          */
528 /*    _nxd_mqtt_client_unsubscribe                                        */
529 /*    _nxd_mqtt_client_send_simple_message                                */
530 /*                                                                        */
531 /*  RELEASE HISTORY                                                       */
532 /*                                                                        */
533 /*    DATE              NAME                      DESCRIPTION             */
534 /*                                                                        */
535 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
536 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
537 /*                                            resulting in version 6.1    */
538 /*  07-29-2022     Spencer McDonough        Modified comment(s), and      */
539 /*                                            improved internal logic,    */
540 /*                                            resulting in version 6.1.12 */
541 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
542 /*                                            mqtt over websocket,        */
543 /*                                            resulting in version 6.2.0  */
544 /*  10-31-2023     Haiqing Zhao             Modified comment(s), improved */
545 /*                                            internal logic,             */
546 /*                                            resulting in version 6.3.0  */
547 /*                                                                        */
548 /**************************************************************************/
_nxd_mqtt_client_packet_allocate(NXD_MQTT_CLIENT * client_ptr,NX_PACKET ** packet_ptr,ULONG wait_option)549 UINT _nxd_mqtt_client_packet_allocate(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, ULONG wait_option)
550 {
551 UINT status = NXD_MQTT_SUCCESS;
552 
553 #ifdef NXD_MQTT_OVER_WEBSOCKET
554     if (client_ptr -> nxd_mqtt_client_use_websocket)
555     {
556 
557         /* Use WebSocket packet allocate since it is able to count for WebSocket-related header space */
558         status = nx_websocket_client_packet_allocate(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, wait_option);
559     }
560     else
561 #endif /* NXD_MQTT_OVER_WEBSOCKET */
562 #ifdef NX_SECURE_ENABLE
563     if (client_ptr -> nxd_mqtt_client_use_tls)
564     {
565         /* Use TLS packet allocate.  The TLS packet allocate is able to count for
566            TLS-related header space including crypto initial vector area. */
567         status = nx_secure_tls_packet_allocate(&client_ptr -> nxd_mqtt_tls_session, client_ptr -> nxd_mqtt_client_packet_pool_ptr,
568                                                packet_ptr, wait_option);
569     }
570     /* Allocate a packet  */
571     else
572     {
573 #endif /* NX_SECURE_ENABLE */
574         if (client_ptr -> nxd_mqtt_client_socket.nx_tcp_socket_connect_ip.nxd_ip_version == NX_IP_VERSION_V4)
575         {
576             status = nx_packet_allocate(client_ptr -> nxd_mqtt_client_packet_pool_ptr, packet_ptr, NX_IPv4_TCP_PACKET,
577                                         wait_option);
578         }
579         else
580         {
581             status = nx_packet_allocate(client_ptr -> nxd_mqtt_client_packet_pool_ptr, packet_ptr, NX_IPv6_TCP_PACKET,
582                                         wait_option);
583         }
584 #ifdef NX_SECURE_ENABLE
585     }
586 #endif /* NX_SECURE_ENABLE */
587 
588     if (status != NX_SUCCESS)
589     {
590         return(NXD_MQTT_PACKET_POOL_FAILURE);
591     }
592 
593     return(NXD_MQTT_SUCCESS);
594 }
595 
596 /**************************************************************************/
597 /*                                                                        */
598 /*  FUNCTION                                               RELEASE        */
599 /*                                                                        */
600 /*    _nxd_mqtt_packet_send                               PORTABLE C      */
601 /*                                                           6.2.0        */
602 /*  AUTHOR                                                                */
603 /*                                                                        */
604 /*    Yuxin Zhou, Microsoft Corporation                                   */
605 /*                                                                        */
606 /*  DESCRIPTION                                                           */
607 /*                                                                        */
608 /*    This function sends out a packet.                                   */
609 /*                                                                        */
610 /*                                                                        */
611 /*  INPUT                                                                 */
612 /*                                                                        */
613 /*    client_ptr                            Pointer to MQTT Client        */
614 /*    packet_ptr                            Pointer to packet             */
615 /*    wait_option                           Timeout value                 */
616 /*                                                                        */
617 /*  OUTPUT                                                                */
618 /*                                                                        */
619 /*    status                                Completion status             */
620 /*                                                                        */
621 /*  CALLS                                                                 */
622 /*                                                                        */
623 /*    nx_websocket_client_send                                            */
624 /*    nx_secure_tls_session_send                                          */
625 /*    nx_tcp_socket_send                                                  */
626 /*                                                                        */
627 /*  CALLED BY                                                             */
628 /*                                                                        */
629 /*                                                                        */
630 /*  RELEASE HISTORY                                                       */
631 /*                                                                        */
632 /*    DATE              NAME                      DESCRIPTION             */
633 /*                                                                        */
634 /*  10-31-2022     Bo Chen                  Initial Version 6.2.0         */
635 /*                                                                        */
636 /**************************************************************************/
_nxd_mqtt_packet_send(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,UINT wait_option)637 static UINT _nxd_mqtt_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UINT wait_option)
638 {
639 UINT status = NXD_MQTT_SUCCESS;
640 
641 #ifdef NXD_MQTT_OVER_WEBSOCKET
642     if (client_ptr -> nxd_mqtt_client_use_websocket)
643     {
644         status = nx_websocket_client_send(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, NX_WEBSOCKET_OPCODE_BINARY_FRAME, NX_TRUE, wait_option);
645     }
646     else
647 #endif /* NXD_MQTT_OVER_WEBSOCKET */
648 
649 #ifdef NX_SECURE_ENABLE
650     if (client_ptr -> nxd_mqtt_client_use_tls)
651     {
652         status = nx_secure_tls_session_send(&(client_ptr -> nxd_mqtt_tls_session), packet_ptr, wait_option);
653     }
654     else
655 #endif /* NX_SECURE_ENABLE */
656     {
657         status = nx_tcp_socket_send(&client_ptr -> nxd_mqtt_client_socket, packet_ptr, wait_option);
658     }
659 
660     return(status);
661 }
662 
663 /**************************************************************************/
664 /*                                                                        */
665 /*  FUNCTION                                               RELEASE        */
666 /*                                                                        */
667 /*    _nxd_mqtt_packet_receive                            PORTABLE C      */
668 /*                                                           6.2.0        */
669 /*  AUTHOR                                                                */
670 /*                                                                        */
671 /*    Yuxin Zhou, Microsoft Corporation                                   */
672 /*                                                                        */
673 /*  DESCRIPTION                                                           */
674 /*                                                                        */
675 /*    This function receives a packet.                                    */
676 /*                                                                        */
677 /*                                                                        */
678 /*  INPUT                                                                 */
679 /*                                                                        */
680 /*    client_ptr                            Pointer to MQTT Client        */
681 /*    packet_ptr                            Pointer to packet             */
682 /*    wait_option                           Timeout value                 */
683 /*                                                                        */
684 /*  OUTPUT                                                                */
685 /*                                                                        */
686 /*    status                                Completion status             */
687 /*                                                                        */
688 /*  CALLS                                                                 */
689 /*                                                                        */
690 /*    nx_websocket_client_receive                                         */
691 /*    nx_secure_tls_session_receive                                       */
692 /*    nx_tcp_socket_receive                                               */
693 /*                                                                        */
694 /*  CALLED BY                                                             */
695 /*                                                                        */
696 /*                                                                        */
697 /*  RELEASE HISTORY                                                       */
698 /*                                                                        */
699 /*    DATE              NAME                      DESCRIPTION             */
700 /*                                                                        */
701 /*  10-31-2022     Bo Chen                  Initial Version 6.2.0         */
702 /*                                                                        */
703 /**************************************************************************/
_nxd_mqtt_packet_receive(NXD_MQTT_CLIENT * client_ptr,NX_PACKET ** packet_ptr,UINT wait_option)704 static UINT _nxd_mqtt_packet_receive(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, UINT wait_option)
705 {
706 UINT status = NXD_MQTT_SUCCESS;
707 #ifdef NXD_MQTT_OVER_WEBSOCKET
708 UINT op_code = 0;
709 #endif /* NXD_MQTT_OVER_WEBSOCKET*/
710 
711 #ifdef NXD_MQTT_OVER_WEBSOCKET
712     if (client_ptr -> nxd_mqtt_client_use_websocket)
713     {
714         status = nx_websocket_client_receive(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, &op_code, wait_option);
715         if ((status == NX_SUCCESS) && (op_code != NX_WEBSOCKET_OPCODE_BINARY_FRAME))
716         {
717             return(NX_INVALID_PACKET);
718         }
719     }
720     else
721 #endif /* NXD_MQTT_OVER_WEBSOCKET */
722 
723 #ifdef NX_SECURE_ENABLE
724     if (client_ptr -> nxd_mqtt_client_use_tls)
725     {
726         status = nx_secure_tls_session_receive(&(client_ptr -> nxd_mqtt_tls_session), packet_ptr, wait_option);
727     }
728     else
729 #endif /* NX_SECURE_ENABLE */
730     {
731         status = nx_tcp_socket_receive(&(client_ptr -> nxd_mqtt_client_socket), packet_ptr, wait_option);
732     }
733 
734     return(status);
735 }
736 
737 /**************************************************************************/
738 /*                                                                        */
739 /*  FUNCTION                                               RELEASE        */
740 /*                                                                        */
741 /*    _nxd_mqtt_tcp_establish_notify                      PORTABLE C      */
742 /*                                                           6.1          */
743 /*  AUTHOR                                                                */
744 /*                                                                        */
745 /*    Yuxin Zhou, Microsoft Corporation                                   */
746 /*                                                                        */
747 /*  DESCRIPTION                                                           */
748 /*                                                                        */
749 /*    This internal function is installed as TCP connection establish     */
750 /*    callback function.                                                  */
751 /*                                                                        */
752 /*  INPUT                                                                 */
753 /*                                                                        */
754 /*    socket_ptr                            The socket that receives      */
755 /*                                           the message.                 */
756 /*                                                                        */
757 /*  OUTPUT                                                                */
758 /*                                                                        */
759 /*    None                                                                */
760 /*                                                                        */
761 /*  CALLS                                                                 */
762 /*                                                                        */
763 /*    tx_event_flags_set                                                  */
764 /*                                                                        */
765 /*  CALLED BY                                                             */
766 /*                                                                        */
767 /*    _nxd_mqtt_thread_entry                                              */
768 /*    _nxd_mqtt_client_event_process                                      */
769 /*                                                                        */
770 /*  RELEASE HISTORY                                                       */
771 /*                                                                        */
772 /*    DATE              NAME                      DESCRIPTION             */
773 /*                                                                        */
774 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
775 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
776 /*                                            resulting in version 6.1    */
777 /*                                                                        */
778 /**************************************************************************/
_nxd_mqtt_tcp_establish_notify(NX_TCP_SOCKET * socket_ptr)779 static VOID _nxd_mqtt_tcp_establish_notify(NX_TCP_SOCKET *socket_ptr)
780 {
781 NXD_MQTT_CLIENT *client_ptr;
782 
783     client_ptr = (NXD_MQTT_CLIENT *)socket_ptr -> nx_tcp_socket_reserved_ptr;
784 
785     if (&(client_ptr -> nxd_mqtt_client_socket) == socket_ptr)
786     {
787 
788         /* Set the event flag. */
789 #ifndef NXD_MQTT_CLOUD_ENABLE
790         tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_TCP_ESTABLISH_EVENT, TX_OR);
791 #else
792         nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_TCP_ESTABLISH_EVENT);
793 #endif /* NXD_MQTT_CLOUD_ENABLE */
794     }
795 }
796 
797 /**************************************************************************/
798 /*                                                                        */
799 /*  FUNCTION                                               RELEASE        */
800 /*                                                                        */
801 /*    _nxd_mqtt_receive_callback                          PORTABLE C      */
802 /*                                                           6.1          */
803 /*  AUTHOR                                                                */
804 /*                                                                        */
805 /*    Yuxin Zhou, Microsoft Corporation                                   */
806 /*                                                                        */
807 /*  DESCRIPTION                                                           */
808 /*                                                                        */
809 /*    This internal function is installed as TCP receive callback         */
810 /*    function.  On receiving a TCP message, the callback function        */
811 /*    sets an event flag to trigger MQTT client to process received       */
812 /*    message.                                                            */
813 /*                                                                        */
814 /*                                                                        */
815 /*  INPUT                                                                 */
816 /*                                                                        */
817 /*    socket_ptr                            The socket that receives      */
818 /*                                           the message.                 */
819 /*                                                                        */
820 /*  OUTPUT                                                                */
821 /*                                                                        */
822 /*    None                                                                */
823 /*                                                                        */
824 /*  CALLS                                                                 */
825 /*                                                                        */
826 /*    tx_event_flags_set                                                  */
827 /*                                                                        */
828 /*  CALLED BY                                                             */
829 /*                                                                        */
830 /*    _nxd_mqtt_thread_entry                                              */
831 /*    _nxd_mqtt_client_event_process                                      */
832 /*                                                                        */
833 /*  RELEASE HISTORY                                                       */
834 /*                                                                        */
835 /*    DATE              NAME                      DESCRIPTION             */
836 /*                                                                        */
837 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
838 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
839 /*                                            resulting in version 6.1    */
840 /*                                                                        */
841 /**************************************************************************/
_nxd_mqtt_receive_callback(NX_TCP_SOCKET * socket_ptr)842 static VOID _nxd_mqtt_receive_callback(NX_TCP_SOCKET *socket_ptr)
843 {
844 NXD_MQTT_CLIENT *client_ptr;
845 
846     client_ptr = (NXD_MQTT_CLIENT *)socket_ptr -> nx_tcp_socket_reserved_ptr;
847 
848     if (&(client_ptr -> nxd_mqtt_client_socket) == socket_ptr)
849     {
850         /* Set the event flag. */
851 #ifndef NXD_MQTT_CLOUD_ENABLE
852         tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_PACKET_RECEIVE_EVENT, TX_OR);
853 #else
854         nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PACKET_RECEIVE_EVENT);
855 #endif /* NXD_MQTT_CLOUD_ENABLE */
856     }
857 }
858 
859 /**************************************************************************/
860 /*                                                                        */
861 /*  FUNCTION                                               RELEASE        */
862 /*                                                                        */
863 /*    _nxd_mqtt_copy_transmit_packet                      PORTABLE C      */
864 /*                                                           6.1.8        */
865 /*  AUTHOR                                                                */
866 /*                                                                        */
867 /*    Yuxin Zhou, Microsoft Corporation                                   */
868 /*                                                                        */
869 /*  DESCRIPTION                                                           */
870 /*                                                                        */
871 /*    This internal function saves a transmit packet.                     */
872 /*    A transmit packet is allocated to store QoS 1 and 2 messages.       */
873 /*    Upon a message being properly acknowledged, the packet will         */
874 /*    be released.                                                        */
875 /*                                                                        */
876 /*                                                                        */
877 /*  INPUT                                                                 */
878 /*                                                                        */
879 /*    client_ptr                            Pointer to MQTT Client        */
880 /*    packet_ptr                            Pointer to the MQTT message   */
881 /*                                            packet to be saved          */
882 /*    new_packet_ptr                        Return a copied packet        */
883 /*    packet_id                             Current packet ID             */
884 /*    set_duplicate_flag                    Set duplicate flag for fixed  */
885 /*                                            header or not               */
886 /*    wait_option                           Timeout value                 */
887 /*                                                                        */
888 /*  OUTPUT                                                                */
889 /*                                                                        */
890 /*    status                                                              */
891 /*                                                                        */
892 /*  CALLS                                                                 */
893 /*                                                                        */
894 /*    nx_packet_copy                                                      */
895 /*                                                                        */
896 /*  CALLED BY                                                             */
897 /*                                                                        */
898 /*    _nxd_mqtt_client_sub_unsub                                          */
899 /*    _nxd_mqtt_process_publish                                           */
900 /*                                                                        */
901 /*  RELEASE HISTORY                                                       */
902 /*                                                                        */
903 /*    DATE              NAME                      DESCRIPTION             */
904 /*                                                                        */
905 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
906 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
907 /*                                            resulting in version 6.1    */
908 /*  08-02-2021     Yuxin Zhou               Modified comment(s),          */
909 /*                                            supported maximum transmit  */
910 /*                                            queue depth,                */
911 /*                                            resulting in version 6.1.8  */
912 /*                                                                        */
913 /**************************************************************************/
_nxd_mqtt_copy_transmit_packet(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,NX_PACKET ** new_packet_ptr,USHORT packet_id,UCHAR set_duplicate_flag,UINT wait_option)914 static UINT _nxd_mqtt_copy_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET **new_packet_ptr,
915                                            USHORT packet_id, UCHAR set_duplicate_flag, UINT wait_option)
916 {
917 UINT status;
918 
919 #ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
920     if (client_ptr -> message_transmit_queue_depth >= NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH)
921     {
922 
923         /* Hit the transmit queue depeth. No more packets should be queued. */
924         return(NX_TX_QUEUE_DEPTH);
925     }
926 #endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */
927 
928     /* Copy current packet. */
929     status = nx_packet_copy(packet_ptr, new_packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
930     if (status)
931     {
932 
933         /* No available packet to be stored. */
934         return(NXD_MQTT_PACKET_POOL_FAILURE);
935     }
936 
937     /* Save packet_id at the beginning of packet. */
938     *((USHORT *)(*new_packet_ptr) -> nx_packet_data_start) = packet_id;
939 
940     if (set_duplicate_flag)
941     {
942 
943         /* Update duplicate flag in fixed header. */
944         *((*new_packet_ptr) -> nx_packet_prepend_ptr) = (*((*new_packet_ptr) -> nx_packet_prepend_ptr)) | MQTT_PUBLISH_DUP_FLAG;
945     }
946 
947 #ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
948     /* Increase the transmit queue depth.  */
949     client_ptr -> message_transmit_queue_depth++;
950 #endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */
951 
952     return(NXD_MQTT_SUCCESS);
953 }
954 
955 /**************************************************************************/
956 /*                                                                        */
957 /*  FUNCTION                                               RELEASE        */
958 /*                                                                        */
959 /*    _nxd_mqtt_release_transmit_packet                   PORTABLE C      */
960 /*                                                           6.1.8        */
961 /*  AUTHOR                                                                */
962 /*                                                                        */
963 /*    Yuxin Zhou, Microsoft Corporation                                   */
964 /*                                                                        */
965 /*  DESCRIPTION                                                           */
966 /*                                                                        */
967 /*    This internal function releases a transmit packet.                  */
968 /*    A transmit packet is allocated to store QoS 1 and 2 messages.       */
969 /*    Upon a message being properly acknowledged, the packet can          */
970 /*    be released.                                                        */
971 /*                                                                        */
972 /*                                                                        */
973 /*  INPUT                                                                 */
974 /*                                                                        */
975 /*    client_ptr                            Pointer to MQTT Client        */
976 /*    packet_ptr                            Pointer to the MQTT message   */
977 /*                                            packet to be removed        */
978 /*    previous_packet_ptr                   Pointer to the previous packet*/
979 /*                                            or NULL if none exists      */
980 /*                                                                        */
981 /*  OUTPUT                                                                */
982 /*                                                                        */
983 /*    None                                                                */
984 /*                                                                        */
985 /*  CALLS                                                                 */
986 /*                                                                        */
987 /*    None                                                                */
988 /*                                                                        */
989 /*  CALLED BY                                                             */
990 /*                                                                        */
991 /*    _nxd_mqtt_thread_entry                                              */
992 /*    _nxd_mqtt_client_event_process                                      */
993 /*                                                                        */
994 /*  RELEASE HISTORY                                                       */
995 /*                                                                        */
996 /*    DATE              NAME                      DESCRIPTION             */
997 /*                                                                        */
998 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
999 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1000 /*                                            resulting in version 6.1    */
1001 /*  08-02-2021     Yuxin Zhou               Modified comment(s),          */
1002 /*                                            supported maximum transmit  */
1003 /*                                            queue depth,                */
1004 /*                                            resulting in version 6.1.8  */
1005 /*                                                                        */
1006 /**************************************************************************/
_nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,NX_PACKET * previous_packet_ptr)1007 static VOID _nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr)
1008 {
1009 
1010     if (previous_packet_ptr)
1011     {
1012         previous_packet_ptr -> nx_packet_queue_next = packet_ptr -> nx_packet_queue_next;
1013     }
1014     else
1015     {
1016         client_ptr -> message_transmit_queue_head = packet_ptr -> nx_packet_queue_next;
1017     }
1018 
1019     if (packet_ptr == client_ptr -> message_transmit_queue_tail)
1020     {
1021         client_ptr -> message_transmit_queue_tail = previous_packet_ptr;
1022     }
1023     nx_packet_release(packet_ptr);
1024 
1025 #ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
1026     client_ptr -> message_transmit_queue_depth--;
1027 #endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */
1028 }
1029 
1030 /**************************************************************************/
1031 /*                                                                        */
1032 /*  FUNCTION                                               RELEASE        */
1033 /*                                                                        */
1034 /*    _nxd_mqtt_release_receive_packet                    PORTABLE C      */
1035 /*                                                           6.1          */
1036 /*  AUTHOR                                                                */
1037 /*                                                                        */
1038 /*    Yuxin Zhou, Microsoft Corporation                                   */
1039 /*                                                                        */
1040 /*  DESCRIPTION                                                           */
1041 /*                                                                        */
1042 /*    This internal function releases a receive packet.                   */
1043 /*    A receive packet is allocated to store QoS 1 and 2 messages.        */
1044 /*    Upon a message being properly acknowledged, the packet can          */
1045 /*    be released.                                                        */
1046 /*                                                                        */
1047 /*                                                                        */
1048 /*  INPUT                                                                 */
1049 /*                                                                        */
1050 /*    client_ptr                            Pointer to MQTT Client        */
1051 /*    packet_ptr                            Pointer to the MQTT message   */
1052 /*                                            packet to be removed        */
1053 /*    previous_packet_ptr                   Pointer to the previous packet*/
1054 /*                                            or NULL if none exists      */
1055 /*                                                                        */
1056 /*  OUTPUT                                                                */
1057 /*                                                                        */
1058 /*    None                                                                */
1059 /*                                                                        */
1060 /*  CALLS                                                                 */
1061 /*                                                                        */
1062 /*    None                                                                */
1063 /*                                                                        */
1064 /*  CALLED BY                                                             */
1065 /*                                                                        */
1066 /*    _nxd_mqtt_thread_entry                                              */
1067 /*    _nxd_mqtt_client_event_process                                      */
1068 /*                                                                        */
1069 /*  RELEASE HISTORY                                                       */
1070 /*                                                                        */
1071 /*    DATE              NAME                      DESCRIPTION             */
1072 /*                                                                        */
1073 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1074 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1075 /*                                            resulting in version 6.1    */
1076 /*                                                                        */
1077 /**************************************************************************/
_nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,NX_PACKET * previous_packet_ptr)1078 static VOID _nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr)
1079 {
1080 
1081     if (previous_packet_ptr)
1082     {
1083         previous_packet_ptr -> nx_packet_queue_next = packet_ptr -> nx_packet_queue_next;
1084     }
1085     else
1086     {
1087         client_ptr -> message_receive_queue_head = packet_ptr -> nx_packet_queue_next;
1088     }
1089 
1090     if (packet_ptr == client_ptr -> message_receive_queue_tail)
1091     {
1092         client_ptr -> message_receive_queue_tail = previous_packet_ptr;
1093     }
1094 
1095     client_ptr -> message_receive_queue_depth--;
1096 }
1097 
1098 /**************************************************************************/
1099 /*                                                                        */
1100 /*  FUNCTION                                               RELEASE        */
1101 /*                                                                        */
1102 /*    _nxd_mqtt_process_connack                           PORTABLE C      */
1103 /*                                                           6.1          */
1104 /*  AUTHOR                                                                */
1105 /*                                                                        */
1106 /*    Yuxin Zhou, Microsoft Corporation                                   */
1107 /*                                                                        */
1108 /*  DESCRIPTION                                                           */
1109 /*                                                                        */
1110 /*    This internal function processes a CONNACK message from the broker. */
1111 /*                                                                        */
1112 /*                                                                        */
1113 /*  INPUT                                                                 */
1114 /*                                                                        */
1115 /*    client_ptr                            Pointer to MQTT Client        */
1116 /*    packet_ptr                            Pointer to the packet         */
1117 /*    wait_option                           Timeout value                 */
1118 /*                                                                        */
1119 /*  OUTPUT                                                                */
1120 /*                                                                        */
1121 /*    status                                                              */
1122 /*                                                                        */
1123 /*  CALLS                                                                 */
1124 /*                                                                        */
1125 /*    [nxd_mqtt_connect_notify]             User supplied connect         */
1126 /*                                            callback function           */
1127 /*    tx_mutex_get                                                        */
1128 /*    tx_mutex_put                                                        */
1129 /*    _nxd_mqtt_client_retransmit_message                                 */
1130 /*    _nxd_mqtt_client_connection_end                                     */
1131 /*                                                                        */
1132 /*                                                                        */
1133 /*  CALLED BY                                                             */
1134 /*                                                                        */
1135 /*    _nxd_mqtt_packet_receive_process                                    */
1136 /*                                                                        */
1137 /*  RELEASE HISTORY                                                       */
1138 /*                                                                        */
1139 /*    DATE              NAME                      DESCRIPTION             */
1140 /*                                                                        */
1141 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1142 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1143 /*                                            resulting in version 6.1    */
1144 /*                                                                        */
1145 /**************************************************************************/
_nxd_mqtt_process_connack(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,ULONG wait_option)1146 static UINT _nxd_mqtt_process_connack(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, ULONG wait_option)
1147 {
1148 
1149 UINT    ret = NXD_MQTT_COMMUNICATION_FAILURE;
1150 MQTT_PACKET_CONNACK *connack_packet_ptr = (MQTT_PACKET_CONNACK *)(packet_ptr -> nx_packet_prepend_ptr);
1151 
1152 
1153     /* Check the length.  */
1154     if ((packet_ptr -> nx_packet_length != sizeof(MQTT_PACKET_CONNACK)) ||
1155         (connack_packet_ptr -> mqtt_connack_packet_header >> 4 != MQTT_CONTROL_PACKET_TYPE_CONNACK))
1156     {
1157         /* Invalid packet length.  Free the packet and process error. */
1158         ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1159     }
1160     else
1161     {
1162 
1163         /* Check remaining length.  */
1164         if (connack_packet_ptr -> mqtt_connack_packet_remaining_length != 2)
1165         {
1166             ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1167         }
1168         /* Follow MQTT-3.2.2-1 rule.  */
1169         else if ((client_ptr -> nxd_mqtt_clean_session) && (connack_packet_ptr -> mqtt_connack_packet_ack_flags & MQTT_CONNACK_CONNECT_FLAGS_SP))
1170         {
1171 
1172             /* Client requested clean session, and server responded with Session Present.  This is a violation. */
1173             ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1174         }
1175         else if (connack_packet_ptr -> mqtt_connack_packet_return_code >  MQTT_CONNACK_CONNECT_RETURN_CODE_NOT_AUTHORIZED)
1176         {
1177             ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1178         }
1179         else if (connack_packet_ptr -> mqtt_connack_packet_return_code > 0)
1180         {
1181 
1182             /* Pass the server return code to the application. */
1183             ret = (UINT)(NXD_MQTT_ERROR_CONNECT_RETURN_CODE + connack_packet_ptr -> mqtt_connack_packet_return_code);
1184         }
1185         else
1186         {
1187             ret = NXD_MQTT_SUCCESS;
1188 
1189             /* Obtain mutex before we modify client control block. */
1190             tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
1191 
1192             client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_CONNECTED;
1193 
1194             /* Initialize the packet identification field. */
1195             client_ptr -> nxd_mqtt_client_packet_identifier = NXD_MQTT_INITIAL_PACKET_ID_VALUE;
1196 
1197             /* Prevent packet identifier from being zero. MQTT-2.3.1-1 */
1198             if(client_ptr -> nxd_mqtt_client_packet_identifier == 0)
1199                 client_ptr -> nxd_mqtt_client_packet_identifier = 1;
1200 
1201             /* Release mutex */
1202             tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1203         }
1204     }
1205 
1206     /* Check callback function.  */
1207     if (client_ptr -> nxd_mqtt_connect_notify)
1208     {
1209         client_ptr -> nxd_mqtt_connect_notify(client_ptr, ret, client_ptr -> nxd_mqtt_connect_context);
1210     }
1211 
1212     if (ret == NXD_MQTT_SUCCESS)
1213     {
1214 
1215         /* If client doesn't start with Clean Session, and there are un-acked PUBLISH messages,
1216            we shall re-publish these messages. */
1217         if ((client_ptr -> nxd_mqtt_clean_session != NX_TRUE) && (client_ptr -> message_transmit_queue_head))
1218         {
1219 
1220             tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
1221 
1222             /* There are messages from the previous session that has not been acknowledged. */
1223             ret = _nxd_mqtt_client_retransmit_message(client_ptr, wait_option);
1224 
1225             /* Release mutex */
1226             tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1227         }
1228     }
1229     else
1230     {
1231 
1232         /* End connection. */
1233         _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
1234     }
1235 
1236     return(ret);
1237 }
1238 
1239 /**************************************************************************/
1240 /*                                                                        */
1241 /*  FUNCTION                                               RELEASE        */
1242 /*                                                                        */
1243 /*    _nxd_mqtt_process_publish_packet                    PORTABLE C      */
1244 /*                                                           6.1          */
1245 /*  AUTHOR                                                                */
1246 /*                                                                        */
1247 /*    Yuxin Zhou, Microsoft Corporation                                   */
1248 /*                                                                        */
1249 /*  DESCRIPTION                                                           */
1250 /*                                                                        */
1251 /*    This internal function processes a packet and parses topic and      */
1252 /*    message.                                                            */
1253 /*                                                                        */
1254 /*                                                                        */
1255 /*  INPUT                                                                 */
1256 /*                                                                        */
1257 /*    packet_ptr                            Pointer to the packet         */
1258 /*    topic_offset_ptr                      Return topic offset           */
1259 /*    topic_length_ptr                      Return topic length           */
1260 /*    message_offset_ptr                    Return message offset         */
1261 /*    message_length_ptr                    Return message length         */
1262 /*                                                                        */
1263 /*  OUTPUT                                                                */
1264 /*                                                                        */
1265 /*    Status                                                              */
1266 /*                                                                        */
1267 /*  CALLS                                                                 */
1268 /*                                                                        */
1269 /*    _nxd_mqtt_read_remaining_length                                     */
1270 /*    nx_packet_data_extract_offset                                       */
1271 /*                                                                        */
1272 /*                                                                        */
1273 /*  CALLED BY                                                             */
1274 /*                                                                        */
1275 /*    _nxd_mqtt_process_publish                                           */
1276 /*                                                                        */
1277 /*  RELEASE HISTORY                                                       */
1278 /*                                                                        */
1279 /*    DATE              NAME                      DESCRIPTION             */
1280 /*                                                                        */
1281 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1282 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1283 /*                                            resulting in version 6.1    */
1284 /*                                                                        */
1285 /**************************************************************************/
_nxd_mqtt_process_publish_packet(NX_PACKET * packet_ptr,ULONG * topic_offset_ptr,USHORT * topic_length_ptr,ULONG * message_offset_ptr,ULONG * message_length_ptr)1286 UINT _nxd_mqtt_process_publish_packet(NX_PACKET *packet_ptr, ULONG *topic_offset_ptr, USHORT *topic_length_ptr,
1287                                       ULONG *message_offset_ptr, ULONG *message_length_ptr)
1288 {
1289 UCHAR  QoS;
1290 UINT   remaining_length = 0;
1291 UINT   topic_length;
1292 ULONG  offset;
1293 UCHAR  bytes[2];
1294 ULONG  bytes_copied;
1295 
1296 
1297     QoS = (UCHAR)((*(packet_ptr -> nx_packet_prepend_ptr) & MQTT_PUBLISH_QOS_LEVEL_FIELD) >> 1);
1298 
1299     if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1300     {
1301         return(NXD_MQTT_INVALID_PACKET);
1302     }
1303 
1304     if (remaining_length < 2)
1305     {
1306         return(NXD_MQTT_INVALID_PACKET);
1307     }
1308 
1309     /* Get topic length fields. */
1310     if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1311         (bytes_copied != sizeof(bytes)))
1312     {
1313         return(NXD_MQTT_INVALID_PACKET);
1314     }
1315 
1316     topic_length = (UINT)(*(bytes) << 8) | (*(bytes + 1));
1317 
1318     if (topic_length > remaining_length - 2u)
1319     {
1320         return(NXD_MQTT_INVALID_PACKET);
1321     }
1322 
1323     *topic_offset_ptr = offset + 2;
1324     *topic_length_ptr = (USHORT)topic_length;
1325 
1326     remaining_length = remaining_length - topic_length - 2;
1327     if ((QoS == 1) || (QoS == 2))
1328     {
1329         offset += 2 + 2 + topic_length;
1330 
1331         if (remaining_length < 2)
1332         {
1333             return(NXD_MQTT_INVALID_PACKET);
1334         }
1335         remaining_length = remaining_length - 2;
1336     }
1337     else
1338     {
1339         offset += 2 + topic_length;
1340     }
1341 
1342     *message_offset_ptr = offset;
1343     *message_length_ptr = (ULONG)remaining_length;
1344 
1345     /* Return */
1346     return(NXD_MQTT_SUCCESS);
1347 }
1348 /**************************************************************************/
1349 /*                                                                        */
1350 /*  FUNCTION                                               RELEASE        */
1351 /*                                                                        */
1352 /*    _nxd_mqtt_process_publish                           PORTABLE C      */
1353 /*                                                           6.3.0        */
1354 /*  AUTHOR                                                                */
1355 /*                                                                        */
1356 /*    Yuxin Zhou, Microsoft Corporation                                   */
1357 /*                                                                        */
1358 /*  DESCRIPTION                                                           */
1359 /*                                                                        */
1360 /*    This internal function process a publish message from the broker.   */
1361 /*                                                                        */
1362 /*                                                                        */
1363 /*  INPUT                                                                 */
1364 /*                                                                        */
1365 /*    client_ptr                            Pointer to MQTT Client        */
1366 /*    packet_ptr                            Pointer to the packet         */
1367 /*                                                                        */
1368 /*  OUTPUT                                                                */
1369 /*                                                                        */
1370 /*    NX_TRUE - packet is consumed                                        */
1371 /*    NX_FALSE - packet is not consumed                                   */
1372 /*                                                                        */
1373 /*  CALLS                                                                 */
1374 /*                                                                        */
1375 /*    [receive_notify]                      User supplied receive         */
1376 /*                                            callback function           */
1377 /*    _nxd_mqtt_client_packet_allocate                                    */
1378 /*    _nxd_mqtt_packet_send                                               */
1379 /*    nx_packet_release                                                   */
1380 /*    nx_secure_tls_session_send                                          */
1381 /*    _nxd_mqtt_process_publish_packet                                    */
1382 /*    _nxd_mqtt_copy_transmit_packet                                      */
1383 /*                                                                        */
1384 /*                                                                        */
1385 /*  CALLED BY                                                             */
1386 /*                                                                        */
1387 /*    _nxd_mqtt_packet_receive_process                                    */
1388 /*                                                                        */
1389 /*  RELEASE HISTORY                                                       */
1390 /*                                                                        */
1391 /*    DATE              NAME                      DESCRIPTION             */
1392 /*                                                                        */
1393 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1394 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1395 /*                                            resulting in version 6.1    */
1396 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
1397 /*                                            the logic of sending packet,*/
1398 /*                                            resulting in version 6.2.0  */
1399 /*  10-31-2023     Haiqing Zhao             Modified comment(s), improved */
1400 /*                                            internal logic,             */
1401 /*                                            resulting in version 6.3.0  */
1402 /*                                                                        */
1403 /**************************************************************************/
_nxd_mqtt_process_publish(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1404 static UINT _nxd_mqtt_process_publish(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1405 {
1406 MQTT_PACKET_PUBLISH_RESPONSE *pubresp_ptr;
1407 UINT                          status;
1408 USHORT                        packet_id = 0;
1409 UCHAR                         QoS;
1410 UINT                          enqueue_message = 0;
1411 NX_PACKET                    *transmit_packet_ptr;
1412 UINT                          remaining_length = 0;
1413 UINT                          packet_consumed = NX_FALSE;
1414 UCHAR                         fixed_header;
1415 USHORT                        transmit_packet_id;
1416 UINT                          topic_length;
1417 ULONG                         offset;
1418 UCHAR                         bytes[2];
1419 ULONG                         bytes_copied;
1420 
1421     QoS = (UCHAR)((*(packet_ptr -> nx_packet_prepend_ptr) & MQTT_PUBLISH_QOS_LEVEL_FIELD) >> 1);
1422 
1423     if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1424     {
1425         return(NX_FALSE);
1426     }
1427 
1428     if (remaining_length < 2)
1429     {
1430         return(NXD_MQTT_INVALID_PACKET);
1431     }
1432 
1433     /* Get topic length fields. */
1434     if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1435         (bytes_copied != sizeof(bytes)))
1436     {
1437         return(NXD_MQTT_INVALID_PACKET);
1438     }
1439 
1440     topic_length = (UINT)(*(bytes) << 8) | (*(bytes + 1));
1441 
1442     if (topic_length > remaining_length - 2u)
1443     {
1444         return(NXD_MQTT_INVALID_PACKET);
1445     }
1446 
1447     if (QoS == 0)
1448     {
1449         enqueue_message = 1;
1450     }
1451     else
1452     {
1453         /* QoS 1 or QoS 2 messages. */
1454         /* Get packet id fields. */
1455         if (nx_packet_data_extract_offset(packet_ptr, offset + 2 + topic_length, &bytes, sizeof(bytes), &bytes_copied))
1456         {
1457             return(NXD_MQTT_INVALID_PACKET);
1458         }
1459 
1460         packet_id = (USHORT)(((*bytes) << 8) | (*(bytes + 1)));
1461 
1462         /* Look for an existing transmit packets with the same packet id */
1463         transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1464 
1465         while (transmit_packet_ptr)
1466         {
1467             fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1468             transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1469             if ((transmit_packet_id == packet_id) &&
1470                 ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_PUBREC << 4)))
1471             {
1472 
1473                 /* Found a packet containing the packet_id */
1474                 break;
1475             }
1476             transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1477         }
1478 
1479         if (transmit_packet_ptr)
1480         {
1481             /* This published data is already in our system.  No need to deliver this message to the application. */
1482             enqueue_message = 0;
1483         }
1484         else
1485         {
1486             enqueue_message = 1;
1487         }
1488     }
1489 
1490     if (enqueue_message)
1491     {
1492         if (packet_ptr -> nx_packet_length > (offset + remaining_length))
1493         {
1494 
1495             /* This packet contains multiple messages. */
1496             if (nx_packet_copy(packet_ptr, &packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_NO_WAIT))
1497             {
1498 
1499                 /* No packet is available. */
1500                 return(NX_FALSE);
1501             }
1502         }
1503         else
1504         {
1505             packet_consumed = NX_TRUE;
1506         }
1507 
1508         /* Increment the queue depth counter. */
1509         client_ptr -> message_receive_queue_depth++;
1510 
1511         if (client_ptr -> message_receive_queue_head == NX_NULL)
1512         {
1513             client_ptr -> message_receive_queue_head = packet_ptr;
1514         }
1515         else
1516         {
1517             client_ptr -> message_receive_queue_tail -> nx_packet_queue_next = packet_ptr;
1518         }
1519         client_ptr -> message_receive_queue_tail = packet_ptr;
1520 
1521         /* Invoke the user-defined receive notify function if it is set. */
1522         if (client_ptr -> nxd_mqtt_client_receive_notify)
1523         {
1524             (*(client_ptr -> nxd_mqtt_client_receive_notify))(client_ptr, client_ptr -> message_receive_queue_depth);
1525         }
1526     }
1527 
1528     /* If the message QoS level is 0, we are done. */
1529     if (QoS == 0)
1530     {
1531         /* Return */
1532         return(packet_consumed);
1533     }
1534 
1535     /* Send out proper ACKs for QoS 1 and 2 messages. */
1536     /* Allocate a new packet so we can send out a response. */
1537     status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
1538     if (status)
1539     {
1540         /* Packet allocation fails. */
1541         return(packet_consumed);
1542     }
1543 
1544     /* Fill in the packet ID */
1545     pubresp_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)(packet_ptr -> nx_packet_prepend_ptr);
1546     pubresp_ptr -> mqtt_publish_response_packet_remaining_length = 2;
1547     pubresp_ptr -> mqtt_publish_response_packet_packet_identifier_msb = (UCHAR)(packet_id >> 8);
1548     pubresp_ptr -> mqtt_publish_response_packet_packet_identifier_lsb = (UCHAR)(packet_id & 0xFF);
1549 
1550     if (QoS == 1)
1551     {
1552 
1553         pubresp_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBACK << 4;
1554     }
1555     else
1556     {
1557         pubresp_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBREC << 4;
1558     }
1559 
1560     packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + sizeof(MQTT_PACKET_PUBLISH_RESPONSE);
1561     packet_ptr -> nx_packet_length = sizeof(MQTT_PACKET_PUBLISH_RESPONSE);
1562 
1563     if (QoS == 2)
1564     {
1565 
1566         /* Copy packet for checking duplicate publish packet. */
1567         if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &transmit_packet_ptr,
1568                                            packet_id, NX_FALSE, NX_WAIT_FOREVER))
1569         {
1570 
1571             /* Release the packet. */
1572             nx_packet_release(packet_ptr);
1573             return(packet_consumed);
1574         }
1575         if (client_ptr -> message_transmit_queue_head == NX_NULL)
1576         {
1577             client_ptr -> message_transmit_queue_head = transmit_packet_ptr;
1578         }
1579         else
1580         {
1581             client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = transmit_packet_ptr;
1582         }
1583         client_ptr -> message_transmit_queue_tail = transmit_packet_ptr;
1584     }
1585 
1586     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1587 
1588     /* Send packet to server.  */
1589     status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER);
1590 
1591     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
1592     if (status)
1593     {
1594 
1595         /* Release the packet. */
1596         nx_packet_release(packet_ptr);
1597     }
1598     else
1599     {
1600         /* Update the timeout value. */
1601         client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
1602     }
1603 
1604     /* Return */
1605     return(packet_consumed);
1606 }
1607 
1608 /**************************************************************************/
1609 /*                                                                        */
1610 /*  FUNCTION                                               RELEASE        */
1611 /*                                                                        */
1612 /*    _nxd_mqtt_process_publish_response                  PORTABLE C      */
1613 /*                                                           6.3.0        */
1614 /*  AUTHOR                                                                */
1615 /*                                                                        */
1616 /*    Yuxin Zhou, Microsoft Corporation                                   */
1617 /*                                                                        */
1618 /*  DESCRIPTION                                                           */
1619 /*                                                                        */
1620 /*    This internal function process a publish response messages.         */
1621 /*    Publish Response messages are: PUBACK, PUBREC, PUBREL               */
1622 /*                                                                        */
1623 /*  INPUT                                                                 */
1624 /*                                                                        */
1625 /*    client_ptr                            Pointer to MQTT Client        */
1626 /*    packet_ptr                            Pointer to the packet         */
1627 /*                                                                        */
1628 /*  OUTPUT                                                                */
1629 /*                                                                        */
1630 /*    Status                                                              */
1631 /*                                                                        */
1632 /*  CALLS                                                                 */
1633 /*                                                                        */
1634 /*    [nxd_mqtt_client_receive_notify]      User supplied publish         */
1635 /*                                            callback function           */
1636 /*    _nxd_mqtt_release_transmit_packet                                   */
1637 /*    _nxd_mqtt_packet_send                                               */
1638 /*    _nxd_mqtt_client_packet_allocate                                    */
1639 /*                                                                        */
1640 /*                                                                        */
1641 /*  CALLED BY                                                             */
1642 /*                                                                        */
1643 /*    _nxd_mqtt_packet_receive_process                                    */
1644 /*                                                                        */
1645 /*  RELEASE HISTORY                                                       */
1646 /*                                                                        */
1647 /*    DATE              NAME                      DESCRIPTION             */
1648 /*                                                                        */
1649 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1650 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
1651 /*                                            added ack receive notify,   */
1652 /*                                            resulting in version 6.1    */
1653 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
1654 /*                                            the logic of sending packet,*/
1655 /*                                            resulting in version 6.2.0  */
1656 /*  10-31-2023     Haiqing Zhao             Modified comment(s), improved */
1657 /*                                            internal logic,             */
1658 /*                                            resulting in version 6.3.0  */
1659 /*                                                                        */
1660 /**************************************************************************/
_nxd_mqtt_process_publish_response(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1661 static UINT _nxd_mqtt_process_publish_response(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1662 {
1663 MQTT_PACKET_PUBLISH_RESPONSE *response_ptr;
1664 USHORT                        packet_id;
1665 NX_PACKET                    *previous_packet_ptr;
1666 NX_PACKET                    *transmit_packet_ptr;
1667 NX_PACKET                    *response_packet;
1668 UINT                          ret;
1669 UCHAR                         fixed_header;
1670 USHORT                        transmit_packet_id;
1671 
1672     response_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)(packet_ptr -> nx_packet_prepend_ptr);
1673 
1674     /* Validate the packet. */
1675     if (response_ptr -> mqtt_publish_response_packet_remaining_length != 2)
1676     {
1677         /* Invalid remaining_length value. Return 1 so the caller can release
1678            the packet. */
1679 
1680         return(1);
1681     }
1682 
1683     packet_id = (USHORT)((response_ptr -> mqtt_publish_response_packet_packet_identifier_msb << 8) |
1684                          (response_ptr -> mqtt_publish_response_packet_packet_identifier_lsb));
1685 
1686     /* Search all the outstanding transmitted packets for a match. */
1687     previous_packet_ptr = NX_NULL;
1688     transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1689     while (transmit_packet_ptr)
1690     {
1691         fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1692         transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1693         if (transmit_packet_id == packet_id)
1694         {
1695 
1696             /* Found the matching packet id */
1697             if (((response_ptr -> mqtt_publish_response_packet_header) >> 4) == MQTT_CONTROL_PACKET_TYPE_PUBACK)
1698             {
1699 
1700                 /* PUBACK is the response to a PUBLISH packet with QoS Level 1*/
1701                 /* Therefore we verify that packet contains PUBLISH packet with QoS level 1*/
1702                 if ((fixed_header & 0xF6) == ((MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4) | MQTT_PUBLISH_QOS_LEVEL_1))
1703                 {
1704 
1705                     /* Check ack notify function.  */
1706                     if (client_ptr -> nxd_mqtt_ack_receive_notify)
1707                     {
1708 
1709                         /* Call notify function. Note: user routine should not release the packet.  */
1710                         client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_PUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1711                     }
1712 
1713                     /* QoS Level1 message receives an ACK. */
1714                     /* This message can be released. */
1715                     _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1716 
1717                     /* Return with value 1, so the caller will release packet_ptr */
1718                     return(1);
1719                 }
1720             }
1721             else if (((response_ptr -> mqtt_publish_response_packet_header) >> 4) == MQTT_CONTROL_PACKET_TYPE_PUBREL)
1722             {
1723 
1724                 /* QoS 2 publish Release received, part 2. */
1725                 /* Therefore we verify that packet contains PUBLISH packet with QoS level 2*/
1726                 if ((fixed_header & 0xF6) == (MQTT_CONTROL_PACKET_TYPE_PUBREC << 4))
1727                 {
1728 
1729                     /* QoS Level2 message receives an ACK. */
1730                     /* This message can be released. */
1731                     /* Send PUBCOMP */
1732 
1733                     /* Allocate a packet to send the response. */
1734                     ret = _nxd_mqtt_client_packet_allocate(client_ptr, &response_packet, NX_WAIT_FOREVER);
1735                     if (ret)
1736                     {
1737                         return(1);
1738                     }
1739 
1740                     if (4u > ((ULONG)(response_packet -> nx_packet_data_end) - (ULONG)(response_packet -> nx_packet_append_ptr)))
1741                     {
1742                         nx_packet_release(response_packet);
1743 
1744                         /* Packet buffer is too small to hold the message. */
1745                         return(NX_SIZE_ERROR);
1746                     }
1747 
1748                     response_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)response_packet -> nx_packet_prepend_ptr;
1749 
1750                     response_ptr ->  mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBCOMP << 4;
1751                     response_ptr ->  mqtt_publish_response_packet_remaining_length = 2;
1752 
1753                     /* Fill in packet ID */
1754                     response_packet -> nx_packet_prepend_ptr[3] = packet_ptr -> nx_packet_prepend_ptr[3];
1755                     response_packet -> nx_packet_prepend_ptr[4] = packet_ptr -> nx_packet_prepend_ptr[4];
1756                     response_packet -> nx_packet_append_ptr = response_packet -> nx_packet_prepend_ptr + 4;
1757                     response_packet -> nx_packet_length = 4;
1758 
1759                     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1760 
1761                     /* Send packet to server.  */
1762                     ret = _nxd_mqtt_packet_send(client_ptr, response_packet, NX_WAIT_FOREVER);
1763 
1764                     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
1765 
1766                     /* Update the timeout value. */
1767                     client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
1768 
1769                     if (ret)
1770                     {
1771                         nx_packet_release(response_packet);
1772                     }
1773 
1774                     /* Check ack notify function.  */
1775                     if (client_ptr -> nxd_mqtt_ack_receive_notify)
1776                     {
1777 
1778                         /* Call notify function. Note: user routine should not release the packet.  */
1779                         client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_PUBREL, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1780                     }
1781 
1782                     /* This packet can be released. */
1783                     _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1784 
1785                     /* Return with value 1, so the caller will release packet_ptr */
1786                     return(1);
1787                 }
1788             }
1789         }
1790 
1791         /* Move on to the next packet */
1792         previous_packet_ptr = transmit_packet_ptr;
1793         transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1794     }
1795 
1796     /* nothing is found.  Return 1 to release the packet.*/
1797     return(1);
1798 }
1799 
1800 /**************************************************************************/
1801 /*                                                                        */
1802 /*  FUNCTION                                               RELEASE        */
1803 /*                                                                        */
1804 /*    _nxd_mqtt_process_sub_unsub_ack                     PORTABLE C      */
1805 /*                                                           6.1          */
1806 /*  AUTHOR                                                                */
1807 /*                                                                        */
1808 /*    Yuxin Zhou, Microsoft Corporation                                   */
1809 /*                                                                        */
1810 /*  DESCRIPTION                                                           */
1811 /*                                                                        */
1812 /*    This internal function process an ACK message for subscribe         */
1813 /*    or unsubscribe request.                                             */
1814 /*                                                                        */
1815 /*  INPUT                                                                 */
1816 /*                                                                        */
1817 /*    client_ptr                            Pointer to MQTT Client        */
1818 /*    packet_ptr                            Pointer to the packet         */
1819 /*                                                                        */
1820 /*  OUTPUT                                                                */
1821 /*                                                                        */
1822 /*    Status                                                              */
1823 /*                                                                        */
1824 /*  CALLS                                                                 */
1825 /*                                                                        */
1826 /*    [nxd_mqtt_client_receive_notify]      User supplied publish         */
1827 /*                                            callback function           */
1828 /*    _nxd_mqtt_release_transmit_packet                                   */
1829 /*                                          Release the memory block      */
1830 /*    _nxd_mqtt_read_remaining_length       Skip the remaining length     */
1831 /*                                            field                       */
1832 /*                                                                        */
1833 /*  CALLED BY                                                             */
1834 /*                                                                        */
1835 /*    _nxd_mqtt_packet_receive_process                                    */
1836 /*                                                                        */
1837 /*  RELEASE HISTORY                                                       */
1838 /*                                                                        */
1839 /*    DATE              NAME                      DESCRIPTION             */
1840 /*                                                                        */
1841 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1842 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
1843 /*                                            added ack receive notify,   */
1844 /*                                            resulting in version 6.1    */
1845 /*                                                                        */
1846 /**************************************************************************/
_nxd_mqtt_process_sub_unsub_ack(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1847 static UINT _nxd_mqtt_process_sub_unsub_ack(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1848 {
1849 
1850 USHORT     packet_id;
1851 NX_PACKET *previous_packet_ptr;
1852 NX_PACKET *transmit_packet_ptr;
1853 UCHAR      response_header;
1854 UCHAR      fixed_header;
1855 USHORT     transmit_packet_id;
1856 UINT       remaining_length;
1857 ULONG      offset;
1858 UCHAR      bytes[2];
1859 ULONG      bytes_copied;
1860 
1861 
1862     response_header = *(packet_ptr -> nx_packet_prepend_ptr);
1863 
1864     if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1865     {
1866 
1867         /* Unable to process the sub/unsub ack.  Simply return and release the packet. */
1868         return(NXD_MQTT_INVALID_PACKET);
1869     }
1870 
1871     /* Get packet id fields. */
1872     if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1873         (bytes_copied != sizeof(bytes)))
1874     {
1875         return(NXD_MQTT_INVALID_PACKET);
1876     }
1877 
1878     packet_id = (USHORT)(((*bytes) << 8) | (*(bytes + 1)));
1879 
1880     /* Search all the outstanding transmitted packets for a match. */
1881     previous_packet_ptr = NX_NULL;
1882     transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1883     while (transmit_packet_ptr)
1884     {
1885         fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1886         transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1887         if (transmit_packet_id == packet_id)
1888         {
1889 
1890             /* Found the matching packet id */
1891             if (((response_header >> 4) == MQTT_CONTROL_PACKET_TYPE_SUBACK) &&
1892                 ((fixed_header >> 4) == MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE))
1893             {
1894                 /* Validate the packet. */
1895                 if (remaining_length != 3)
1896                 {
1897                     /* Invalid remaining_length value. */
1898                     return(1);
1899                 }
1900 
1901                 /* Check ack notify function.  */
1902                 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1903                 {
1904 
1905                     /* Call notify function. Note: user routine should not release the packet.  */
1906                     client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_SUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1907                 }
1908 
1909                 /* Release the transmit packet. */
1910                 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1911 
1912                 return(1);
1913             }
1914             else if (((response_header >> 4) == MQTT_CONTROL_PACKET_TYPE_UNSUBACK) &&
1915                      ((fixed_header >> 4) == MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE))
1916             {
1917                 /* Validate the packet. */
1918                 if (remaining_length != 2)
1919                 {
1920                     /* Invalid remaining_length value. */
1921                     return(1);
1922                 }
1923 
1924                 /* Check ack notify function.  */
1925                 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1926                 {
1927 
1928                     /* Call notify function. Note: user routine should not release the packet.  */
1929                     client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_UNSUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1930                 }
1931 
1932                 /* Unsubscribe succeeded. */
1933                 /* Release the transmit packet. */
1934                 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1935 
1936                 return(1);
1937             }
1938         }
1939 
1940         /* Move on to the next packet */
1941         previous_packet_ptr = transmit_packet_ptr;
1942         transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1943     }
1944     return(1);
1945 }
1946 
1947 
1948 /**************************************************************************/
1949 /*                                                                        */
1950 /*  FUNCTION                                               RELEASE        */
1951 /*                                                                        */
1952 /*    _nxd_mqtt_process_pingresp                          PORTABLE C      */
1953 /*                                                           6.1          */
1954 /*  AUTHOR                                                                */
1955 /*                                                                        */
1956 /*    Yuxin Zhou, Microsoft Corporation                                   */
1957 /*                                                                        */
1958 /*  DESCRIPTION                                                           */
1959 /*                                                                        */
1960 /*    This internal function process a PINGRESP message.                  */
1961 /*                                                                        */
1962 /*  INPUT                                                                 */
1963 /*                                                                        */
1964 /*    client_ptr                            Pointer to MQTT Client        */
1965 /*    packet_ptr                            Pointer to the packet         */
1966 /*                                                                        */
1967 /*  OUTPUT                                                                */
1968 /*                                                                        */
1969 /*    Status                                                              */
1970 /*                                                                        */
1971 /*  CALLS                                                                 */
1972 /*                                                                        */
1973 /*    None                                                                */
1974 /*                                            callback function           */
1975 /*                                                                        */
1976 /*  CALLED BY                                                             */
1977 /*                                                                        */
1978 /*    _nxd_mqtt_packet_receive_process                                    */
1979 /*                                                                        */
1980 /*  RELEASE HISTORY                                                       */
1981 /*                                                                        */
1982 /*    DATE              NAME                      DESCRIPTION             */
1983 /*                                                                        */
1984 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1985 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1986 /*                                            resulting in version 6.1    */
1987 /*                                                                        */
1988 /**************************************************************************/
_nxd_mqtt_process_pingresp(NXD_MQTT_CLIENT * client_ptr)1989 static VOID _nxd_mqtt_process_pingresp(NXD_MQTT_CLIENT *client_ptr)
1990 {
1991 
1992 
1993     /* If there is an outstanding ping, mark it as responded. */
1994     if (client_ptr -> nxd_mqtt_ping_not_responded == NX_TRUE)
1995     {
1996         client_ptr -> nxd_mqtt_ping_not_responded = NX_FALSE;
1997 
1998         client_ptr -> nxd_mqtt_ping_sent_time = 0;
1999     }
2000 
2001     return;
2002 }
2003 
2004 
2005 /**************************************************************************/
2006 /*                                                                        */
2007 /*  FUNCTION                                               RELEASE        */
2008 /*                                                                        */
2009 /*    _nxd_mqtt_process_disconnect                        PORTABLE C      */
2010 /*                                                           6.1          */
2011 /*  AUTHOR                                                                */
2012 /*                                                                        */
2013 /*    Yuxin Zhou, Microsoft Corporation                                   */
2014 /*                                                                        */
2015 /*  DESCRIPTION                                                           */
2016 /*                                                                        */
2017 /*    This internal function process a DISCONNECT message.                */
2018 /*                                                                        */
2019 /*  INPUT                                                                 */
2020 /*                                                                        */
2021 /*    client_ptr                            Pointer to MQTT Client        */
2022 /*    packet_ptr                            Pointer to the packet         */
2023 /*                                                                        */
2024 /*  OUTPUT                                                                */
2025 /*                                                                        */
2026 /*    None                                                                */
2027 /*                                                                        */
2028 /*  CALLS                                                                 */
2029 /*                                                                        */
2030 /*   nx_secure_tls_session_send                                           */
2031 /*   nx_tcp_socket_disconnect                                             */
2032 /*   nx_tcp_client_socket_unbind                                          */
2033 /*   _nxd_mqtt_release_transmit_packet                                    */
2034 /*   _nxd_mqtt_release_receive_packet                                     */
2035 /*   _nxd_mqtt_client_connection_end                                      */
2036 /*                                                                        */
2037 /*  CALLED BY                                                             */
2038 /*                                                                        */
2039 /*   _nxd_mqtt_packet_receive_process                                     */
2040 /*                                                                        */
2041 /*  RELEASE HISTORY                                                       */
2042 /*                                                                        */
2043 /*    DATE              NAME                      DESCRIPTION             */
2044 /*                                                                        */
2045 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2046 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2047 /*                                            resulting in version 6.1    */
2048 /*                                                                        */
2049 /**************************************************************************/
_nxd_mqtt_process_disconnect(NXD_MQTT_CLIENT * client_ptr)2050 static VOID _nxd_mqtt_process_disconnect(NXD_MQTT_CLIENT *client_ptr)
2051 {
2052 NX_PACKET  *previous = NX_NULL;
2053 NX_PACKET  *current;
2054 NX_PACKET  *next;
2055 UINT        disconnect_callback = NX_FALSE;
2056 UINT        status;
2057 UCHAR       fixed_header;
2058 
2059     if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
2060     {
2061         /* State changes from CONNECTED TO IDLE.  Call disconnect notify callback
2062            if the function is set. */
2063         disconnect_callback = NX_TRUE;
2064     }
2065     else if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTING)
2066     {
2067 
2068         /* If state isn't CONNECTED or CONNECTING, just return. */
2069         return;
2070     }
2071 
2072     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2073 
2074     /* End connection. */
2075     _nxd_mqtt_client_connection_end(client_ptr, NXD_MQTT_SOCKET_TIMEOUT);
2076 
2077     status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
2078 
2079     /* Free up sub/unsub packets on the transmit queue. */
2080     current = client_ptr -> message_transmit_queue_head;
2081 
2082     while (current)
2083     {
2084         next = current -> nx_packet_queue_next;
2085         fixed_header = *(current -> nx_packet_prepend_ptr);
2086 
2087         if (((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4)) ||
2088             ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE << 4)))
2089         {
2090             _nxd_mqtt_release_transmit_packet(client_ptr, current, previous);
2091         }
2092         else
2093         {
2094             previous = current;
2095         }
2096         current = next;
2097     }
2098 
2099     /* If a callback notification is defined, call it now. */
2100     if ((disconnect_callback == NX_TRUE) && (client_ptr -> nxd_mqtt_disconnect_notify))
2101     {
2102         client_ptr -> nxd_mqtt_disconnect_notify(client_ptr);
2103     }
2104 
2105     /* If a connect callback notification is defined and is still in connecting stage, call it now. */
2106     if ((disconnect_callback == NX_FALSE) && (client_ptr -> nxd_mqtt_connect_notify))
2107     {
2108         client_ptr -> nxd_mqtt_connect_notify(client_ptr, NXD_MQTT_CONNECT_FAILURE, client_ptr -> nxd_mqtt_connect_context);
2109     }
2110 
2111     if (status == TX_SUCCESS)
2112     {
2113         /* Remove all the packets in the receive queue. */
2114         while (client_ptr -> message_receive_queue_head)
2115         {
2116             _nxd_mqtt_release_receive_packet(client_ptr, client_ptr -> message_receive_queue_head, NX_NULL);
2117         }
2118         client_ptr -> message_receive_queue_depth = 0;
2119 
2120         /* Clear the MQTT_PACKET_RECEIVE_EVENT */
2121 #ifndef NXD_MQTT_CLOUD_ENABLE
2122         tx_event_flags_set(&client_ptr -> nxd_mqtt_events, ~MQTT_PACKET_RECEIVE_EVENT, TX_AND);
2123 #else
2124         nx_cloud_module_event_clear(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PACKET_RECEIVE_EVENT);
2125 #endif /* NXD_MQTT_CLOUD_ENABLE */
2126     }
2127 
2128     /* Clear flags if keep alive is enabled. */
2129     if (client_ptr -> nxd_mqtt_keepalive)
2130     {
2131         client_ptr -> nxd_mqtt_ping_not_responded = 0;
2132         client_ptr -> nxd_mqtt_ping_sent_time = 0;
2133     }
2134 
2135     /* Clean up the information when disconnecting. */
2136     client_ptr -> nxd_mqtt_client_username = NX_NULL;
2137     client_ptr -> nxd_mqtt_client_password = NX_NULL;
2138     client_ptr -> nxd_mqtt_client_will_topic = NX_NULL;
2139     client_ptr -> nxd_mqtt_client_will_message = NX_NULL;
2140     client_ptr -> nxd_mqtt_client_will_qos_retain = 0;
2141 
2142     /* Release current processing packet. */
2143     if (client_ptr -> nxd_mqtt_client_processing_packet)
2144     {
2145         nx_packet_release(client_ptr -> nxd_mqtt_client_processing_packet);
2146         client_ptr -> nxd_mqtt_client_processing_packet = NX_NULL;
2147     }
2148 
2149     return;
2150 }
2151 
2152 
2153 /**************************************************************************/
2154 /*                                                                        */
2155 /*  FUNCTION                                               RELEASE        */
2156 /*                                                                        */
2157 /*    _nxd_mqtt_packet_receive_process                    PORTABLE C      */
2158 /*                                                           6.2.0        */
2159 /*  AUTHOR                                                                */
2160 /*                                                                        */
2161 /*    Yuxin Zhou, Microsoft Corporation                                   */
2162 /*                                                                        */
2163 /*  DESCRIPTION                                                           */
2164 /*                                                                        */
2165 /*    This internal function process MQTT message.                        */
2166 /*    NOTE: MQTT Mutex is NOT obtained on entering this function.         */
2167 /*    Therefore it shouldn't hold the mutex when it exists this function. */
2168 /*                                                                        */
2169 /*  INPUT                                                                 */
2170 /*                                                                        */
2171 /*    client_ptr                            Pointer to MQTT Client        */
2172 /*                                                                        */
2173 /*  OUTPUT                                                                */
2174 /*                                                                        */
2175 /*    None                                                                */
2176 /*                                                                        */
2177 /*  CALLS                                                                 */
2178 /*                                                                        */
2179 /*    nx_secure_tls_session_receive                                       */
2180 /*    nx_tcp_socket_receive                                               */
2181 /*    _nxd_mqtt_process_publish                                           */
2182 /*    _nxd_mqtt_process_publish_response                                  */
2183 /*    _nxd_mqtt_process_sub_unsub_ack                                     */
2184 /*    _nxd_mqtt_process_pingresp                                          */
2185 /*    _nxd_mqtt_process_disconnect                                        */
2186 /*    nx_packet_release                                                   */
2187 /*                                                                        */
2188 /*  CALLED BY                                                             */
2189 /*                                                                        */
2190 /*    _nxd_mqtt_client_event_process                                      */
2191 /*                                                                        */
2192 /*  RELEASE HISTORY                                                       */
2193 /*                                                                        */
2194 /*    DATE              NAME                      DESCRIPTION             */
2195 /*                                                                        */
2196 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2197 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2198 /*                                            resulting in version 6.1    */
2199 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
2200 /*                                            mqtt over websocket,        */
2201 /*                                            resulting in version 6.2.0  */
2202 /*                                                                        */
2203 /**************************************************************************/
_nxd_mqtt_packet_receive_process(NXD_MQTT_CLIENT * client_ptr)2204 static VOID _nxd_mqtt_packet_receive_process(NXD_MQTT_CLIENT *client_ptr)
2205 {
2206 NX_PACKET *packet_ptr;
2207 NX_PACKET *previous_packet_ptr;
2208 UINT       status;
2209 UCHAR      packet_type;
2210 UINT       remaining_length;
2211 UINT       packet_consumed;
2212 ULONG      offset;
2213 ULONG      bytes_copied;
2214 ULONG      packet_length;
2215 
2216     for (;;)
2217     {
2218 
2219         /* Release the mutex. */
2220         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2221 
2222         /* Make a receive call. */
2223         status = _nxd_mqtt_packet_receive(client_ptr, &packet_ptr, NX_NO_WAIT);
2224 
2225         if (status != NX_SUCCESS)
2226         {
2227             if ((status != NX_NO_PACKET) && (status != NX_CONTINUE))
2228             {
2229 
2230                 /* Network issue. Close the MQTT session. */
2231 #ifndef NXD_MQTT_CLOUD_ENABLE
2232                 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_NETWORK_DISCONNECT_EVENT, TX_OR);
2233 #else
2234                 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_NETWORK_DISCONNECT_EVENT);
2235 #endif /* NXD_MQTT_CLOUD_ENABLE */
2236             }
2237 
2238             break;
2239         }
2240 
2241         /* Obtain the mutex. */
2242         tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
2243 
2244         /* Is there a packet waiting for processing? */
2245         if (client_ptr -> nxd_mqtt_client_processing_packet)
2246         {
2247 
2248             /* Yes. Link received packet to existing one. */
2249             if (client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last)
2250             {
2251                 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last -> nx_packet_next = packet_ptr;
2252             }
2253             else
2254             {
2255                 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_next = packet_ptr;
2256             }
2257             if (packet_ptr -> nx_packet_last)
2258             {
2259                 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last = packet_ptr -> nx_packet_last;
2260             }
2261             else
2262             {
2263                 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last = packet_ptr;
2264             }
2265             client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_length += packet_ptr -> nx_packet_length;
2266 
2267             /* Start to process existing packet. */
2268             packet_ptr = client_ptr -> nxd_mqtt_client_processing_packet;
2269             client_ptr -> nxd_mqtt_client_processing_packet = NX_NULL;
2270         }
2271 
2272         /* Check notify function.  */
2273         if (client_ptr -> nxd_mqtt_packet_receive_notify)
2274         {
2275 
2276             /* Call notify function. Return NX_TRUE if the packet has been consumed.  */
2277             if (client_ptr -> nxd_mqtt_packet_receive_notify(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_packet_receive_context) == NX_TRUE)
2278             {
2279                 continue;
2280             }
2281         }
2282 
2283         packet_consumed = NX_FALSE;
2284         while (packet_ptr)
2285         {
2286             /* Parse the incoming packet. */
2287             status = _nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset);
2288             if (status == NXD_MQTT_PARTIAL_PACKET)
2289             {
2290 
2291                 /* We only have partial MQTT message.
2292                  * Put it to waiting list for more packets. */
2293                 client_ptr -> nxd_mqtt_client_processing_packet = packet_ptr;
2294                 packet_consumed = NX_TRUE;
2295                 break;
2296             }
2297             else if (status)
2298             {
2299 
2300                 /* Invalid packet. */
2301                 break;
2302             }
2303 
2304             /* Get packet type. */
2305             if (nx_packet_data_extract_offset(packet_ptr, 0, &packet_type, 1, &bytes_copied))
2306             {
2307 
2308                 /* Unable to read packet type. */
2309                 break;
2310             }
2311 
2312             /* Right shift 4 bits to get the packet type. */
2313             packet_type = packet_type >> 4;
2314 
2315             /* Process based on packet type. */
2316             switch (packet_type)
2317             {
2318             case MQTT_CONTROL_PACKET_TYPE_CONNECT:
2319                 /* Client does not accept connections.  Nothing needs to be done. */
2320                 break;
2321             case MQTT_CONTROL_PACKET_TYPE_CONNACK:
2322                 _nxd_mqtt_process_connack(client_ptr, packet_ptr, NX_NO_WAIT);
2323                 break;
2324 
2325             case MQTT_CONTROL_PACKET_TYPE_PUBLISH:
2326                 packet_consumed = _nxd_mqtt_process_publish(client_ptr, packet_ptr);
2327                 break;
2328 
2329             case MQTT_CONTROL_PACKET_TYPE_PUBACK:
2330             case MQTT_CONTROL_PACKET_TYPE_PUBREL:
2331                 _nxd_mqtt_process_publish_response(client_ptr, packet_ptr);
2332                 break;
2333 
2334             case MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE:
2335             case MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE:
2336                 /* Client should not process subscribe or unsubscribe message. */
2337                 break;
2338 
2339             case MQTT_CONTROL_PACKET_TYPE_SUBACK:
2340             case MQTT_CONTROL_PACKET_TYPE_UNSUBACK:
2341                 _nxd_mqtt_process_sub_unsub_ack(client_ptr, packet_ptr);
2342                 break;
2343 
2344 
2345             case MQTT_CONTROL_PACKET_TYPE_PINGREQ:
2346                 /* Client is not supposed to receive ping req.  Ignore it. */
2347                 break;
2348 
2349             case MQTT_CONTROL_PACKET_TYPE_PINGRESP:
2350                 _nxd_mqtt_process_pingresp(client_ptr);
2351                 break;
2352 
2353             case MQTT_CONTROL_PACKET_TYPE_DISCONNECT:
2354                 _nxd_mqtt_process_disconnect(client_ptr);
2355                 break;
2356 
2357             /* Publisher sender message type for QoS 2. Not supported. */
2358             case MQTT_CONTROL_PACKET_TYPE_PUBREC:
2359             case MQTT_CONTROL_PACKET_TYPE_PUBCOMP:
2360             default:
2361                 /* Unknown type. */
2362                 break;
2363             }
2364 
2365             if (packet_consumed)
2366             {
2367                 break;
2368             }
2369 
2370             /* Trim current packet. */
2371             offset += remaining_length;
2372             packet_length = packet_ptr -> nx_packet_length;
2373             if (packet_length > offset)
2374             {
2375 
2376                 /* Multiple MQTT message in one packet. */
2377                 packet_length = packet_ptr -> nx_packet_length - offset;
2378                 while ((ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr) <= offset)
2379                 {
2380                     offset -= (ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr);
2381 
2382                     /* Current packet can be released. */
2383                     previous_packet_ptr = packet_ptr;
2384                     packet_ptr = packet_ptr -> nx_packet_next;
2385                     previous_packet_ptr -> nx_packet_next = NX_NULL;
2386                     nx_packet_release(previous_packet_ptr);
2387                     if (packet_ptr == NX_NULL)
2388                     {
2389 
2390                         /* Invalid packet. */
2391                         break;
2392                     }
2393                 }
2394 
2395                 if (packet_ptr)
2396                 {
2397 
2398                     /* Adjust current packet. */
2399                     packet_ptr -> nx_packet_prepend_ptr = packet_ptr -> nx_packet_prepend_ptr + offset;
2400                     packet_ptr -> nx_packet_length = packet_length;
2401                 }
2402             }
2403             else
2404             {
2405 
2406                 /* All messages in current packet is processed. */
2407                 break;
2408             }
2409         }
2410 
2411         if (!packet_consumed)
2412         {
2413             nx_packet_release(packet_ptr);
2414         }
2415     }
2416 
2417     /* No more data in the receive queue.  Return. */
2418 
2419     return;
2420 }
2421 
2422 
2423 /**************************************************************************/
2424 /*                                                                        */
2425 /*  FUNCTION                                               RELEASE        */
2426 /*                                                                        */
2427 /*    _nxd_mqtt_tcp_establish_process                     PORTABLE C      */
2428 /*                                                           6.2.0        */
2429 /*  AUTHOR                                                                */
2430 /*                                                                        */
2431 /*    Yuxin Zhou, Microsoft Corporation                                   */
2432 /*                                                                        */
2433 /*  DESCRIPTION                                                           */
2434 /*                                                                        */
2435 /*    This function processes MQTT TCP connection establish event.        */
2436 /*                                                                        */
2437 /*  INPUT                                                                 */
2438 /*                                                                        */
2439 /*    client_ptr                            Pointer to MQTT Client        */
2440 /*                                                                        */
2441 /*  OUTPUT                                                                */
2442 /*                                                                        */
2443 /*    None                                                                */
2444 /*                                                                        */
2445 /*  CALLS                                                                 */
2446 /*                                                                        */
2447 /*    nx_secure_tls_session_start                                         */
2448 /*    _nxd_mqtt_client_connection_end                                     */
2449 /*    _nxd_mqtt_client_connect_packet_send                                */
2450 /*                                                                        */
2451 /*  CALLED BY                                                             */
2452 /*                                                                        */
2453 /*    _nxd_mqtt_client_event_process                                      */
2454 /*                                                                        */
2455 /*  RELEASE HISTORY                                                       */
2456 /*                                                                        */
2457 /*    DATE              NAME                      DESCRIPTION             */
2458 /*                                                                        */
2459 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2460 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2461 /*                                            resulting in version 6.1    */
2462 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
2463 /*                                            mqtt over websocket,        */
2464 /*                                            resulting in version 6.2.0  */
2465 /*                                                                        */
2466 /**************************************************************************/
_nxd_mqtt_tcp_establish_process(NXD_MQTT_CLIENT * client_ptr)2467 static VOID _nxd_mqtt_tcp_establish_process(NXD_MQTT_CLIENT *client_ptr)
2468 {
2469 UINT       status;
2470 
2471 
2472     /* TCP connection is established.  */
2473 
2474     /* If TLS is enabled, start TLS */
2475 #ifdef NX_SECURE_ENABLE
2476     if (client_ptr -> nxd_mqtt_client_use_tls)
2477     {
2478         status = nx_secure_tls_session_start(&(client_ptr -> nxd_mqtt_tls_session), &(client_ptr -> nxd_mqtt_client_socket), NX_NO_WAIT);
2479 
2480         if (status != NX_CONTINUE)
2481         {
2482 
2483             /* End connection. */
2484             _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2485 
2486             /* Check callback function.  */
2487             if (client_ptr -> nxd_mqtt_connect_notify)
2488             {
2489                 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2490             }
2491 
2492             return;
2493         }
2494 
2495         /* TLS in progress.  */
2496         client_ptr -> nxd_mqtt_tls_in_progress = NX_TRUE;
2497 
2498         return;
2499     }
2500 #endif /* NX_SECURE_ENABLE */
2501 
2502 #ifdef NXD_MQTT_OVER_WEBSOCKET
2503 
2504     /* If using websocket, start websocket connection.  */
2505     if (client_ptr -> nxd_mqtt_client_use_websocket)
2506     {
2507         status = nx_websocket_client_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_client_socket),
2508                                              client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
2509                                              client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
2510                                              (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
2511                                              NX_NO_WAIT);
2512 
2513         if (status != NX_IN_PROGRESS)
2514         {
2515 
2516             /* End connection. */
2517             _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2518 
2519             /* Check callback function.  */
2520             if (client_ptr -> nxd_mqtt_connect_notify)
2521             {
2522                 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2523             }
2524 
2525             return;
2526         }
2527 
2528         return;
2529     }
2530 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2531 
2532     /* Start to send MQTT connect packet.  */
2533     status = _nxd_mqtt_client_connect_packet_send(client_ptr, NX_NO_WAIT);
2534 
2535     /* Check status.  */
2536     if (status)
2537     {
2538 
2539         /* End connection. */
2540         _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2541 
2542         /* Check callback function.  */
2543         if (client_ptr -> nxd_mqtt_connect_notify)
2544         {
2545             client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2546         }
2547     }
2548 }
2549 
2550 
2551 #ifdef NX_SECURE_ENABLE
2552 /**************************************************************************/
2553 /*                                                                        */
2554 /*  FUNCTION                                               RELEASE        */
2555 /*                                                                        */
2556 /*    _nxd_mqtt_tls_establish_process                     PORTABLE C      */
2557 /*                                                           6.2.0        */
2558 /*  AUTHOR                                                                */
2559 /*                                                                        */
2560 /*    Yuxin Zhou, Microsoft Corporation                                   */
2561 /*                                                                        */
2562 /*  DESCRIPTION                                                           */
2563 /*                                                                        */
2564 /*    This function processes TLS connection establish event.             */
2565 /*                                                                        */
2566 /*  INPUT                                                                 */
2567 /*                                                                        */
2568 /*    client_ptr                            Pointer to MQTT Client        */
2569 /*                                                                        */
2570 /*  OUTPUT                                                                */
2571 /*                                                                        */
2572 /*    None                                                                */
2573 /*                                                                        */
2574 /*  CALLS                                                                 */
2575 /*                                                                        */
2576 /*    _nx_secure_tls_handshake_process                                    */
2577 /*    _nxd_mqtt_client_connect_packet_send                                */
2578 /*    _nxd_mqtt_client_connection_end                                     */
2579 /*                                                                        */
2580 /*  CALLED BY                                                             */
2581 /*                                                                        */
2582 /*    _nxd_mqtt_client_event_process                                      */
2583 /*                                                                        */
2584 /*  RELEASE HISTORY                                                       */
2585 /*                                                                        */
2586 /*    DATE              NAME                      DESCRIPTION             */
2587 /*                                                                        */
2588 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2589 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2590 /*                                            resulting in version 6.1    */
2591 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
2592 /*                                            mqtt over websocket,        */
2593 /*                                            resulting in version 6.2.0  */
2594 /*                                                                        */
2595 /**************************************************************************/
_nxd_mqtt_tls_establish_process(NXD_MQTT_CLIENT * client_ptr)2596 static VOID _nxd_mqtt_tls_establish_process(NXD_MQTT_CLIENT *client_ptr)
2597 {
2598 UINT       status;
2599 
2600 
2601     /* Directly call handshake process for async mode. */
2602     status = _nx_secure_tls_handshake_process(&(client_ptr -> nxd_mqtt_tls_session), NX_NO_WAIT);
2603     if (status == NX_SUCCESS)
2604     {
2605 
2606         /* TLS session established.   */
2607         client_ptr -> nxd_mqtt_tls_in_progress = NX_FALSE;
2608 
2609 #ifdef NXD_MQTT_OVER_WEBSOCKET
2610 
2611         /* If using websocket, start websocket connection.  */
2612         if (client_ptr -> nxd_mqtt_client_use_websocket)
2613         {
2614             status = nx_websocket_client_secure_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_tls_session),
2615                                                         client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
2616                                                         client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
2617                                                         (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
2618                                                         NX_NO_WAIT);
2619 
2620             if (status == NX_IN_PROGRESS)
2621             {
2622                 return;
2623             }
2624         }
2625         else
2626 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2627         {
2628 
2629             /* Start to send MQTT connect packet.  */
2630             status = _nxd_mqtt_client_connect_packet_send(client_ptr, NX_NO_WAIT);
2631         }
2632     }
2633     else if (status == NX_CONTINUE)
2634     {
2635         return;
2636     }
2637 
2638     /* Check status.  */
2639     if (status)
2640     {
2641 
2642         /* End connection. */
2643         _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2644 
2645         /* Check callback function.  */
2646         if (client_ptr -> nxd_mqtt_connect_notify)
2647         {
2648             client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2649         }
2650     }
2651 
2652     return;
2653 }
2654 #endif /* NX_SECURE_ENABLE */
2655 
2656 /**************************************************************************/
2657 /*                                                                        */
2658 /*  FUNCTION                                               RELEASE        */
2659 /*                                                                        */
2660 /*    _nxd_mqtt_client_append_message                     PORTABLE C      */
2661 /*                                                           6.1          */
2662 /*  AUTHOR                                                                */
2663 /*                                                                        */
2664 /*    Yuxin Zhou, Microsoft Corporation                                   */
2665 /*                                                                        */
2666 /*  DESCRIPTION                                                           */
2667 /*                                                                        */
2668 /*    This function writes the message length and message in the outgoing */
2669 /*    MQTT packet.                                                        */
2670 /*                                                                        */
2671 /*  INPUT                                                                 */
2672 /*                                                                        */
2673 /*    client_ptr                            Pointer to MQTT Client        */
2674 /*    packet_ptr                            Outgoing MQTT packet          */
2675 /*    message                               Pointer to the message        */
2676 /*    length                                Length of the message         */
2677 /*    wait_option                           Wait option                   */
2678 /*                                                                        */
2679 /*  OUTPUT                                                                */
2680 /*                                                                        */
2681 /*    status                                                              */
2682 /*                                                                        */
2683 /*  CALLS                                                                 */
2684 /*                                                                        */
2685 /*    nx_packet_data_append                 Append packet data            */
2686 /*                                                                        */
2687 /*  CALLED BY                                                             */
2688 /*                                                                        */
2689 /*    _nxd_mqtt_client_sub_unsub                                          */
2690 /*    _nxd_mqtt_client_connect                                            */
2691 /*    _nxd_mqtt_client_publish                                            */
2692 /*                                                                        */
2693 /*  RELEASE HISTORY                                                       */
2694 /*                                                                        */
2695 /*    DATE              NAME                      DESCRIPTION             */
2696 /*                                                                        */
2697 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2698 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2699 /*                                            resulting in version 6.1    */
2700 /*                                                                        */
2701 /**************************************************************************/
_nxd_mqtt_client_append_message(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,CHAR * message,UINT length,ULONG wait_option)2702 UINT _nxd_mqtt_client_append_message(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, CHAR *message, UINT length, ULONG wait_option)
2703 {
2704 UINT ret = 0;
2705 UCHAR len[2];
2706 
2707     len[0] = (length >> 8) & 0xFF;
2708     len[1] = length  & 0xFF;
2709 
2710     /* Append message length field. */
2711     ret = nx_packet_data_append(packet_ptr, len, 2, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
2712 
2713     if (ret)
2714     {
2715         return(ret);
2716     }
2717 
2718     if (length)
2719     {
2720         /* Copy the string into the packet. */
2721         ret = nx_packet_data_append(packet_ptr, message, length,
2722                                     client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
2723     }
2724 
2725     return(ret);
2726 }
2727 
2728 
2729 /**************************************************************************/
2730 /*                                                                        */
2731 /*  FUNCTION                                               RELEASE        */
2732 /*                                                                        */
2733 /*    _nxd_mqtt_client_connection_end                     PORTABLE C      */
2734 /*                                                           6.2.0        */
2735 /*  AUTHOR                                                                */
2736 /*                                                                        */
2737 /*    Yuxin Zhou, Microsoft Corporation                                   */
2738 /*                                                                        */
2739 /*  DESCRIPTION                                                           */
2740 /*                                                                        */
2741 /*    This function is used to end the MQTT connection.                   */
2742 /*                                                                        */
2743 /*  INPUT                                                                 */
2744 /*                                                                        */
2745 /*    client_ptr                            Pointer to MQTT Client        */
2746 /*    wait_option                           Wait option                   */
2747 /*                                                                        */
2748 /*  OUTPUT                                                                */
2749 /*                                                                        */
2750 /*    None                                                                */
2751 /*                                                                        */
2752 /*  CALLS                                                                 */
2753 /*                                                                        */
2754 /*    nx_secure_tls_session_end             End TLS session               */
2755 /*    nx_secure_tls_session_delete          Delete TLS session            */
2756 /*    nx_tcp_socket_disconnect              Close TCP connection          */
2757 /*    nx_tcp_client_socket_unbind           Unbind TCP socket             */
2758 /*    tx_timer_delete                       Delete timer                  */
2759 /*                                                                        */
2760 /*  CALLED BY                                                             */
2761 /*                                                                        */
2762 /*    _nxd_mqtt_client_connect                                            */
2763 /*    _nxd_mqtt_process_disconnect                                        */
2764 /*                                                                        */
2765 /*  RELEASE HISTORY                                                       */
2766 /*                                                                        */
2767 /*    DATE              NAME                      DESCRIPTION             */
2768 /*                                                                        */
2769 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2770 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2771 /*                                            resulting in version 6.1    */
2772 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
2773 /*                                            mqtt over websocket,        */
2774 /*                                            resulting in version 6.2.0  */
2775 /*                                                                        */
2776 /**************************************************************************/
_nxd_mqtt_client_connection_end(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)2777 VOID _nxd_mqtt_client_connection_end(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
2778 {
2779 
2780     /* Obtain the mutex. */
2781     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
2782 
2783     /* Mark the session as terminated. */
2784     client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;
2785 
2786     /* Release the mutex. */
2787     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2788 
2789 #ifdef NXD_MQTT_OVER_WEBSOCKET
2790     if (client_ptr -> nxd_mqtt_client_use_websocket)
2791     {
2792         nx_websocket_client_disconnect(&(client_ptr -> nxd_mqtt_client_websocket), wait_option);
2793     }
2794 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2795 
2796 #ifdef NX_SECURE_ENABLE
2797     if (client_ptr -> nxd_mqtt_client_use_tls)
2798     {
2799         nx_secure_tls_session_end(&(client_ptr -> nxd_mqtt_tls_session), wait_option);
2800         nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
2801     }
2802 #endif
2803     nx_tcp_socket_disconnect(&(client_ptr -> nxd_mqtt_client_socket), wait_option);
2804     nx_tcp_client_socket_unbind(&(client_ptr -> nxd_mqtt_client_socket));
2805 
2806     /* Disable timer if timer has been started. */
2807     if (client_ptr -> nxd_mqtt_keepalive)
2808     {
2809          tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
2810     }
2811 }
2812 
2813 
2814 static UINT _nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT *client_ptr, UCHAR header_value);
2815 
2816 
2817 /* MQTT internal function */
2818 
2819 /**************************************************************************/
2820 /*                                                                        */
2821 /*  FUNCTION                                               RELEASE        */
2822 /*                                                                        */
2823 /*    _nxd_mqtt_periodic_timer_entry                      PORTABLE C      */
2824 /*                                                           6.1          */
2825 /*  AUTHOR                                                                */
2826 /*                                                                        */
2827 /*    Yuxin Zhou, Microsoft Corporation                                   */
2828 /*                                                                        */
2829 /*  DESCRIPTION                                                           */
2830 /*                                                                        */
2831 /*    This internal function is passed to MQTT client socket create call. */
2832 /*    This callback function notifies MQTT client thread when the TCP     */
2833 /*    connection is lost.                                                 */
2834 /*                                                                        */
2835 /*                                                                        */
2836 /*  INPUT                                                                 */
2837 /*                                                                        */
2838 /*    socket_ptr                            Pointer to TCP socket that    */
2839 /*                                            disconnected.               */
2840 /*                                                                        */
2841 /*  OUTPUT                                                                */
2842 /*                                                                        */
2843 /*    None                                                                */
2844 /*                                                                        */
2845 /*  CALLS                                                                 */
2846 /*                                                                        */
2847 /*    None                                                                */
2848 /*                                                                        */
2849 /*  CALLED BY                                                             */
2850 /*                                                                        */
2851 /*    TCP socket disconnect callback                                      */
2852 /*                                                                        */
2853 /*  RELEASE HISTORY                                                       */
2854 /*                                                                        */
2855 /*    DATE              NAME                      DESCRIPTION             */
2856 /*                                                                        */
2857 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2858 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2859 /*                                            resulting in version 6.1    */
2860 /*                                                                        */
2861 /**************************************************************************/
_nxd_mqtt_periodic_timer_entry(ULONG client)2862 static VOID _nxd_mqtt_periodic_timer_entry(ULONG client)
2863 {
2864 /* Check if it is time to send out a ping message. */
2865 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)client;
2866 
2867     /* If an outstanding ping response has not been received, and the client exceeds the time waiting for ping response,
2868        the client shall disconnect from the server. */
2869     if (client_ptr -> nxd_mqtt_ping_not_responded)
2870     {
2871         /* If current time is greater than the ping timeout */
2872         if ((tx_time_get() - client_ptr -> nxd_mqtt_ping_sent_time) >= client_ptr -> nxd_mqtt_ping_timeout)
2873         {
2874             /* Ping timed out.  Need to terminate the connection. */
2875 #ifndef NXD_MQTT_CLOUD_ENABLE
2876             tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_PING_TIMEOUT_EVENT, TX_OR);
2877 #else
2878             nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PING_TIMEOUT_EVENT);
2879 #endif /* NXD_MQTT_CLOUD_ENABLE */
2880 
2881             return;
2882         }
2883     }
2884 
2885     /* About to timeout? */
2886     if ((client_ptr -> nxd_mqtt_timeout - tx_time_get()) <= client_ptr -> nxd_mqtt_timer_value)
2887     {
2888         /* Set the flag so the MQTT thread can send the ping. */
2889 #ifndef NXD_MQTT_CLOUD_ENABLE
2890         tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_TIMEOUT_EVENT, TX_OR);
2891 #else
2892         nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_TIMEOUT_EVENT);
2893 #endif /* NXD_MQTT_CLOUD_ENABLE */
2894     }
2895 
2896     /* If keepalive is not enabled, just return. */
2897     return;
2898 }
2899 
2900 
2901 
2902 /**************************************************************************/
2903 /*                                                                        */
2904 /*  FUNCTION                                               RELEASE        */
2905 /*                                                                        */
2906 /*    _nxd_mqtt_client_event_process                      PORTABLE C      */
2907 /*                                                           6.1          */
2908 /*  AUTHOR                                                                */
2909 /*                                                                        */
2910 /*    Yuxin Zhou, Microsoft Corporation                                   */
2911 /*                                                                        */
2912 /*  DESCRIPTION                                                           */
2913 /*                                                                        */
2914 /*    This internal function serves as the entry point for the MQTT       */
2915 /*    client thread.                                                      */
2916 /*                                                                        */
2917 /*  INPUT                                                                 */
2918 /*                                                                        */
2919 /*    mqtt_client                           Pointer to MQTT Client        */
2920 /*                                                                        */
2921 /*  OUTPUT                                                                */
2922 /*                                                                        */
2923 /*    None                                                                */
2924 /*                                                                        */
2925 /*  CALLS                                                                 */
2926 /*                                                                        */
2927 /*    _nxd_mqtt_release_transmit_packet                                   */
2928 /*    _nxd_mqtt_release_receive_packet                                    */
2929 /*    _nxd_mqtt_send_simple_message                                       */
2930 /*    _nxd_mqtt_process_disconnect                                        */
2931 /*    _nxd_mqtt_packet_receive_process                                    */
2932 /*    tx_timer_delete                                                     */
2933 /*    tx_event_flags_delete                                               */
2934 /*    nx_tcp_socket_delete                                                */
2935 /*    tx_timer_delete                                                     */
2936 /*    tx_mutex_delete                                                     */
2937 /*                                                                        */
2938 /*  CALLED BY                                                             */
2939 /*                                                                        */
2940 /*   _nxd_mqtt_client_create                                              */
2941 /*                                                                        */
2942 /*  RELEASE HISTORY                                                       */
2943 /*                                                                        */
2944 /*    DATE              NAME                      DESCRIPTION             */
2945 /*                                                                        */
2946 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2947 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
2948 /*                                            corrected mqtt client state,*/
2949 /*                                            resulting in version 6.1    */
2950 /*                                                                        */
2951 /**************************************************************************/
_nxd_mqtt_client_event_process(VOID * mqtt_client,ULONG common_events,ULONG module_own_events)2952 static VOID _nxd_mqtt_client_event_process(VOID *mqtt_client, ULONG common_events, ULONG module_own_events)
2953 {
2954 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)mqtt_client;
2955 
2956 
2957     /* Obtain the mutex. */
2958     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
2959 
2960     /* Process common events.  */
2961     NX_PARAMETER_NOT_USED(common_events);
2962 
2963     if (module_own_events & MQTT_TIMEOUT_EVENT)
2964     {
2965         /* Send out PING only if the client is connected. */
2966         if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
2967         {
2968             _nxd_mqtt_send_simple_message(client_ptr, MQTT_CONTROL_PACKET_TYPE_PINGREQ);
2969         }
2970     }
2971 
2972     if (module_own_events & MQTT_TCP_ESTABLISH_EVENT)
2973     {
2974         _nxd_mqtt_tcp_establish_process(client_ptr);
2975     }
2976 
2977     if (module_own_events & MQTT_PACKET_RECEIVE_EVENT)
2978     {
2979 #ifdef NX_SECURE_ENABLE
2980         /* TLS in progress on async mode.  */
2981         if (client_ptr -> nxd_mqtt_tls_in_progress)
2982         {
2983             _nxd_mqtt_tls_establish_process(client_ptr);
2984         }
2985         else
2986 #endif /* NX_SECURE_ENABLE */
2987 
2988         _nxd_mqtt_packet_receive_process(client_ptr);
2989     }
2990 
2991     if (module_own_events & MQTT_PING_TIMEOUT_EVENT)
2992     {
2993         /* The server/broker didn't respond to our ping request message. Disconnect from the server. */
2994         _nxd_mqtt_process_disconnect(client_ptr);
2995     }
2996     if (module_own_events & MQTT_NETWORK_DISCONNECT_EVENT)
2997     {
2998         /* The server closed TCP socket. We shall go through the disconnect code path. */
2999         _nxd_mqtt_process_disconnect(client_ptr);
3000     }
3001 
3002     if (module_own_events & MQTT_DELETE_EVENT)
3003     {
3004 
3005         /* Stop the client and disconnect from the server. */
3006         if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
3007         {
3008             _nxd_mqtt_process_disconnect(client_ptr);
3009         }
3010 
3011         /* Delete the timer. Check first if it is already deleted. */
3012         if ((client_ptr -> nxd_mqtt_timer).tx_timer_id != 0)
3013             tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
3014 
3015 #ifndef NXD_MQTT_CLOUD_ENABLE
3016         /* Delete the event flag. Check first if it is already deleted. */
3017         if ((client_ptr -> nxd_mqtt_events).tx_event_flags_group_id != 0)
3018             tx_event_flags_delete(&client_ptr -> nxd_mqtt_events);
3019 #endif /* NXD_MQTT_CLOUD_ENABLE */
3020 
3021         /* Release all the messages on the receive queue. */
3022         while (client_ptr -> message_receive_queue_head)
3023         {
3024             _nxd_mqtt_release_receive_packet(client_ptr, client_ptr -> message_receive_queue_head, NX_NULL);
3025         }
3026         client_ptr -> message_receive_queue_depth = 0;
3027 
3028         /* Delete all the messages sitting in the receive and transmit queue. */
3029         while (client_ptr -> message_transmit_queue_head)
3030         {
3031             _nxd_mqtt_release_transmit_packet(client_ptr, client_ptr -> message_transmit_queue_head, NX_NULL);
3032         }
3033 
3034         /* Release mutex */
3035         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3036 
3037 #ifndef NXD_MQTT_CLOUD_ENABLE
3038         /* Delete the mutex. */
3039         tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);
3040 #endif /* NXD_MQTT_CLOUD_ENABLE */
3041 
3042         /* Deleting the socket, (the socket ID is cleared); this signals it is ok to delete this thread. */
3043         nx_tcp_socket_delete(&client_ptr -> nxd_mqtt_client_socket);
3044     }
3045     else
3046     {
3047 
3048         /* Release mutex */
3049         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3050     }
3051 }
3052 
3053 
3054 #ifndef NXD_MQTT_CLOUD_ENABLE
3055 /**************************************************************************/
3056 /*                                                                        */
3057 /*  FUNCTION                                               RELEASE        */
3058 /*                                                                        */
3059 /*    _nxd_mqtt_thread_entry                              PORTABLE C      */
3060 /*                                                           6.1          */
3061 /*  AUTHOR                                                                */
3062 /*                                                                        */
3063 /*    Yuxin Zhou, Microsoft Corporation                                   */
3064 /*                                                                        */
3065 /*  DESCRIPTION                                                           */
3066 /*                                                                        */
3067 /*    This internal function serves as the entry point for the MQTT       */
3068 /*    client thread.                                                      */
3069 /*                                                                        */
3070 /*  INPUT                                                                 */
3071 /*                                                                        */
3072 /*    mqtt_client                           Pointer to MQTT Client        */
3073 /*                                                                        */
3074 /*  OUTPUT                                                                */
3075 /*                                                                        */
3076 /*    None                                                                */
3077 /*                                                                        */
3078 /*  CALLS                                                                 */
3079 /*                                                                        */
3080 /*    tx_event_flags_get                                                  */
3081 /*    _nxd_mqtt_client_event_process                                      */
3082 /*                                                                        */
3083 /*  CALLED BY                                                             */
3084 /*                                                                        */
3085 /*   _nxd_mqtt_client_create                                              */
3086 /*                                                                        */
3087 /*  RELEASE HISTORY                                                       */
3088 /*                                                                        */
3089 /*    DATE              NAME                      DESCRIPTION             */
3090 /*                                                                        */
3091 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3092 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3093 /*                                            resulting in version 6.1    */
3094 /*                                                                        */
3095 /**************************************************************************/
_nxd_mqtt_thread_entry(ULONG mqtt_client)3096 static VOID _nxd_mqtt_thread_entry(ULONG mqtt_client)
3097 {
3098 NXD_MQTT_CLIENT *client_ptr;
3099 ULONG            events;
3100 
3101     client_ptr = (NXD_MQTT_CLIENT *)mqtt_client;
3102 
3103     /* Loop to process events on the MQTT client */
3104     for (;;)
3105     {
3106 
3107         tx_event_flags_get(&client_ptr -> nxd_mqtt_events, MQTT_ALL_EVENTS, TX_OR_CLEAR, &events, TX_WAIT_FOREVER);
3108 
3109         /* Call the event processing routine.  */
3110         _nxd_mqtt_client_event_process(client_ptr, NX_NULL, events);
3111 
3112         if (events & MQTT_DELETE_EVENT)
3113         {
3114             break;
3115         }
3116     }
3117 }
3118 #endif /* NXD_MQTT_CLOUD_ENABLE */
3119 
3120 
3121 /**************************************************************************/
3122 /*                                                                        */
3123 /*  FUNCTION                                               RELEASE        */
3124 /*                                                                        */
3125 /*    _mqtt_client_disconnect_callback                    PORTABLE C      */
3126 /*                                                           6.1          */
3127 /*  AUTHOR                                                                */
3128 /*                                                                        */
3129 /*    Yuxin Zhou, Microsoft Corporation                                   */
3130 /*                                                                        */
3131 /*  DESCRIPTION                                                           */
3132 /*                                                                        */
3133 /*    This internal function is passed to MQTT client socket create call. */
3134 /*    This callback function notifies MQTT client thread when the TCP     */
3135 /*    connection is lost.                                                 */
3136 /*                                                                        */
3137 /*                                                                        */
3138 /*  INPUT                                                                 */
3139 /*                                                                        */
3140 /*    socket_ptr                            Pointer to TCP socket that    */
3141 /*                                            disconnected.               */
3142 /*                                                                        */
3143 /*  OUTPUT                                                                */
3144 /*                                                                        */
3145 /*    None                                                                */
3146 /*                                                                        */
3147 /*  CALLS                                                                 */
3148 /*                                                                        */
3149 /*    None                                                                */
3150 /*                                                                        */
3151 /*  CALLED BY                                                             */
3152 /*                                                                        */
3153 /*    TCP socket disconnect callback                                      */
3154 /*                                                                        */
3155 /*  RELEASE HISTORY                                                       */
3156 /*                                                                        */
3157 /*    DATE              NAME                      DESCRIPTION             */
3158 /*                                                                        */
3159 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3160 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3161 /*                                            resulting in version 6.1    */
3162 /*                                                                        */
3163 /**************************************************************************/
_mqtt_client_disconnect_callback(NX_TCP_SOCKET * socket_ptr)3164 static VOID _mqtt_client_disconnect_callback(NX_TCP_SOCKET *socket_ptr)
3165 {
3166 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)(socket_ptr -> nx_tcp_socket_reserved_ptr);
3167 
3168     /* Set the MQTT_NETWORK_DISCONNECT  event.  This event indicates
3169        that the disconnect is initiated from the network. */
3170 #ifndef NXD_MQTT_CLOUD_ENABLE
3171     tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_NETWORK_DISCONNECT_EVENT, TX_OR);
3172 #else
3173     nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_NETWORK_DISCONNECT_EVENT);
3174 #endif /* NXD_MQTT_CLOUD_ENABLE */
3175 
3176     return;
3177 }
3178 
3179 
3180 /**************************************************************************/
3181 /*                                                                        */
3182 /*  FUNCTION                                               RELEASE        */
3183 /*                                                                        */
3184 /*    _nxd_mqtt_client_create                             PORTABLE C      */
3185 /*                                                           6.1          */
3186 /*  AUTHOR                                                                */
3187 /*                                                                        */
3188 /*    Yuxin Zhou, Microsoft Corporation                                   */
3189 /*                                                                        */
3190 /*  DESCRIPTION                                                           */
3191 /*                                                                        */
3192 /*    This function creates an instance for MQTT Client.  It initializes  */
3193 /*    MQTT Client control block, creates a thread, mutex and event queue  */
3194 /*    for MQTT Client, and creates the TCP socket for MQTT messaging.     */
3195 /*                                                                        */
3196 /*                                                                        */
3197 /*  INPUT                                                                 */
3198 /*                                                                        */
3199 /*    client_ptr                            Pointer to MQTT Client        */
3200 /*    client_id                             Client ID for this instance   */
3201 /*    client_id_length                      Length of Client ID, in bytes */
3202 /*    ip_ptr                                Pointer to IP instance        */
3203 /*    pool_ptr                              Pointer to packet pool        */
3204 /*    stack_ptr                             Client thread's stack pointer */
3205 /*    stack_size                            Client thread's stack size    */
3206 /*    mqtt_thread_priority                  Priority for MQTT thread      */
3207 /*    memory_ptr                            Deprecated and not used       */
3208 /*    memory_size                           Deprecated and not used       */
3209 /*                                                                        */
3210 /*  OUTPUT                                                                */
3211 /*                                                                        */
3212 /*    status                                Completion status             */
3213 /*                                                                        */
3214 /*  CALLS                                                                 */
3215 /*                                                                        */
3216 /*    _nxd_mqtt_client_create_internal      Create mqtt client            */
3217 /*                                                                        */
3218 /*  CALLED BY                                                             */
3219 /*                                                                        */
3220 /*    Application Code                                                    */
3221 /*                                                                        */
3222 /*  RELEASE HISTORY                                                       */
3223 /*                                                                        */
3224 /*    DATE              NAME                      DESCRIPTION             */
3225 /*                                                                        */
3226 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3227 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
3228 /*                                            corrected mqtt client state,*/
3229 /*                                            resulting in version 6.1    */
3230 /*                                                                        */
3231 /**************************************************************************/
_nxd_mqtt_client_create(NXD_MQTT_CLIENT * client_ptr,CHAR * client_name,CHAR * client_id,UINT client_id_length,NX_IP * ip_ptr,NX_PACKET_POOL * pool_ptr,VOID * stack_ptr,ULONG stack_size,UINT mqtt_thread_priority,VOID * memory_ptr,ULONG memory_size)3232 UINT _nxd_mqtt_client_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
3233                              CHAR *client_id, UINT client_id_length,
3234                              NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
3235                              VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority,
3236                              VOID *memory_ptr, ULONG memory_size)
3237 {
3238 
3239 UINT    status;
3240 
3241 
3242     NX_PARAMETER_NOT_USED(memory_ptr);
3243     NX_PARAMETER_NOT_USED(memory_size);
3244 
3245     /* Create MQTT client.  */
3246     status = _nxd_mqtt_client_create_internal(client_ptr, client_name, client_id, client_id_length, ip_ptr,
3247                                               pool_ptr, stack_ptr, stack_size, mqtt_thread_priority);
3248 
3249     /* Check status.  */
3250     if (status)
3251     {
3252         return(status);
3253     }
3254 
3255 #ifdef NXD_MQTT_CLOUD_ENABLE
3256 
3257     /* Create cloud helper.  */
3258     status = nx_cloud_create(&(client_ptr -> nxd_mqtt_client_cloud), "Cloud Helper", stack_ptr, stack_size, mqtt_thread_priority);
3259 
3260     /* Determine if an error occurred.  */
3261     if (status != NX_SUCCESS)
3262     {
3263 
3264         /* Delete internal resource created in _nxd_mqtt_client_create_internal().  */
3265 
3266         /* Delete socket.  */
3267         nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));
3268 
3269         return(NXD_MQTT_INTERNAL_ERROR);
3270     }
3271 
3272     /* Save the cloud pointer.  */
3273     client_ptr -> nxd_mqtt_client_cloud_ptr = &(client_ptr -> nxd_mqtt_client_cloud);
3274 
3275     /* Save the mutex pointer.  */
3276     client_ptr -> nxd_mqtt_client_mutex_ptr = &(client_ptr -> nxd_mqtt_client_cloud.nx_cloud_mutex);
3277 
3278     /* Register MQTT on cloud helper.  */
3279     status = nx_cloud_module_register(client_ptr -> nxd_mqtt_client_cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module), client_name, NX_CLOUD_MODULE_MQTT_EVENT,
3280                                       _nxd_mqtt_client_event_process, client_ptr);
3281 
3282     /* Determine if an error occurred.  */
3283     if (status != NX_SUCCESS)
3284     {
3285 
3286         /* Delete own created cloud.  */
3287         nx_cloud_delete(&(client_ptr -> nxd_mqtt_client_cloud));
3288 
3289         /* Delete internal resource created in _nxd_mqtt_client_create_internal().  */
3290 
3291         /* Delete socket.  */
3292         nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));
3293 
3294         return(NXD_MQTT_INTERNAL_ERROR);
3295     }
3296 
3297 #endif /* NXD_MQTT_CLOUD_ENABLE */
3298 
3299     /* Update state.  */
3300     client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;
3301 
3302     /* Return.  */
3303     return(NXD_MQTT_SUCCESS);
3304 }
3305 
3306 
3307 /**************************************************************************/
3308 /*                                                                        */
3309 /*  FUNCTION                                               RELEASE        */
3310 /*                                                                        */
3311 /*    _nxd_mqtt_client_create_internal                    PORTABLE C      */
3312 /*                                                           6.1.12       */
3313 /*  AUTHOR                                                                */
3314 /*                                                                        */
3315 /*    Yuxin Zhou, Microsoft Corporation                                   */
3316 /*                                                                        */
3317 /*  DESCRIPTION                                                           */
3318 /*                                                                        */
3319 /*    This function creates an instance for MQTT Client.  It initializes  */
3320 /*    MQTT Client control block, creates a thread, mutex and event queue  */
3321 /*    for MQTT Client, and creates the TCP socket for MQTT messaging.     */
3322 /*                                                                        */
3323 /*                                                                        */
3324 /*  INPUT                                                                 */
3325 /*                                                                        */
3326 /*    client_ptr                            Pointer to MQTT Client        */
3327 /*    client_id                             Client ID for this instance   */
3328 /*    client_id_length                      Length of Client ID, in bytes */
3329 /*    ip_ptr                                Pointer to IP instance        */
3330 /*    pool_ptr                              Pointer to packet pool        */
3331 /*    stack_ptr                             Client thread's stack pointer */
3332 /*    stack_size                            Client thread's stack size    */
3333 /*    mqtt_thread_priority                  Priority for MQTT thread      */
3334 /*                                                                        */
3335 /*  OUTPUT                                                                */
3336 /*                                                                        */
3337 /*    status                                Completion status             */
3338 /*                                                                        */
3339 /*  CALLS                                                                 */
3340 /*                                                                        */
3341 /*    tx_thread_create                      Create a thread               */
3342 /*    tx_mutex_create                       Create mutex                  */
3343 /*    tx_mutex_get                                                        */
3344 /*    tx_mutex_put                                                        */
3345 /*    tx_event_flag_create                  Create event flag             */
3346 /*                                                                        */
3347 /*  CALLED BY                                                             */
3348 /*                                                                        */
3349 /*    Application Code                                                    */
3350 /*                                                                        */
3351 /*  RELEASE HISTORY                                                       */
3352 /*                                                                        */
3353 /*    DATE              NAME                      DESCRIPTION             */
3354 /*                                                                        */
3355 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3356 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
3357 /*                                            corrected mqtt client state,*/
3358 /*                                            resulting in version 6.1    */
3359 /*  07-29-2022     Spencer McDonough        Modified comment(s), and      */
3360 /*                                            improved internal logic,    */
3361 /*                                            resulting in version 6.1.12 */
3362 /*                                                                        */
3363 /**************************************************************************/
_nxd_mqtt_client_create_internal(NXD_MQTT_CLIENT * client_ptr,CHAR * client_name,CHAR * client_id,UINT client_id_length,NX_IP * ip_ptr,NX_PACKET_POOL * pool_ptr,VOID * stack_ptr,ULONG stack_size,UINT mqtt_thread_priority)3364 static UINT _nxd_mqtt_client_create_internal(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
3365                                              CHAR *client_id, UINT client_id_length,
3366                                              NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
3367                                              VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority)
3368 {
3369 #ifndef NXD_MQTT_CLOUD_ENABLE
3370 UINT                status;
3371 #endif /* NXD_MQTT_CLOUD_ENABLE */
3372 
3373 #ifdef NXD_MQTT_CLOUD_ENABLE
3374     NX_PARAMETER_NOT_USED(stack_ptr);
3375     NX_PARAMETER_NOT_USED(stack_size);
3376     NX_PARAMETER_NOT_USED(mqtt_thread_priority);
3377 #endif /* NXD_MQTT_CLOUD_ENABLE */
3378 
3379     /* Clear the MQTT Client control block. */
3380     NXD_MQTT_SECURE_MEMSET((void *)client_ptr, 0, sizeof(NXD_MQTT_CLIENT));
3381 
3382 #ifndef NXD_MQTT_CLOUD_ENABLE
3383 
3384     /* Create MQTT mutex.  */
3385     status = tx_mutex_create(&client_ptr -> nxd_mqtt_protection, client_name, TX_NO_INHERIT);
3386 
3387     /* Determine if an error occurred. */
3388     if (status != TX_SUCCESS)
3389     {
3390 
3391         return(NXD_MQTT_INTERNAL_ERROR);
3392     }
3393     client_ptr -> nxd_mqtt_client_mutex_ptr = &(client_ptr -> nxd_mqtt_protection);
3394 
3395     /* Now create MQTT client thread */
3396     status = tx_thread_create(&(client_ptr -> nxd_mqtt_thread), client_name, _nxd_mqtt_thread_entry,
3397                               (ULONG)client_ptr, stack_ptr, stack_size, mqtt_thread_priority, mqtt_thread_priority,
3398                               NXD_MQTT_CLIENT_THREAD_TIME_SLICE, TX_DONT_START);
3399 
3400     /* Determine if an error occurred. */
3401     if (status != TX_SUCCESS)
3402     {
3403         /* Delete the mutex. */
3404         tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);
3405 
3406         /* Return error code. */
3407         return(NXD_MQTT_INTERNAL_ERROR);
3408     }
3409 
3410     status = tx_event_flags_create(&(client_ptr -> nxd_mqtt_events), client_name);
3411 
3412     if (status != TX_SUCCESS)
3413     {
3414         /* Delete the mutex. */
3415         tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);
3416 
3417         /* Delete the thread. */
3418         tx_thread_delete(&(client_ptr -> nxd_mqtt_thread));
3419 
3420         /* Return error code. */
3421         return(NXD_MQTT_INTERNAL_ERROR);
3422     }
3423 #endif /* NXD_MQTT_CLOUD_ENABLE */
3424 
3425     /* Record the client ID information. */
3426     client_ptr -> nxd_mqtt_client_id = client_id;
3427     client_ptr -> nxd_mqtt_client_id_length = client_id_length;
3428     client_ptr -> nxd_mqtt_client_ip_ptr = ip_ptr;
3429     client_ptr -> nxd_mqtt_client_packet_pool_ptr = pool_ptr;
3430     client_ptr -> nxd_mqtt_client_name = client_name;
3431 
3432     /* Create the socket. */
3433     nx_tcp_socket_create(client_ptr -> nxd_mqtt_client_ip_ptr, &(client_ptr -> nxd_mqtt_client_socket), client_ptr -> nxd_mqtt_client_name,
3434                          NX_IP_NORMAL, NX_DONT_FRAGMENT, 0x80, NXD_MQTT_CLIENT_SOCKET_WINDOW_SIZE,
3435                          NX_NULL, _mqtt_client_disconnect_callback);
3436 
3437 
3438     /* Record the client_ptr in the socket structure. */
3439     client_ptr -> nxd_mqtt_client_socket.nx_tcp_socket_reserved_ptr = (VOID *)client_ptr;
3440 
3441 #ifndef NXD_MQTT_CLOUD_ENABLE
3442     /* Start MQTT thread. */
3443     tx_thread_resume(&(client_ptr -> nxd_mqtt_thread));
3444 #endif /* NXD_MQTT_CLOUD_ENABLE */
3445     return(NXD_MQTT_SUCCESS);
3446 }
3447 
3448 
3449 /**************************************************************************/
3450 /*                                                                        */
3451 /*  FUNCTION                                               RELEASE        */
3452 /*                                                                        */
3453 /*    _nxd_mqtt_client_login_set                          PORTABLE C      */
3454 /*                                                           6.1          */
3455 /*  AUTHOR                                                                */
3456 /*                                                                        */
3457 /*    Yuxin Zhou, Microsoft Corporation                                   */
3458 /*                                                                        */
3459 /*  DESCRIPTION                                                           */
3460 /*                                                                        */
3461 /*    This function sets optional MQTT username and password.  Note       */
3462 /*    if the broker requires username and password, this information      */
3463 /*    must be set prior to calling nxd_mqtt_client_connect or             */
3464 /*    nxd_mqtt_client_secure_connect.                                     */
3465 /*                                                                        */
3466 /*                                                                        */
3467 /*  INPUT                                                                 */
3468 /*                                                                        */
3469 /*    client_ptr                            Pointer to MQTT Client        */
3470 /*    username                              User name, or NULL if user    */
3471 /*                                            name is not required        */
3472 /*    username_length                       Length of the user name, or   */
3473 /*                                            0 if user name is NULL      */
3474 /*    password                              Password string, or NULL if   */
3475 /*                                            password is not required    */
3476 /*    password_length                       Length of the password, or    */
3477 /*                                            0 if password is NULL       */
3478 /*                                                                        */
3479 /*  OUTPUT                                                                */
3480 /*                                                                        */
3481 /*    status                                Completion status             */
3482 /*                                                                        */
3483 /*  CALLS                                                                 */
3484 /*                                                                        */
3485 /*    tx_mutex_get                                                        */
3486 /*                                                                        */
3487 /*  CALLED BY                                                             */
3488 /*                                                                        */
3489 /*    Application Code                                                    */
3490 /*                                                                        */
3491 /*  RELEASE HISTORY                                                       */
3492 /*                                                                        */
3493 /*    DATE              NAME                      DESCRIPTION             */
3494 /*                                                                        */
3495 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3496 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3497 /*                                            resulting in version 6.1    */
3498 /*                                                                        */
3499 /**************************************************************************/
_nxd_mqtt_client_login_set(NXD_MQTT_CLIENT * client_ptr,CHAR * username,UINT username_length,CHAR * password,UINT password_length)3500 UINT _nxd_mqtt_client_login_set(NXD_MQTT_CLIENT *client_ptr,
3501                                 CHAR *username, UINT username_length, CHAR *password, UINT password_length)
3502 {
3503 UINT status;
3504 
3505     /* Obtain the mutex. */
3506     status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
3507 
3508     if (status != TX_SUCCESS)
3509     {
3510         return(NXD_MQTT_MUTEX_FAILURE);
3511     }
3512     client_ptr -> nxd_mqtt_client_username = username;
3513     client_ptr -> nxd_mqtt_client_username_length = (USHORT)username_length;
3514     client_ptr -> nxd_mqtt_client_password = password;
3515     client_ptr -> nxd_mqtt_client_password_length = (USHORT)password_length;
3516 
3517     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3518 
3519     return(NX_SUCCESS);
3520 }
3521 
3522 /**************************************************************************/
3523 /*                                                                        */
3524 /*  FUNCTION                                               RELEASE        */
3525 /*                                                                        */
3526 /*    _nxd_mqtt_client_will_message_set                   PORTABLE C      */
3527 /*                                                           6.1          */
3528 /*  AUTHOR                                                                */
3529 /*                                                                        */
3530 /*    Yuxin Zhou, Microsoft Corporation                                   */
3531 /*                                                                        */
3532 /*  DESCRIPTION                                                           */
3533 /*                                                                        */
3534 /*    This function sets optional MQTT will topic and will message.       */
3535 /*    Note that if will message is needed, application must set will      */
3536 /*    message prior to calling nxd_mqtt_client_connect or                 */
3537 /*    nxd_mqtt_client_secure_connect.                                     */
3538 /*                                                                        */
3539 /*                                                                        */
3540 /*  INPUT                                                                 */
3541 /*                                                                        */
3542 /*    client_ptr                            Pointer to MQTT Client        */
3543 /*    will_topic                            Will topic string.            */
3544 /*    will_topic_length                     Length of the will topic.     */
3545 /*    will_message                          Will message string.          */
3546 /*    will_message_length                   Length of the will message.   */
3547 /*    will_retain_flag                      Whether or not the will       */
3548 /*                                            message is to be retained   */
3549 /*                                            when it is published.       */
3550 /*                                            Valid values are NX_TRUE    */
3551 /*                                            NX_FALSE                    */
3552 /*    will_QoS                              QoS level to be used when     */
3553 /*                                            publishing will message.    */
3554 /*                                            Valid values are 0, 1, 2    */
3555 /*                                                                        */
3556 /*  OUTPUT                                                                */
3557 /*                                                                        */
3558 /*    status                                Completion status             */
3559 /*                                                                        */
3560 /*  CALLS                                                                 */
3561 /*                                                                        */
3562 /*    tx_mutex_get                                                        */
3563 /*                                                                        */
3564 /*  CALLED BY                                                             */
3565 /*                                                                        */
3566 /*    Application Code                                                    */
3567 /*                                                                        */
3568 /*  RELEASE HISTORY                                                       */
3569 /*                                                                        */
3570 /*    DATE              NAME                      DESCRIPTION             */
3571 /*                                                                        */
3572 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3573 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3574 /*                                            resulting in version 6.1    */
3575 /*                                                                        */
3576 /**************************************************************************/
_nxd_mqtt_client_will_message_set(NXD_MQTT_CLIENT * client_ptr,const UCHAR * will_topic,UINT will_topic_length,const UCHAR * will_message,UINT will_message_length,UINT will_retain_flag,UINT will_QoS)3577 UINT _nxd_mqtt_client_will_message_set(NXD_MQTT_CLIENT *client_ptr,
3578                                        const UCHAR *will_topic, UINT will_topic_length, const UCHAR *will_message,
3579                                        UINT will_message_length, UINT will_retain_flag, UINT will_QoS)
3580 {
3581 UINT status;
3582 
3583     if (will_QoS == 2)
3584     {
3585         return(NXD_MQTT_QOS2_NOT_SUPPORTED);
3586     }
3587 
3588     /* Obtain the mutex. */
3589     status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
3590 
3591     if (status != TX_SUCCESS)
3592     {
3593         return(NXD_MQTT_MUTEX_FAILURE);
3594     }
3595 
3596     client_ptr -> nxd_mqtt_client_will_topic = will_topic;
3597     client_ptr -> nxd_mqtt_client_will_topic_length = will_topic_length;
3598     client_ptr -> nxd_mqtt_client_will_message = will_message;
3599     client_ptr -> nxd_mqtt_client_will_message_length = will_message_length;
3600 
3601     if (will_retain_flag == NX_TRUE)
3602     {
3603         client_ptr -> nxd_mqtt_client_will_qos_retain = 0x80;
3604     }
3605     client_ptr -> nxd_mqtt_client_will_qos_retain = (UCHAR)(client_ptr -> nxd_mqtt_client_will_qos_retain | will_QoS);
3606 
3607     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3608 
3609     return(NX_SUCCESS);
3610 }
3611 
3612 
3613 /**************************************************************************/
3614 /*                                                                        */
3615 /*  FUNCTION                                               RELEASE        */
3616 /*                                                                        */
3617 /*    _nxde_mqtt_client_will_message_set                  PORTABLE C      */
3618 /*                                                           6.1          */
3619 /*  AUTHOR                                                                */
3620 /*                                                                        */
3621 /*    Yuxin Zhou, Microsoft Corporation                                   */
3622 /*                                                                        */
3623 /*  DESCRIPTION                                                           */
3624 /*                                                                        */
3625 /*    This function checks for errors in the                              */
3626 /*    nxd_mqtt_client_will_message_set call.                              */
3627 /*                                                                        */
3628 /*                                                                        */
3629 /*  INPUT                                                                 */
3630 /*                                                                        */
3631 /*    client_ptr                            Pointer to MQTT Client        */
3632 /*    will_topic                            Will topic string.            */
3633 /*    will_topic_length                     Length of the will topic.     */
3634 /*    will_message                          Will message string.          */
3635 /*    will_message_length                   Length of the will message.   */
3636 /*    will_retain_flag                      Whether or not the will       */
3637 /*                                            message is to be retained   */
3638 /*                                            when it is published.       */
3639 /*                                            Valid values are NX_TRUE    */
3640 /*                                            NX_FALSE                    */
3641 /*    will_QoS                              QoS level to be used when     */
3642 /*                                            publishing will message.    */
3643 /*                                            Valid values are 0, 1, 2    */
3644 /*                                                                        */
3645 /*  OUTPUT                                                                */
3646 /*                                                                        */
3647 /*    status                                Completion status             */
3648 /*                                                                        */
3649 /*  CALLS                                                                 */
3650 /*                                                                        */
3651 /*    _nxd_mqtt_client_will_message_set                                   */
3652 /*                                                                        */
3653 /*  CALLED BY                                                             */
3654 /*                                                                        */
3655 /*    Application Code                                                    */
3656 /*                                                                        */
3657 /*  RELEASE HISTORY                                                       */
3658 /*                                                                        */
3659 /*    DATE              NAME                      DESCRIPTION             */
3660 /*                                                                        */
3661 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3662 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3663 /*                                            resulting in version 6.1    */
3664 /*                                                                        */
3665 /**************************************************************************/
_nxde_mqtt_client_will_message_set(NXD_MQTT_CLIENT * client_ptr,const UCHAR * will_topic,UINT will_topic_length,const UCHAR * will_message,UINT will_message_length,UINT will_retain_flag,UINT will_QoS)3666 UINT _nxde_mqtt_client_will_message_set(NXD_MQTT_CLIENT *client_ptr,
3667                                         const UCHAR *will_topic, UINT will_topic_length, const UCHAR *will_message,
3668                                         UINT will_message_length, UINT will_retain_flag, UINT will_QoS)
3669 {
3670 
3671     if (client_ptr == NX_NULL)
3672     {
3673         return(NXD_MQTT_INVALID_PARAMETER);
3674     }
3675 
3676     /* Valid will_topic string.  The will topic string cannot be NULL. */
3677     if ((will_topic == NX_NULL) || (will_topic_length  == 0))
3678     {
3679         return(NXD_MQTT_INVALID_PARAMETER);
3680     }
3681 
3682     if ((will_retain_flag != NX_TRUE) && (will_retain_flag != NX_FALSE))
3683     {
3684         return(NXD_MQTT_INVALID_PARAMETER);
3685     }
3686 
3687     if (will_QoS > 2)
3688     {
3689         return(NXD_MQTT_INVALID_PARAMETER);
3690     }
3691 
3692     return(_nxd_mqtt_client_will_message_set(client_ptr, will_topic, will_topic_length, will_message,
3693                                              will_message_length, will_retain_flag, will_QoS));
3694 }
3695 
3696 /**************************************************************************/
3697 /*                                                                        */
3698 /*  FUNCTION                                               RELEASE        */
3699 /*                                                                        */
3700 /*    _nxde_mqtt_client_login_set                         PORTABLE C      */
3701 /*                                                           6.1          */
3702 /*  AUTHOR                                                                */
3703 /*                                                                        */
3704 /*    Yuxin Zhou, Microsoft Corporation                                   */
3705 /*                                                                        */
3706 /*  DESCRIPTION                                                           */
3707 /*                                                                        */
3708 /*    This function checks for errors in the nxd_mqtt_client_login call.  */
3709 /*                                                                        */
3710 /*                                                                        */
3711 /*  INPUT                                                                 */
3712 /*                                                                        */
3713 /*    client_ptr                            Pointer to MQTT Client        */
3714 /*    username                              User name, or NULL if user    */
3715 /*                                            name is not required        */
3716 /*    username_length                       Length of the user name, or   */
3717 /*                                            0 if user name is NULL      */
3718 /*    password                              Password string, or NULL if   */
3719 /*                                            password is not required    */
3720 /*    password_length                       Length of the password, or    */
3721 /*                                            0 if password is NULL       */
3722 /*                                                                        */
3723 /*  OUTPUT                                                                */
3724 /*                                                                        */
3725 /*    status                                Completion status             */
3726 /*                                                                        */
3727 /*  CALLS                                                                 */
3728 /*                                                                        */
3729 /*    _nxd_mqtt_client_login_set                                          */
3730 /*                                                                        */
3731 /*  CALLED BY                                                             */
3732 /*                                                                        */
3733 /*    Application Code                                                    */
3734 /*                                                                        */
3735 /*  RELEASE HISTORY                                                       */
3736 /*                                                                        */
3737 /*    DATE              NAME                      DESCRIPTION             */
3738 /*                                                                        */
3739 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3740 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3741 /*                                            resulting in version 6.1    */
3742 /*                                                                        */
3743 /**************************************************************************/
_nxde_mqtt_client_login_set(NXD_MQTT_CLIENT * client_ptr,CHAR * username,UINT username_length,CHAR * password,UINT password_length)3744 UINT _nxde_mqtt_client_login_set(NXD_MQTT_CLIENT *client_ptr,
3745                                  CHAR *username, UINT username_length, CHAR *password, UINT password_length)
3746 {
3747     if (client_ptr == NX_NULL)
3748     {
3749         return(NXD_MQTT_INVALID_PARAMETER);
3750     }
3751 
3752     /* Username and username length don't match,
3753        or password and password length don't match. */
3754     if (((username == NX_NULL) && (username_length != 0)) ||
3755         ((password == NX_NULL) && (password_length != 0)))
3756     {
3757         return(NXD_MQTT_INVALID_PARAMETER);
3758     }
3759 
3760     return(_nxd_mqtt_client_login_set(client_ptr, username, username_length, password, password_length));
3761 }
3762 
3763 
3764 /**************************************************************************/
3765 /*                                                                        */
3766 /*  FUNCTION                                               RELEASE        */
3767 /*                                                                        */
3768 /*    _nxd_mqtt_client_retransmit_message                 PORTABLE C      */
3769 /*                                                           6.2.0        */
3770 /*  AUTHOR                                                                */
3771 /*                                                                        */
3772 /*    Yuxin Zhou, Microsoft Corporation                                   */
3773 /*                                                                        */
3774 /*  DESCRIPTION                                                           */
3775 /*                                                                        */
3776 /*    This function retransmit QoS1 messages upon reconnection, if the    */
3777 /*    connection is not set CLEAN_SESSION.                                */
3778 /*                                                                        */
3779 /*                                                                        */
3780 /*  INPUT                                                                 */
3781 /*                                                                        */
3782 /*    client_ptr                            Pointer to MQTT Client        */
3783 /*    wait_option                           Timeout value                 */
3784 /*                                                                        */
3785 /*  OUTPUT                                                                */
3786 /*                                                                        */
3787 /*    status                                Completion status             */
3788 /*                                                                        */
3789 /*  CALLS                                                                 */
3790 /*                                                                        */
3791 /*    tx_mutex_get                                                        */
3792 /*    tx_mutex_put                                                        */
3793 /*    _nxd_mqtt_packet_send                                               */
3794 /*    nx_packet_release                                                   */
3795 /*    tx_time_get                                                         */
3796 /*    nx_packet_copy                                                      */
3797 /*                                                                        */
3798 /*  CALLED BY                                                             */
3799 /*                                                                        */
3800 /*    Application Code                                                    */
3801 /*                                                                        */
3802 /*  RELEASE HISTORY                                                       */
3803 /*                                                                        */
3804 /*    DATE              NAME                      DESCRIPTION             */
3805 /*                                                                        */
3806 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3807 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3808 /*                                            resulting in version 6.1    */
3809 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
3810 /*                                            the logic of sending packet,*/
3811 /*                                            resulting in version 6.2.0  */
3812 /*                                                                        */
3813 /**************************************************************************/
_nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)3814 static UINT _nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
3815 {
3816 NX_PACKET          *transmit_packet_ptr;
3817 NX_PACKET          *packet_ptr;
3818 UINT                status = NXD_MQTT_SUCCESS;
3819 UINT                mutex_status;
3820 UCHAR               fixed_header;
3821 
3822     transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
3823 
3824     while (transmit_packet_ptr)
3825     {
3826         fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
3827 
3828         if ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4))
3829         {
3830 
3831             /* Retransmit publish packet only. */
3832             /* Obtain a NetX Packet. */
3833             status = nx_packet_copy(transmit_packet_ptr, &packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
3834 
3835             if (status != NXD_MQTT_SUCCESS)
3836             {
3837                 return(NXD_MQTT_PACKET_POOL_FAILURE);
3838             }
3839 
3840             /* Release the mutex. */
3841             tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3842 
3843             /* Send packet to server.  */
3844             status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);
3845 
3846             if (status)
3847             {
3848                 /* Release the packet. */
3849                 nx_packet_release(packet_ptr);
3850 
3851                 status = NXD_MQTT_COMMUNICATION_FAILURE;
3852             }
3853             /* Obtain the mutex. */
3854             mutex_status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, wait_option);
3855 
3856             if (mutex_status != TX_SUCCESS)
3857             {
3858                 return(NXD_MQTT_MUTEX_FAILURE);
3859             }
3860             if (status)
3861             {
3862                 return(status);
3863             }
3864         }
3865         transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
3866     }
3867 
3868     /* Update the timeout value. */
3869     client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
3870 
3871     return(status);
3872 }
3873 
3874 /**************************************************************************/
3875 /*                                                                        */
3876 /*  FUNCTION                                               RELEASE        */
3877 /*                                                                        */
3878 /*    _nxd_mqtt_client_connect                            PORTABLE C      */
3879 /*                                                           6.3.0        */
3880 /*  AUTHOR                                                                */
3881 /*                                                                        */
3882 /*    Yuxin Zhou, Microsoft Corporation                                   */
3883 /*                                                                        */
3884 /*  DESCRIPTION                                                           */
3885 /*                                                                        */
3886 /*    This function makes an initial connection to the MQTT server.       */
3887 /*                                                                        */
3888 /*                                                                        */
3889 /*  INPUT                                                                 */
3890 /*                                                                        */
3891 /*    client_ptr                            Pointer to MQTT Client        */
3892 /*    server_ip                             Server IP address structure   */
3893 /*    server_port                           Server port number, in host   */
3894 /*                                            byte order                  */
3895 /*    keepalive                             Keepalive flag                */
3896 /*    clean_session                         Clean session flag            */
3897 /*    wait_option                           Timeout value                 */
3898 /*                                                                        */
3899 /*                                                                        */
3900 /*  OUTPUT                                                                */
3901 /*                                                                        */
3902 /*    status                                Completion status             */
3903 /*                                                                        */
3904 /*  CALLS                                                                 */
3905 /*                                                                        */
3906 /*    tx_mutex_get                                                        */
3907 /*    nx_tcp_socket_create                  Create TCP socket             */
3908 /*    nx_tcp_socket_receive_notify                                        */
3909 /*    nx_tcp_client_socket_bind                                           */
3910 /*    nxd_tcp_client_socket_connect                                       */
3911 /*    nx_tcp_client_socket_unbind                                         */
3912 /*    tx_mutex_put                                                        */
3913 /*    nx_secure_tls_session_start                                         */
3914 /*    nx_secure_tls_session_send                                          */
3915 /*    nx_secure_tls_session_receive                                       */
3916 /*    _nxd_mqtt_client_set_fixed_header                                   */
3917 /*    _nxd_mqtt_client_append_message                                     */
3918 /*    nx_tcp_socket_send                                                  */
3919 /*    nx_packet_release                                                   */
3920 /*    nx_tcp_socket_receive                                               */
3921 /*    tx_event_flag_set                                                   */
3922 /*    _nxd_mqtt_release_transmit_packet                                   */
3923 /*    tx_timer_create                                                     */
3924 /*    nx_secure_tls_session_receive                                       */
3925 /*    _nxd_mqtt_client_connection_end                                     */
3926 /*                                                                        */
3927 /*  CALLED BY                                                             */
3928 /*                                                                        */
3929 /*    Application Code                                                    */
3930 /*                                                                        */
3931 /*  RELEASE HISTORY                                                       */
3932 /*                                                                        */
3933 /*    DATE              NAME                      DESCRIPTION             */
3934 /*                                                                        */
3935 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3936 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
3937 /*                                            fixed return value when it  */
3938 /*                                            is set in CONNACK, corrected*/
3939 /*                                            mqtt client state,          */
3940 /*                                            resulting in version 6.1    */
3941 /*  08-02-2021     Yuxin Zhou               Modified comment(s), and      */
3942 /*                                            corrected the logic for     */
3943 /*                                            non-blocking mode,          */
3944 /*                                            resulting in version 6.1.8  */
3945 /*  07-29-2022     Spencer McDonough        Modified comment(s), and      */
3946 /*                                            improved internal logic,    */
3947 /*                                            resulting in version 6.1.12 */
3948 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
3949 /*                                            mqtt over websocket,        */
3950 /*                                            resulting in version 6.2.0  */
3951 /*  10-31-2023     Haiqing Zhao             Modified comment(s),          */
3952 /*                                            resulting in version 6.3.0  */
3953 /*                                                                        */
3954 /**************************************************************************/
_nxd_mqtt_client_connect(NXD_MQTT_CLIENT * client_ptr,NXD_ADDRESS * server_ip,UINT server_port,UINT keepalive,UINT clean_session,ULONG wait_option)3955 UINT _nxd_mqtt_client_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
3956                               UINT keepalive, UINT clean_session, ULONG wait_option)
3957 {
3958 NX_PACKET           *packet_ptr;
3959 UINT                 status;
3960 TX_THREAD           *thread_ptr;
3961 UINT                 new_priority;
3962 UINT                 old_priority;
3963 
3964 
3965     /* Obtain the mutex. */
3966     status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
3967 
3968     if (status != TX_SUCCESS)
3969     {
3970 #ifdef NX_SECURE_ENABLE
3971         if (client_ptr -> nxd_mqtt_client_use_tls)
3972         {
3973             nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
3974         }
3975 #endif /* NX_SECURE_ENABLE */
3976 
3977         return(NXD_MQTT_MUTEX_FAILURE);
3978     }
3979 
3980     /* Do nothing if the client is already connected. */
3981     if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
3982     {
3983         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3984         return(NXD_MQTT_ALREADY_CONNECTED);
3985     }
3986 
3987     /* Check if client is connecting. */
3988     if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTING)
3989     {
3990         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3991         return(NXD_MQTT_CONNECTING);
3992     }
3993 
3994     /* Client state must be in IDLE.  */
3995     if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_IDLE)
3996     {
3997 #ifdef NX_SECURE_ENABLE
3998         if (client_ptr -> nxd_mqtt_client_use_tls)
3999         {
4000             nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
4001         }
4002 #endif /* NX_SECURE_ENABLE */
4003         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4004         return(NXD_MQTT_INVALID_STATE);
4005     }
4006 
4007 #if defined(NX_SECURE_ENABLE) && defined(NXD_MQTT_REQUIRE_TLS)
4008     if (!client_ptr -> nxd_mqtt_client_use_tls)
4009     {
4010 
4011         /* NXD_MQTT_REQUIRE_TLS is defined but the application does not use TLS.
4012            This is security violation.  Return with failure code. */
4013         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4014         return(NXD_MQTT_CONNECT_FAILURE);
4015     }
4016 #endif /* NX_SECURE_ENABLE && NXD_MQTT_REQUIRE_TLS*/
4017 
4018     /* Record the keepalive value, converted to TX timer ticks. */
4019     client_ptr -> nxd_mqtt_keepalive = keepalive * NX_IP_PERIODIC_RATE;
4020     if (keepalive)
4021     {
4022         client_ptr -> nxd_mqtt_timer_value = NXD_MQTT_KEEPALIVE_TIMER_RATE;
4023         client_ptr -> nxd_mqtt_ping_timeout = NXD_MQTT_PING_TIMEOUT_DELAY;
4024 
4025         /* Create timer */
4026         tx_timer_create(&(client_ptr -> nxd_mqtt_timer), "MQTT Timer", _nxd_mqtt_periodic_timer_entry, (ULONG)client_ptr,
4027                         client_ptr -> nxd_mqtt_timer_value, client_ptr -> nxd_mqtt_timer_value, TX_AUTO_ACTIVATE);
4028     }
4029     else
4030     {
4031         client_ptr -> nxd_mqtt_timer_value = 0;
4032         client_ptr -> nxd_mqtt_ping_timeout = 0;
4033     }
4034 
4035     /* Record the clean session flag.  */
4036     client_ptr -> nxd_mqtt_clean_session = clean_session;
4037 
4038     /* Set TCP connection establish notify for non-blocking mode.  */
4039     if (wait_option == 0)
4040     {
4041         nx_tcp_socket_establish_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_tcp_establish_notify);
4042 
4043         /* Set the receive callback. */
4044         nx_tcp_socket_receive_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_receive_callback);
4045 
4046 #ifdef NXD_MQTT_OVER_WEBSOCKET
4047         if (client_ptr -> nxd_mqtt_client_use_websocket)
4048         {
4049 
4050             /* Set the websocket connection callback.  */
4051             nx_websocket_client_connection_status_callback_set(&client_ptr -> nxd_mqtt_client_websocket, client_ptr, _nxd_mqtt_client_websocket_connection_status_callback);
4052         }
4053 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4054     }
4055     else
4056     {
4057 
4058         /* Clean receive callback.  */
4059         client_ptr -> nxd_mqtt_client_socket.nx_tcp_receive_callback = NX_NULL;
4060 
4061 #ifdef NXD_MQTT_OVER_WEBSOCKET
4062         if (client_ptr -> nxd_mqtt_client_use_websocket)
4063         {
4064 
4065             /* Clean the websocket connection callback.  */
4066             nx_websocket_client_connection_status_callback_set(&client_ptr -> nxd_mqtt_client_websocket, NX_NULL, NX_NULL);
4067         }
4068 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4069     }
4070 
4071     /* Release mutex */
4072     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4073 
4074     /* First attempt to bind the client socket. */
4075     nx_tcp_client_socket_bind(&(client_ptr -> nxd_mqtt_client_socket), NX_ANY_PORT, wait_option);
4076 
4077     /* Obtain the mutex. */
4078     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
4079 
4080     /* Make state as NXD_MQTT_CLIENT_STATE_CONNECTING. */
4081     client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_CONNECTING;
4082 
4083     /* Release mutex. */
4084     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4085 
4086     /* Connect to the MQTT server */
4087     status = nxd_tcp_client_socket_connect(&(client_ptr -> nxd_mqtt_client_socket), server_ip, server_port, wait_option);
4088     if ((status != NX_SUCCESS) && (status != NX_IN_PROGRESS))
4089     {
4090 
4091         /* Obtain the mutex. */
4092         tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
4093 
4094         /* Make state as NXD_MQTT_CLIENT_STATE_IDLE. */
4095         client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;
4096 
4097         /* Release mutex. */
4098         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4099 
4100 #ifdef NX_SECURE_ENABLE
4101         if (client_ptr -> nxd_mqtt_client_use_tls)
4102         {
4103             nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
4104         }
4105 #endif /* NX_SECURE_ENABLE */
4106         nx_tcp_client_socket_unbind(&(client_ptr -> nxd_mqtt_client_socket));
4107         tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
4108         return(NXD_MQTT_CONNECT_FAILURE);
4109     }
4110 
4111     /* Just return for non-blocking mode.  */
4112     if (wait_option == 0)
4113     {
4114         return(NX_IN_PROGRESS);
4115     }
4116 
4117     /* Increase priority to the same of internal thread to avoid out of order packet process. */
4118 #ifndef NXD_MQTT_CLOUD_ENABLE
4119     thread_ptr = &(client_ptr -> nxd_mqtt_thread);
4120 #else
4121     thread_ptr = &(client_ptr -> nxd_mqtt_client_cloud_ptr -> nx_cloud_thread);
4122 #endif /* NXD_MQTT_CLOUD_ENABLE */
4123     tx_thread_info_get(thread_ptr, NX_NULL, NX_NULL, NX_NULL,
4124                        &new_priority, NX_NULL, NX_NULL, NX_NULL, NX_NULL);
4125     tx_thread_priority_change(tx_thread_identify(), new_priority, &old_priority);
4126 
4127     /* If TLS is enabled, start TLS */
4128 #ifdef NX_SECURE_ENABLE
4129     if (client_ptr -> nxd_mqtt_client_use_tls)
4130     {
4131 
4132         status = nx_secure_tls_session_start(&(client_ptr -> nxd_mqtt_tls_session), &(client_ptr -> nxd_mqtt_client_socket), wait_option);
4133 
4134         if (status != NX_SUCCESS)
4135         {
4136 
4137             /* Revert thread priority. */
4138             tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4139 
4140             /* End connection. */
4141             _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4142 
4143             return(NXD_MQTT_CONNECT_FAILURE);
4144         }
4145     }
4146 #endif /* NX_SECURE_ENABLE */
4147 
4148 #ifdef NXD_MQTT_OVER_WEBSOCKET
4149     if (client_ptr -> nxd_mqtt_client_use_websocket)
4150     {
4151 #ifdef NX_SECURE_ENABLE
4152         if (client_ptr -> nxd_mqtt_client_use_tls)
4153         {
4154             status = nx_websocket_client_secure_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_tls_session),
4155                                                         client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
4156                                                         client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
4157                                                         (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
4158                                                         wait_option);
4159         }
4160         else
4161 #endif /* NX_SECURE_ENABLE */
4162         {
4163             status = nx_websocket_client_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_client_socket),
4164                                                  client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
4165                                                  client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
4166                                                  (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
4167                                                  wait_option);
4168         }
4169 
4170         if (status != NX_SUCCESS)
4171         {
4172 
4173             /* Revert thread priority. */
4174             tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4175 
4176             /* End connection. */
4177             _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4178 
4179             return(NXD_MQTT_CONNECT_FAILURE);
4180         }
4181     }
4182 
4183 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4184 
4185     /* Start to send connect packet.  */
4186     status = _nxd_mqtt_client_connect_packet_send(client_ptr, wait_option);
4187 
4188     if (status != NX_SUCCESS)
4189     {
4190 
4191         /* Revert thread priority. */
4192         tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4193 
4194         /* End connection. */
4195         _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4196         return(NXD_MQTT_CONNECT_FAILURE);
4197     }
4198 
4199     /* Call a receive. */
4200     status = _nxd_mqtt_packet_receive(client_ptr, &packet_ptr, wait_option);
4201 
4202     /* Revert thread priority. */
4203     tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);
4204 
4205     /* Check status.  */
4206     if (status)
4207     {
4208 
4209         /* End connection. */
4210         _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
4211         return(NXD_MQTT_COMMUNICATION_FAILURE);
4212     }
4213 
4214     /* Process CONNACK message.  */
4215     status = _nxd_mqtt_process_connack(client_ptr, packet_ptr, wait_option);
4216 
4217     /* Release the packet.  */
4218     nx_packet_release(packet_ptr);
4219 
4220     /* Check status.  */
4221     if (status == NX_SUCCESS)
4222     {
4223 
4224         /* Set the receive callback. */
4225         nx_tcp_socket_receive_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_receive_callback);
4226     }
4227 
4228     return(status);
4229 }
4230 
4231 /**************************************************************************/
4232 /*                                                                        */
4233 /*  FUNCTION                                               RELEASE        */
4234 /*                                                                        */
4235 /*    _nxd_mqtt_client_connect_packet_send                PORTABLE C      */
4236 /*                                                           6.3.0        */
4237 /*  AUTHOR                                                                */
4238 /*                                                                        */
4239 /*    Yuxin Zhou, Microsoft Corporation                                   */
4240 /*                                                                        */
4241 /*  DESCRIPTION                                                           */
4242 /*                                                                        */
4243 /*    This function sends CONNECT packet to MQTT server.                  */
4244 /*                                                                        */
4245 /*                                                                        */
4246 /*  INPUT                                                                 */
4247 /*                                                                        */
4248 /*    client_ptr                            Pointer to MQTT Client        */
4249 /*    keepalive                             Keepalive flag                */
4250 /*    clean_session                         Clean session flag            */
4251 /*    wait_option                           Timeout value                 */
4252 /*                                                                        */
4253 /*                                                                        */
4254 /*  OUTPUT                                                                */
4255 /*                                                                        */
4256 /*    status                                Completion status             */
4257 /*                                                                        */
4258 /*  CALLS                                                                 */
4259 /*                                                                        */
4260 /*    nx_packet_release                                                   */
4261 /*    _nxd_mqtt_client_packet_allocate                                    */
4262 /*    _nxd_mqtt_release_transmit_packet                                   */
4263 /*    _nxd_mqtt_client_connection_end                                     */
4264 /*    _nxd_mqtt_client_set_fixed_header                                   */
4265 /*    _nxd_mqtt_client_append_message                                     */
4266 /*    _nxd_mqtt_packet_send                                               */
4267 /*                                                                        */
4268 /*  CALLED BY                                                             */
4269 /*                                                                        */
4270 /*    _nxd_mqtt_client_connect                                            */
4271 /*    _nxd_mqtt_tcp_establish_process                                     */
4272 /*    _nxd_mqtt_tls_establish_process                                     */
4273 /*                                                                        */
4274 /*  RELEASE HISTORY                                                       */
4275 /*                                                                        */
4276 /*    DATE              NAME                      DESCRIPTION             */
4277 /*                                                                        */
4278 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4279 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4280 /*                                            resulting in version 6.1    */
4281 /*  07-29-2022     Spencer McDonough        Modified comment(s),          */
4282 /*                                            improved internal logic,    */
4283 /*                                            resulting in version 6.1.12 */
4284 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
4285 /*                                            the logic of sending packet,*/
4286 /*                                            resulting in version 6.2.0  */
4287 /*  10-31-2023     Haiqing Zhao             Modified comment(s), improved */
4288 /*                                            internal logic,             */
4289 /*                                            resulting in version 6.3.0  */
4290 /*                                                                        */
4291 /**************************************************************************/
_nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)4292 UINT _nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
4293 {
4294 NX_PACKET           *packet_ptr;
4295 UINT                 status;
4296 UINT                 length = 0;
4297 UCHAR                connection_flags = 0;
4298 UINT                 ret = NXD_MQTT_SUCCESS;
4299 UCHAR                temp_data[4];
4300 UINT                 keepalive = (client_ptr -> nxd_mqtt_keepalive/NX_IP_PERIODIC_RATE);
4301 
4302 
4303     /* Construct connect flags by taking the connect flag user supplies, or'ing the username and
4304        password bits, if they are supplied. */
4305     if (client_ptr -> nxd_mqtt_client_username)
4306     {
4307         connection_flags |= MQTT_CONNECT_FLAGS_USERNAME;
4308 
4309         /* Add the password flag only if username is supplied. */
4310         if (client_ptr -> nxd_mqtt_client_password)
4311         {
4312             connection_flags |= MQTT_CONNECT_FLAGS_PASSWORD;
4313         }
4314     }
4315 
4316     if (client_ptr -> nxd_mqtt_client_will_topic)
4317     {
4318         connection_flags = connection_flags | MQTT_CONNECT_FLAGS_WILL_FLAG;
4319 
4320 
4321         if (client_ptr -> nxd_mqtt_client_will_qos_retain & 0x80)
4322         {
4323             connection_flags = connection_flags | MQTT_CONNECT_FLAGS_WILL_RETAIN;
4324         }
4325 
4326         connection_flags = (UCHAR)(connection_flags | ((client_ptr -> nxd_mqtt_client_will_qos_retain & 0x3) << 3));
4327     }
4328 
4329     if (client_ptr -> nxd_mqtt_clean_session == NX_TRUE)
4330     {
4331         connection_flags = connection_flags | MQTT_CONNECT_FLAGS_CLEAN_SESSION;
4332 
4333         /* Clear any transmit blocks from the previous session. */
4334         while (client_ptr -> message_transmit_queue_head)
4335         {
4336             _nxd_mqtt_release_transmit_packet(client_ptr, client_ptr -> message_transmit_queue_head, NX_NULL);
4337         }
4338     }
4339 
4340     /* Set the length of the packet. */
4341     length = 10;
4342 
4343     /* Add the size of the client Identifier. */
4344     length += (client_ptr -> nxd_mqtt_client_id_length + 2);
4345 
4346     /* Add the will topic, if present. */
4347     if (connection_flags & MQTT_CONNECT_FLAGS_WILL_FLAG)
4348     {
4349         length += (client_ptr -> nxd_mqtt_client_will_topic_length + 2);
4350         length += (client_ptr -> nxd_mqtt_client_will_message_length + 2);
4351     }
4352     if (connection_flags & MQTT_CONNECT_FLAGS_USERNAME)
4353     {
4354         length += (UINT)(client_ptr -> nxd_mqtt_client_username_length + 2);
4355     }
4356     if (connection_flags & MQTT_CONNECT_FLAGS_PASSWORD)
4357     {
4358         length += (UINT)(client_ptr -> nxd_mqtt_client_password_length + 2);
4359     }
4360 
4361     /* Check for invalid length. */
4362     if (length > (127 * 127 * 127 * 127))
4363     {
4364         return(NXD_MQTT_INTERNAL_ERROR);
4365     }
4366 
4367     status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, wait_option);
4368 
4369     if (status)
4370     {
4371         return(status);
4372     }
4373 
4374     /* Construct MQTT CONNECT message. */
4375     temp_data[0] = ((MQTT_CONTROL_PACKET_TYPE_CONNECT << 4) & 0xF0);
4376 
4377     /* Fill in fixed header. */
4378     ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, temp_data[0], length, wait_option);
4379 
4380     if (ret)
4381     {
4382         /* Release the packet. */
4383         nx_packet_release(packet_ptr);
4384         return(NXD_MQTT_PACKET_POOL_FAILURE);
4385     }
4386 
4387     /* Fill in protocol name. */
4388     ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, "MQTT", 4, wait_option);
4389 
4390     if (ret)
4391     {
4392         /* Release the packet. */
4393         nx_packet_release(packet_ptr);
4394         return(NXD_MQTT_PACKET_POOL_FAILURE);
4395     }
4396 
4397     /* Fill in protocol level, */
4398     temp_data[0] = MQTT_PROTOCOL_LEVEL;
4399 
4400     /* Fill in byte 8: connect flags */
4401     temp_data[1] = connection_flags;
4402 
4403     /* Fill in byte 9 and 10: keep alive */
4404     temp_data[2] = (keepalive >> 8) & 0xFF;
4405     temp_data[3] = (keepalive & 0xFF);
4406 
4407     ret = nx_packet_data_append(packet_ptr, temp_data, 4, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
4408 
4409     if (ret)
4410     {
4411 
4412         /* Release the packet. */
4413         nx_packet_release(packet_ptr);
4414 
4415         return(NXD_MQTT_PACKET_POOL_FAILURE);
4416     }
4417 
4418     /* Fill in payload area, in the order of: client identifier, will topic, will message,
4419        user name, and password. */
4420     ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_id,
4421                                           client_ptr -> nxd_mqtt_client_id_length, wait_option);
4422 
4423     /* Next fill will topic and will message if the will flag is set. */
4424     if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_WILL_FLAG))
4425     {
4426         ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, (CHAR *)client_ptr -> nxd_mqtt_client_will_topic,
4427                                               client_ptr -> nxd_mqtt_client_will_topic_length, wait_option);
4428 
4429         if (!ret)
4430         {
4431             ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, (CHAR *)client_ptr -> nxd_mqtt_client_will_message,
4432                                                   client_ptr -> nxd_mqtt_client_will_message_length, wait_option);
4433         }
4434     }
4435 
4436     /* Fill username if username flag is set */
4437     if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_USERNAME))
4438     {
4439         ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_username,
4440                                               client_ptr -> nxd_mqtt_client_username_length, wait_option);
4441     }
4442 
4443     /* Fill password if password flag is set */
4444     if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_PASSWORD))
4445     {
4446         ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_password,
4447                                               client_ptr -> nxd_mqtt_client_password_length, wait_option);
4448     }
4449 
4450     if (ret)
4451     {
4452 
4453         /* Release the packet. */
4454         nx_packet_release(packet_ptr);
4455 
4456         return(NXD_MQTT_PACKET_POOL_FAILURE);
4457     }
4458 
4459     /* Ready to send the connect message to the server. */
4460     status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);
4461 
4462     if (status)
4463     {
4464 
4465         /* Release the packet. */
4466         nx_packet_release(packet_ptr);
4467     }
4468 
4469     /* Update the timeout value. */
4470     client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
4471 
4472     return(status);
4473 }
4474 
4475 /**************************************************************************/
4476 /*                                                                        */
4477 /*  FUNCTION                                               RELEASE        */
4478 /*                                                                        */
4479 /*    _nxd_mqtt_client_secure_connect                     PORTABLE C      */
4480 /*                                                           6.1          */
4481 /*  AUTHOR                                                                */
4482 /*                                                                        */
4483 /*    Yuxin Zhou, Microsoft Corporation                                   */
4484 /*                                                                        */
4485 /*  DESCRIPTION                                                           */
4486 /*                                                                        */
4487 /*    This function makes an initial secure (TLS) connection to           */
4488 /*    the MQTT server.                                                    */
4489 /*                                                                        */
4490 /*                                                                        */
4491 /*  INPUT                                                                 */
4492 /*                                                                        */
4493 /*    client_ptr                            Pointer to MQTT Client        */
4494 /*    server_ip                             Server IP address structure   */
4495 /*    server_port                           Server port number, in host   */
4496 /*                                            byte order                  */
4497 /*    tls_setup                             User-supplied callback        */
4498 /*                                            function to set up TLS      */
4499 /*                                            parameters.                 */
4500 /*    username                              User name, or NULL if user    */
4501 /*                                            name is not required        */
4502 /*    username_length                       Length of the user name, or   */
4503 /*                                            0 if user name is NULL      */
4504 /*    password                              Password string, or NULL if   */
4505 /*                                            password is not required    */
4506 /*    password_length                       Length of the password, or    */
4507 /*                                            0 if password is NULL       */
4508 /*    connection_flag                       Flag passed to the server     */
4509 /*    timeout                               Timeout value                 */
4510 /*                                                                        */
4511 /*                                                                        */
4512 /*  OUTPUT                                                                */
4513 /*                                                                        */
4514 /*    status                                Completion status             */
4515 /*                                                                        */
4516 /*  CALLS                                                                 */
4517 /*                                                                        */
4518 /*    _nxd_mqtt_client_connect              Actual MQTT Client connect    */
4519 /*                                            call                        */
4520 /*                                                                        */
4521 /*  CALLED BY                                                             */
4522 /*                                                                        */
4523 /*    Application Code                                                    */
4524 /*                                                                        */
4525 /*  RELEASE HISTORY                                                       */
4526 /*                                                                        */
4527 /*    DATE              NAME                      DESCRIPTION             */
4528 /*                                                                        */
4529 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4530 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4531 /*                                            resulting in version 6.1    */
4532 /*                                                                        */
4533 /**************************************************************************/
4534 #ifdef NX_SECURE_ENABLE
_nxd_mqtt_client_secure_connect(NXD_MQTT_CLIENT * client_ptr,NXD_ADDRESS * server_ip,UINT server_port,UINT (* tls_setup)(NXD_MQTT_CLIENT * client_ptr,NX_SECURE_TLS_SESSION *,NX_SECURE_X509_CERT *,NX_SECURE_X509_CERT *),UINT keepalive,UINT clean_session,ULONG wait_option)4535 UINT _nxd_mqtt_client_secure_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
4536                                      UINT (*tls_setup)(NXD_MQTT_CLIENT *client_ptr, NX_SECURE_TLS_SESSION *,
4537                                                        NX_SECURE_X509_CERT *, NX_SECURE_X509_CERT *),
4538                                      UINT keepalive, UINT clean_session, ULONG wait_option)
4539 {
4540 UINT ret;
4541 
4542     /* Set up TLS session information. */
4543     ret = (*tls_setup)(client_ptr, &client_ptr -> nxd_mqtt_tls_session,
4544                        &client_ptr -> nxd_mqtt_tls_certificate,
4545                        &client_ptr -> nxd_mqtt_tls_trusted_certificate);
4546 
4547     if (ret)
4548     {
4549         return(ret);
4550     }
4551 
4552     /* Mark the connection as secure. */
4553     client_ptr -> nxd_mqtt_client_use_tls = 1;
4554 
4555     ret = _nxd_mqtt_client_connect(client_ptr, server_ip, server_port, keepalive, clean_session, wait_option);
4556 
4557     return(ret);
4558 }
4559 
4560 #endif /* NX_SECURE_ENABLE */
4561 
4562 
4563 /**************************************************************************/
4564 /*                                                                        */
4565 /*  FUNCTION                                               RELEASE        */
4566 /*                                                                        */
4567 /*    _nxd_mqtt_client_delete                             PORTABLE C      */
4568 /*                                                           6.2.0        */
4569 /*  AUTHOR                                                                */
4570 /*                                                                        */
4571 /*    Yuxin Zhou, Microsoft Corporation                                   */
4572 /*                                                                        */
4573 /*  DESCRIPTION                                                           */
4574 /*                                                                        */
4575 /*    This function deletes a previously created MQTT client instance.    */
4576 /*    If the NXD_MQTT_SOCKET_TIMEOUT is set to NX_WAIT_FOREVER, this may  */
4577 /*    suspend infinitely. This is because the MQTT Client must            */
4578 /*    disconnect with the server, and if the network link is disabled or  */
4579 /*    the server is not responding, this will blocks this function from   */
4580 /*    completing.                                                         */
4581 /*                                                                        */
4582 /*  INPUT                                                                 */
4583 /*                                                                        */
4584 /*    client_ptr                            Pointer to MQTT Client        */
4585 /*                                                                        */
4586 /*  OUTPUT                                                                */
4587 /*                                                                        */
4588 /*    status                                Completion status             */
4589 /*                                                                        */
4590 /*  CALLS                                                                 */
4591 /*                                                                        */
4592 /*    tx_event_flags_set                                                  */
4593 /*                                                                        */
4594 /*  CALLED BY                                                             */
4595 /*                                                                        */
4596 /*    Application Code                                                    */
4597 /*                                                                        */
4598 /*  RELEASE HISTORY                                                       */
4599 /*                                                                        */
4600 /*    DATE              NAME                      DESCRIPTION             */
4601 /*                                                                        */
4602 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4603 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4604 /*                                            resulting in version 6.1    */
4605 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
4606 /*                                            mqtt over websocket,        */
4607 /*                                            resulting in version 6.2.0  */
4608 /*                                                                        */
4609 /**************************************************************************/
_nxd_mqtt_client_delete(NXD_MQTT_CLIENT * client_ptr)4610 UINT _nxd_mqtt_client_delete(NXD_MQTT_CLIENT *client_ptr)
4611 {
4612 
4613 
4614     /* Set the event flag for DELETE. Next time when the MQTT client thread
4615        wakes up, it will perform the deletion process. */
4616 #ifndef NXD_MQTT_CLOUD_ENABLE
4617     tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_DELETE_EVENT, TX_OR);
4618 #else
4619     nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_DELETE_EVENT);
4620 #endif /* NXD_MQTT_CLOUD_ENABLE */
4621 
4622     /* Check if the MQTT client thread has completed. */
4623     while((client_ptr -> nxd_mqtt_client_socket).nx_tcp_socket_id != 0)
4624     {
4625         tx_thread_sleep(NX_IP_PERIODIC_RATE);
4626     }
4627 
4628 #ifndef NXD_MQTT_CLOUD_ENABLE
4629     /* Now we can delete the Client instance. */
4630     tx_thread_delete(&(client_ptr -> nxd_mqtt_thread));
4631 #else
4632     /* Deregister mqtt module from cloud helper.  */
4633     nx_cloud_module_deregister(client_ptr -> nxd_mqtt_client_cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module));
4634 
4635     /* Delete own created cloud.  */
4636     if (client_ptr -> nxd_mqtt_client_cloud.nx_cloud_id == NX_CLOUD_ID)
4637         nx_cloud_delete(&(client_ptr -> nxd_mqtt_client_cloud));
4638 #endif /* NXD_MQTT_CLOUD_ENABLE */
4639 
4640 #ifdef NXD_MQTT_OVER_WEBSOCKET
4641     if (client_ptr -> nxd_mqtt_client_use_websocket)
4642     {
4643         nx_websocket_client_delete(&client_ptr -> nxd_mqtt_client_websocket);
4644         client_ptr -> nxd_mqtt_client_use_websocket = NX_FALSE;
4645     }
4646 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4647 
4648     return(NXD_MQTT_SUCCESS);
4649 }
4650 
4651 /**************************************************************************/
4652 /*                                                                        */
4653 /*  FUNCTION                                               RELEASE        */
4654 /*                                                                        */
4655 /*    _nxd_mqtt_client_publish_packet_send                PORTABLE C      */
4656 /*                                                           6.2.0        */
4657 /*  AUTHOR                                                                */
4658 /*                                                                        */
4659 /*    Yuxin Zhou, Microsoft Corporation                                   */
4660 /*                                                                        */
4661 /*  DESCRIPTION                                                           */
4662 /*                                                                        */
4663 /*    This function sends a publish packet to the connected broker.       */
4664 /*                                                                        */
4665 /*                                                                        */
4666 /*  INPUT                                                                 */
4667 /*                                                                        */
4668 /*    client_ptr                            Pointer to MQTT Client        */
4669 /*    packet_ptr                            Pointer to publish packet     */
4670 /*    packet_id                             Current packet ID             */
4671 /*    QoS                                   Quality of service            */
4672 /*    wait_option                           Suspension option             */
4673 /*                                                                        */
4674 /*  OUTPUT                                                                */
4675 /*                                                                        */
4676 /*    status                                Completion status             */
4677 /*                                                                        */
4678 /*  CALLS                                                                 */
4679 /*                                                                        */
4680 /*    tx_mutex_get                                                        */
4681 /*    tx_mutex_put                                                        */
4682 /*    nx_packet_release                                                   */
4683 /*    _nxd_mqtt_copy_transmit_packet                                      */
4684 /*    _nxd_mqtt_packet_send                                               */
4685 /*                                                                        */
4686 /*  CALLED BY                                                             */
4687 /*                                                                        */
4688 /*    _nxd_mqtt_client_publish                                            */
4689 /*                                                                        */
4690 /*  RELEASE HISTORY                                                       */
4691 /*                                                                        */
4692 /*    DATE              NAME                      DESCRIPTION             */
4693 /*                                                                        */
4694 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4695 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4696 /*                                            resulting in version 6.1    */
4697 /*  08-02-2021     Yuxin Zhou               Modified comment(s),          */
4698 /*                                            supported maximum transmit  */
4699 /*                                            queue depth,                */
4700 /*                                            resulting in version 6.1.8  */
4701 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
4702 /*                                            the logic of sending packet,*/
4703 /*                                            resulting in version 6.2.0  */
4704 /*                                                                        */
4705 /**************************************************************************/
_nxd_mqtt_client_publish_packet_send(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,USHORT packet_id,UINT QoS,ULONG wait_option)4706 UINT _nxd_mqtt_client_publish_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr,
4707                                           USHORT packet_id, UINT QoS, ULONG wait_option)
4708 {
4709 
4710 UINT       status;
4711 UINT       ret = NXD_MQTT_SUCCESS;
4712 
4713     if (QoS != 0)
4714     {
4715     /* This packet needs to be stored locally for possible retransmission. */
4716     NX_PACKET *transmit_packet_ptr;
4717 
4718         /* Copy packet for retransmission. */
4719         if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &transmit_packet_ptr,
4720                                            packet_id, NX_TRUE, wait_option))
4721         {
4722             return(NXD_MQTT_PACKET_POOL_FAILURE);
4723         }
4724 
4725         /* Obtain the mutex. */
4726         status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
4727 
4728         if (status != TX_SUCCESS)
4729         {
4730             nx_packet_release(transmit_packet_ptr);
4731 
4732 #ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
4733             /* Decrease the transmit queue depth.  */
4734             client_ptr -> message_transmit_queue_depth--;
4735 #endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */
4736             return(NXD_MQTT_MUTEX_FAILURE);
4737         }
4738 
4739         if (client_ptr -> message_transmit_queue_head == NX_NULL)
4740         {
4741             client_ptr -> message_transmit_queue_head = transmit_packet_ptr;
4742         }
4743         else
4744         {
4745             client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = transmit_packet_ptr;
4746         }
4747         client_ptr -> message_transmit_queue_tail = transmit_packet_ptr;
4748     }
4749     else
4750     {
4751 
4752         /* Obtain the mutex. */
4753         status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
4754 
4755         if (status != TX_SUCCESS)
4756         {
4757             return(NXD_MQTT_MUTEX_FAILURE);
4758         }
4759     }
4760 
4761     /* Update the timeout value. */
4762     client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
4763 
4764 
4765     /* Release the mutex. */
4766     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4767 
4768     /* Ready to send the connect message to the server. */
4769     status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);
4770 
4771     if (status)
4772     {
4773         ret = NXD_MQTT_COMMUNICATION_FAILURE;
4774     }
4775 
4776     return(ret);
4777 }
4778 
4779 /**************************************************************************/
4780 /*                                                                        */
4781 /*  FUNCTION                                               RELEASE        */
4782 /*                                                                        */
4783 /*    _nxd_mqtt_client_publish                            PORTABLE C      */
4784 /*                                                           6.3.0        */
4785 /*  AUTHOR                                                                */
4786 /*                                                                        */
4787 /*    Yuxin Zhou, Microsoft Corporation                                   */
4788 /*                                                                        */
4789 /*  DESCRIPTION                                                           */
4790 /*                                                                        */
4791 /*    This function publishes a message to the connected broker.          */
4792 /*                                                                        */
4793 /*                                                                        */
4794 /*  INPUT                                                                 */
4795 /*                                                                        */
4796 /*    client_ptr                            Pointer to MQTT Client        */
4797 /*    topic_name                            Name of the topic             */
4798 /*    topic_name_length                     Length of the topic name      */
4799 /*    message                               Message string                */
4800 /*    message_length                        Length of the message,        */
4801 /*                                            in bytes                    */
4802 /*    retain                                The retain flag, whether      */
4803 /*                                            or not the broker should    */
4804 /*                                            store this message          */
4805 /*    QoS                                   Expected QoS level            */
4806 /*    wait_option                           Suspension option             */
4807 /*                                                                        */
4808 /*  OUTPUT                                                                */
4809 /*                                                                        */
4810 /*    status                                Completion status             */
4811 /*                                                                        */
4812 /*  CALLS                                                                 */
4813 /*                                                                        */
4814 /*    tx_mutex_get                                                        */
4815 /*    _nxd_mqtt_client_packet_allocate                                    */
4816 /*    _nxd_mqtt_client_set_fixed_header                                   */
4817 /*    _nxd_mqtt_client_append_message                                     */
4818 /*    tx_mutex_put                                                        */
4819 /*    nx_packet_release                                                   */
4820 /*    _nxd_mqtt_client_publish_packet_send                                */
4821 /*                                                                        */
4822 /*  CALLED BY                                                             */
4823 /*                                                                        */
4824 /*    Application Code                                                    */
4825 /*                                                                        */
4826 /*  RELEASE HISTORY                                                       */
4827 /*                                                                        */
4828 /*    DATE              NAME                      DESCRIPTION             */
4829 /*                                                                        */
4830 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4831 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4832 /*                                            resulting in version 6.1    */
4833 /*  07-29-2022     Spencer McDonough        Modified comment(s),          */
4834 /*                                            improved internal logic,    */
4835 /*                                            resulting in version 6.1.12 */
4836 /*  10-31-2023     Haiqing Zhao             Modified comment(s), improved */
4837 /*                                            internal logic,             */
4838 /*                                            resulting in version 6.3.0  */
4839 /*                                                                        */
4840 /**************************************************************************/
_nxd_mqtt_client_publish(NXD_MQTT_CLIENT * client_ptr,CHAR * topic_name,UINT topic_name_length,CHAR * message,UINT message_length,UINT retain,UINT QoS,ULONG wait_option)4841 UINT _nxd_mqtt_client_publish(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length,
4842                               CHAR *message, UINT message_length, UINT retain, UINT QoS, ULONG wait_option)
4843 {
4844 
4845 NX_PACKET *packet_ptr;
4846 UINT       status;
4847 UINT       length = 0;
4848 UCHAR      flags;
4849 USHORT     packet_id = 0;
4850 UINT       ret = NXD_MQTT_SUCCESS;
4851 
4852     if (QoS == 2)
4853     {
4854         return(NXD_MQTT_QOS2_NOT_SUPPORTED);
4855     }
4856 
4857 
4858     /* Do nothing if the client is already connected. */
4859     if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTED)
4860     {
4861         return(NXD_MQTT_NOT_CONNECTED);
4862     }
4863 
4864     status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, wait_option);
4865 
4866     if (status != NXD_MQTT_SUCCESS)
4867     {
4868         return(NXD_MQTT_PACKET_POOL_FAILURE);
4869     }
4870 
4871     flags = (UCHAR)((MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4) | (QoS << 1));
4872 
4873     if (retain)
4874     {
4875         flags = flags | MQTT_PUBLISH_RETAIN;
4876     }
4877 
4878 
4879     /* Compute the remaining length. */
4880 
4881     /* Topic Name. */
4882     /* Compute Topic Name length. */
4883     length = topic_name_length + 2;
4884 
4885     /* Count packet ID for QoS 1 or QoS 2 message. */
4886     if ((QoS == 1) || (QoS == 2))
4887     {
4888         length += 2;
4889     }
4890 
4891     /* Count message. */
4892     if ((message != NX_NULL) && (message_length != 0))
4893     {
4894         length += message_length;
4895     }
4896 
4897     /* Write out the control header and remaining length field. */
4898     ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, flags, length, wait_option);
4899 
4900     if (ret)
4901     {
4902 
4903         /* Release the packet. */
4904         nx_packet_release(packet_ptr);
4905 
4906         return(NXD_MQTT_INTERNAL_ERROR);
4907     }
4908 
4909 
4910     /* Write out topic */
4911     ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, topic_name, topic_name_length, wait_option);
4912 
4913     if (ret)
4914     {
4915 
4916         /* Release the packet. */
4917         nx_packet_release(packet_ptr);
4918 
4919         return(NXD_MQTT_INTERNAL_ERROR);
4920     }
4921 
4922     /* Append Packet Identifier for QoS level 1 or 2  MQTT 3.3.2.2 */
4923     if ((QoS == 1) || (QoS == 2))
4924     {
4925     UCHAR identifier[2];
4926 
4927         /* Obtain the mutex. */
4928         status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
4929 
4930         if (status != TX_SUCCESS)
4931         {
4932             return(NXD_MQTT_MUTEX_FAILURE);
4933         }
4934 
4935         packet_id = (USHORT)client_ptr -> nxd_mqtt_client_packet_identifier;
4936         identifier[0] = (UCHAR)(client_ptr -> nxd_mqtt_client_packet_identifier >> 8);
4937         identifier[1] = (client_ptr -> nxd_mqtt_client_packet_identifier & 0xFF);
4938 
4939         /* Update packet id. */
4940         client_ptr -> nxd_mqtt_client_packet_identifier = (client_ptr -> nxd_mqtt_client_packet_identifier + 1) & 0xFFFF;
4941 
4942         /* Prevent packet identifier from being zero. MQTT-2.3.1-1 */
4943         if(client_ptr -> nxd_mqtt_client_packet_identifier == 0)
4944             client_ptr -> nxd_mqtt_client_packet_identifier = 1;
4945 
4946         /* Release the mutex. */
4947         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
4948 
4949         ret = nx_packet_data_append(packet_ptr, identifier, 2,
4950                                     client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
4951 
4952         if (ret)
4953         {
4954 
4955             /* Release the packet. */
4956             nx_packet_release(packet_ptr);
4957 
4958             return(NXD_MQTT_INTERNAL_ERROR);
4959         }
4960     }
4961 
4962     /* Append message. */
4963     if ((message != NX_NULL) && (message_length) != 0)
4964     {
4965 
4966         /* Use nx_packet_data_append to move user-supplied message data into the packet.
4967            nx_packet_data_append uses chained packet if the additional storage space is
4968            needed. */
4969         ret = nx_packet_data_append(packet_ptr, message, message_length,
4970                                        client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
4971         if(ret)
4972         {
4973             /* Unable to obtain a new packet to store the message. */
4974 
4975             /* Release the packet. */
4976             nx_packet_release(packet_ptr);
4977 
4978             return(NXD_MQTT_INTERNAL_ERROR);
4979         }
4980     }
4981 
4982     /* Send publish packet. */
4983     ret = _nxd_mqtt_client_publish_packet_send(client_ptr, packet_ptr, packet_id, QoS, wait_option);
4984 
4985     if (ret)
4986     {
4987 
4988         /* Release the packet. */
4989         nx_packet_release(packet_ptr);
4990     }
4991     return(ret);
4992 }
4993 
4994 /**************************************************************************/
4995 /*                                                                        */
4996 /*  FUNCTION                                               RELEASE        */
4997 /*                                                                        */
4998 /*    _nxd_mqtt_client_subscribe                          PORTABLE C      */
4999 /*                                                           6.1.2        */
5000 /*  AUTHOR                                                                */
5001 /*                                                                        */
5002 /*    Yuxin Zhou, Microsoft Corporation                                   */
5003 /*                                                                        */
5004 /*  DESCRIPTION                                                           */
5005 /*                                                                        */
5006 /*    This function sends a subscribe message to the broker.              */
5007 /*                                                                        */
5008 /*                                                                        */
5009 /*  INPUT                                                                 */
5010 /*                                                                        */
5011 /*    client_ptr                            Pointer to MQTT Client        */
5012 /*    topic_name                            Pointer to the topic string   */
5013 /*                                            to subscribe to             */
5014 /*    topic_name_length                     Length of the topic string    */
5015 /*                                            in bytes                    */
5016 /*    QoS                                   Expected QoS level            */
5017 /*                                                                        */
5018 /*  OUTPUT                                                                */
5019 /*                                                                        */
5020 /*    status                                Completion status             */
5021 /*                                                                        */
5022 /*  CALLS                                                                 */
5023 /*                                                                        */
5024 /*    _nxd_mqtt_client_sub_unsub            The actual routine that       */
5025 /*                                            performs the sub/unsub      */
5026 /*                                            action.                     */
5027 /*                                                                        */
5028 /*  CALLED BY                                                             */
5029 /*                                                                        */
5030 /*    Application Code                                                    */
5031 /*                                                                        */
5032 /*  RELEASE HISTORY                                                       */
5033 /*                                                                        */
5034 /*    DATE              NAME                      DESCRIPTION             */
5035 /*                                                                        */
5036 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5037 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5038 /*                                            resulting in version 6.1    */
5039 /*  11-09-2020     Yuxin Zhou               Modified comment(s), and      */
5040 /*                                            added packet id parameter,  */
5041 /*                                            resulting in version 6.1.2  */
5042 /*                                                                        */
5043 /**************************************************************************/
_nxd_mqtt_client_subscribe(NXD_MQTT_CLIENT * client_ptr,CHAR * topic_name,UINT topic_name_length,UINT QoS)5044 UINT _nxd_mqtt_client_subscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length, UINT QoS)
5045 {
5046 
5047     if (QoS == 2)
5048     {
5049         return(NXD_MQTT_QOS2_NOT_SUPPORTED);
5050     }
5051 
5052     return(_nxd_mqtt_client_sub_unsub(client_ptr, (MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4) | 0x02,
5053                                       topic_name, topic_name_length, NX_NULL, QoS));
5054 }
5055 
5056 
5057 
5058 
5059 /**************************************************************************/
5060 /*                                                                        */
5061 /*  FUNCTION                                               RELEASE        */
5062 /*                                                                        */
5063 /*    _nxd_mqtt_client_unsubscribe                        PORTABLE C      */
5064 /*                                                           6.1.2        */
5065 /*  AUTHOR                                                                */
5066 /*                                                                        */
5067 /*    Yuxin Zhou, Microsoft Corporation                                   */
5068 /*                                                                        */
5069 /*  DESCRIPTION                                                           */
5070 /*                                                                        */
5071 /*    This function unsubscribes a topic from the broker.                 */
5072 /*                                                                        */
5073 /*                                                                        */
5074 /*  INPUT                                                                 */
5075 /*                                                                        */
5076 /*    client_ptr                            Pointer to MQTT Client        */
5077 /*    topic_name                            Pointer to the topic string   */
5078 /*                                            to subscribe to             */
5079 /*    topic_name_length                     Length of the topic string    */
5080 /*                                            in bytes                    */
5081 /*                                                                        */
5082 /*  OUTPUT                                                                */
5083 /*                                                                        */
5084 /*    status                                Completion status             */
5085 /*                                                                        */
5086 /*  CALLS                                                                 */
5087 /*                                                                        */
5088 /*    _nxd_mqtt_client_sub_unsub                                          */
5089 /*                                                                        */
5090 /*  CALLED BY                                                             */
5091 /*                                                                        */
5092 /*    Application Code                                                    */
5093 /*                                                                        */
5094 /*  RELEASE HISTORY                                                       */
5095 /*                                                                        */
5096 /*    DATE              NAME                      DESCRIPTION             */
5097 /*                                                                        */
5098 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5099 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5100 /*                                            resulting in version 6.1    */
5101 /*  11-09-2020     Yuxin Zhou               Modified comment(s), and      */
5102 /*                                            added packet id parameter,  */
5103 /*                                            resulting in version 6.1.2  */
5104 /*                                                                        */
5105 /**************************************************************************/
_nxd_mqtt_client_unsubscribe(NXD_MQTT_CLIENT * client_ptr,CHAR * topic_name,UINT topic_name_length)5106 UINT _nxd_mqtt_client_unsubscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length)
5107 {
5108     return(_nxd_mqtt_client_sub_unsub(client_ptr, (MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE << 4) | 0x02,
5109                                       topic_name, topic_name_length, NX_NULL, 0));
5110 }
5111 
5112 /**************************************************************************/
5113 /*                                                                        */
5114 /*  FUNCTION                                               RELEASE        */
5115 /*                                                                        */
5116 /*    _nxd_mqtt_send_simple_message                       PORTABLE C      */
5117 /*                                                           6.3.0        */
5118 /*  AUTHOR                                                                */
5119 /*                                                                        */
5120 /*    Yuxin Zhou, Microsoft Corporation                                   */
5121 /*                                                                        */
5122 /*  DESCRIPTION                                                           */
5123 /*                                                                        */
5124 /*    This internal function handles the transmission of PINGREQ or       */
5125 /*    DISCONNECT message.                                                 */
5126 /*                                                                        */
5127 /*                                                                        */
5128 /*  INPUT                                                                 */
5129 /*                                                                        */
5130 /*    client_ptr                            Pointer to MQTT Client        */
5131 /*    header_value                          Value to be programmed into   */
5132 /*                                            MQTT header.                */
5133 /*                                                                        */
5134 /*  OUTPUT                                                                */
5135 /*                                                                        */
5136 /*    status                                Completion status             */
5137 /*                                                                        */
5138 /*  CALLS                                                                 */
5139 /*                                                                        */
5140 /*    tx_mutex_get                                                        */
5141 /*    _nxd_mqtt_client_packet_allocate                                    */
5142 /*    tx_mutex_put                                                        */
5143 /*    _nxd_mqtt_packet_send                                               */
5144 /*    nx_packet_release                                                   */
5145 /*                                                                        */
5146 /*  CALLED BY                                                             */
5147 /*                                                                        */
5148 /*    _nxd_mqtt_client_ping                                               */
5149 /*    _nxd_mqtt_client_disconnect                                         */
5150 /*                                                                        */
5151 /*  RELEASE HISTORY                                                       */
5152 /*                                                                        */
5153 /*    DATE              NAME                      DESCRIPTION             */
5154 /*                                                                        */
5155 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5156 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5157 /*                                            resulting in version 6.1    */
5158 /*  07-29-2022     Spencer McDonough        Modified comment(s),          */
5159 /*                                            improved internal logic,    */
5160 /*                                            resulting in version 6.1.12 */
5161 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
5162 /*                                            the logic of sending packet,*/
5163 /*                                            resulting in version 6.2.0  */
5164 /*  10-31-2023     Haiqing Zhao             Modified comment(s), improved */
5165 /*                                            internal logic,             */
5166 /*                                            resulting in version 6.3.0  */
5167 /*                                                                        */
5168 /**************************************************************************/
_nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT * client_ptr,UCHAR header_value)5169 static UINT _nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT *client_ptr, UCHAR header_value)
5170 {
5171 
5172 NX_PACKET *packet_ptr;
5173 UINT       status;
5174 UINT       status_mutex;
5175 UCHAR     *byte;
5176 
5177     status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
5178     if (status)
5179     {
5180         return(NXD_MQTT_INTERNAL_ERROR);
5181     }
5182 
5183     if (2u > ((ULONG)(packet_ptr -> nx_packet_data_end) - (ULONG)(packet_ptr -> nx_packet_append_ptr)))
5184     {
5185         nx_packet_release(packet_ptr);
5186 
5187         /* Packet buffer is too small to hold the message. */
5188         return(NX_SIZE_ERROR);
5189     }
5190 
5191     byte = packet_ptr -> nx_packet_prepend_ptr;
5192 
5193     *byte = (UCHAR)(header_value << 4);
5194     *(byte + 1) = 0;
5195 
5196     packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + 2;
5197     packet_ptr -> nx_packet_length = 2;
5198 
5199 
5200     /* Release MQTT protection before making NetX/TLS calls. */
5201     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5202 
5203     /* Send packet to server.  */
5204     status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER);
5205 
5206     status_mutex = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
5207     if (status)
5208     {
5209 
5210         /* Release the packet. */
5211         nx_packet_release(packet_ptr);
5212 
5213         status = NXD_MQTT_COMMUNICATION_FAILURE;
5214     }
5215     if (status_mutex)
5216     {
5217         return(NXD_MQTT_MUTEX_FAILURE);
5218     }
5219 
5220     if (header_value == MQTT_CONTROL_PACKET_TYPE_PINGREQ)
5221     {
5222         /* Do not update the ping sent time if the outstanding ping has not been responded yet */
5223         if (client_ptr -> nxd_mqtt_ping_not_responded != 1)
5224         {
5225             /* Record the time we send out the PINGREG */
5226             client_ptr -> nxd_mqtt_ping_sent_time = tx_time_get();
5227             client_ptr -> nxd_mqtt_ping_not_responded = 1;
5228         }
5229     }
5230 
5231     /* Update the timeout value. */
5232     client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
5233 
5234     return(status);
5235 }
5236 
5237 
5238 /**************************************************************************/
5239 /*                                                                        */
5240 /*  FUNCTION                                               RELEASE        */
5241 /*                                                                        */
5242 /*    _nxd_mqtt_client_disconnect                         PORTABLE C      */
5243 /*                                                           6.1          */
5244 /*  AUTHOR                                                                */
5245 /*                                                                        */
5246 /*    Yuxin Zhou, Microsoft Corporation                                   */
5247 /*                                                                        */
5248 /*  DESCRIPTION                                                           */
5249 /*                                                                        */
5250 /*    This function disconnects the MQTT client from a server.            */
5251 /*                                                                        */
5252 /*                                                                        */
5253 /*  INPUT                                                                 */
5254 /*                                                                        */
5255 /*    client_ptr                            Pointer to MQTT Client        */
5256 /*                                                                        */
5257 /*  OUTPUT                                                                */
5258 /*                                                                        */
5259 /*    status                                Completion status             */
5260 /*                                                                        */
5261 /*  CALLS                                                                 */
5262 /*                                                                        */
5263 /*    _nxd_mqtt_send_simple_message                                       */
5264 /*    _nxd_mqtt_process_disconnect                                        */
5265 /*                                                                        */
5266 /*  CALLED BY                                                             */
5267 /*                                                                        */
5268 /*    Application Code                                                    */
5269 /*                                                                        */
5270 /*  RELEASE HISTORY                                                       */
5271 /*                                                                        */
5272 /*    DATE              NAME                      DESCRIPTION             */
5273 /*                                                                        */
5274 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5275 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5276 /*                                            resulting in version 6.1    */
5277 /*                                                                        */
5278 /**************************************************************************/
_nxd_mqtt_client_disconnect(NXD_MQTT_CLIENT * client_ptr)5279 UINT _nxd_mqtt_client_disconnect(NXD_MQTT_CLIENT *client_ptr)
5280 {
5281 UINT status;
5282 
5283     /* Obtain the mutex. */
5284     status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
5285     if (status != TX_SUCCESS)
5286     {
5287         /* Disable timer if timer has been started. */
5288         if (client_ptr -> nxd_mqtt_keepalive)
5289         {
5290             tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
5291         }
5292 
5293         return(NXD_MQTT_MUTEX_FAILURE);
5294     }
5295 
5296     /* Let the server know we are ending the MQTT session. */
5297     _nxd_mqtt_send_simple_message(client_ptr, MQTT_CONTROL_PACKET_TYPE_DISCONNECT);
5298 
5299     /* Call the disconnect routine to disconnect the socket,
5300        release transmit packets, release received packets,
5301        and delete the client timer. */
5302     _nxd_mqtt_process_disconnect(client_ptr);
5303 
5304     /* Release the mutex. */
5305     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5306 
5307     return(status);
5308 }
5309 
5310 /**************************************************************************/
5311 /*                                                                        */
5312 /*  FUNCTION                                               RELEASE        */
5313 /*                                                                        */
5314 /*    _nxd_mqtt_client_receive_notify_set                 PORTABLE C      */
5315 /*                                                           6.1          */
5316 /*  AUTHOR                                                                */
5317 /*                                                                        */
5318 /*    Yuxin Zhou, Microsoft Corporation                                   */
5319 /*                                                                        */
5320 /*  DESCRIPTION                                                           */
5321 /*                                                                        */
5322 /*    This function installs the MQTT client publish notify callback      */
5323 /*    function.                                                           */
5324 /*                                                                        */
5325 /*                                                                        */
5326 /*  INPUT                                                                 */
5327 /*                                                                        */
5328 /*    client_ptr                            Pointer to MQTT Client        */
5329 /*    mqtt_client_receive_notify            User-supplied callback        */
5330 /*                                            function, which is invoked  */
5331 /*                                            upon receiving a publish    */
5332 /*                                            message.                    */
5333 /*                                                                        */
5334 /*  OUTPUT                                                                */
5335 /*                                                                        */
5336 /*    status                                Completion status             */
5337 /*                                                                        */
5338 /*  CALLS                                                                 */
5339 /*                                                                        */
5340 /*    tx_mutex_get                                                        */
5341 /*    tx_mutex_put                                                        */
5342 /*                                                                        */
5343 /*  CALLED BY                                                             */
5344 /*                                                                        */
5345 /*    Application Code                                                    */
5346 /*                                                                        */
5347 /*  RELEASE HISTORY                                                       */
5348 /*                                                                        */
5349 /*    DATE              NAME                      DESCRIPTION             */
5350 /*                                                                        */
5351 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5352 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5353 /*                                            resulting in version 6.1    */
5354 /*                                                                        */
5355 /**************************************************************************/
_nxd_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* receive_notify)(NXD_MQTT_CLIENT * client_ptr,UINT message_count))5356 UINT _nxd_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT *client_ptr,
5357                                          VOID (*receive_notify)(NXD_MQTT_CLIENT *client_ptr, UINT message_count))
5358 {
5359 
5360     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
5361 
5362     client_ptr -> nxd_mqtt_client_receive_notify = receive_notify;
5363 
5364     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5365 
5366     return(NXD_MQTT_SUCCESS);
5367 }
5368 
5369 /**************************************************************************/
5370 /*                                                                        */
5371 /*  FUNCTION                                               RELEASE        */
5372 /*                                                                        */
5373 /*    _nxd_mqtt_client_message_get                        PORTABLE C      */
5374 /*                                                           6.1          */
5375 /*  AUTHOR                                                                */
5376 /*                                                                        */
5377 /*    Yuxin Zhou, Microsoft Corporation                                   */
5378 /*                                                                        */
5379 /*  DESCRIPTION                                                           */
5380 /*                                                                        */
5381 /*    This function retrieves a published MQTT message.                   */
5382 /*                                                                        */
5383 /*                                                                        */
5384 /*  INPUT                                                                 */
5385 /*                                                                        */
5386 /*    client_ptr                            Pointer to MQTT Client        */
5387 /*    topic_buffer                          Pointer to the topic buffer   */
5388 /*                                            where topic is copied to    */
5389 /*    topic_buffer_size                     Size of the topic buffer.     */
5390 /*    actual_topic_length                   Number of bytes copied into   */
5391 /*                                            topic_buffer                */
5392 /*    message_buffer                        Pointer to the buffer where   */
5393 /*                                            message is copied to        */
5394 /*    message_buffer_size                   Size of the message_buffer    */
5395 /*    actual_message_length                 Number of bytes copied into   */
5396 /*                                            the message buffer.         */
5397 /*                                                                        */
5398 /*  OUTPUT                                                                */
5399 /*                                                                        */
5400 /*    status                                Completion status             */
5401 /*                                                                        */
5402 /*  CALLS                                                                 */
5403 /*                                                                        */
5404 /*    _nxd_mqtt_client_disconnect           Actual MQTT Client disconnect */
5405 /*                                            call                        */
5406 /*    _nxd_mqtt_read_remaining_length       Skip the remaining length     */
5407 /*                                            field                       */
5408 /*    tx_mutex_get                                                        */
5409 /*    tx_mutex_put                                                        */
5410 /*                                                                        */
5411 /*  CALLED BY                                                             */
5412 /*                                                                        */
5413 /*    Application Code                                                    */
5414 /*                                                                        */
5415 /*  RELEASE HISTORY                                                       */
5416 /*                                                                        */
5417 /*    DATE              NAME                      DESCRIPTION             */
5418 /*                                                                        */
5419 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5420 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
5421 /*                                            fixed uninitialized value,  */
5422 /*                                            resulting in version 6.1    */
5423 /*                                                                        */
5424 /**************************************************************************/
_nxd_mqtt_client_message_get(NXD_MQTT_CLIENT * client_ptr,UCHAR * topic_buffer,UINT topic_buffer_size,UINT * actual_topic_length,UCHAR * message_buffer,UINT message_buffer_size,UINT * actual_message_length)5425 UINT _nxd_mqtt_client_message_get(NXD_MQTT_CLIENT *client_ptr, UCHAR *topic_buffer, UINT topic_buffer_size, UINT *actual_topic_length,
5426                                   UCHAR *message_buffer, UINT message_buffer_size, UINT *actual_message_length)
5427 {
5428 
5429 UINT                status;
5430 NX_PACKET          *packet_ptr;
5431 ULONG               topic_offset;
5432 USHORT              topic_length;
5433 ULONG               message_offset;
5434 ULONG               message_length;
5435 
5436     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
5437     while (client_ptr -> message_receive_queue_depth)
5438     {
5439         packet_ptr = client_ptr -> message_receive_queue_head;
5440         status = _nxd_mqtt_process_publish_packet(packet_ptr, &topic_offset, &topic_length, &message_offset, &message_length);
5441         if (status == NXD_MQTT_SUCCESS)
5442         {
5443             if ((topic_buffer_size < topic_length) ||
5444                 (message_buffer_size < message_length))
5445             {
5446                 tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5447                 return(NXD_MQTT_INSUFFICIENT_BUFFER_SPACE);
5448             }
5449         }
5450 
5451         client_ptr -> message_receive_queue_head = packet_ptr -> nx_packet_queue_next;
5452         if (client_ptr -> message_receive_queue_tail == packet_ptr)
5453         {
5454             client_ptr -> message_receive_queue_tail = NX_NULL;
5455         }
5456         client_ptr -> message_receive_queue_depth--;
5457 
5458         if (status == NXD_MQTT_SUCCESS)
5459         {
5460 
5461             /* Set topic and message lengths to avoid uninitialized value. */
5462             *actual_topic_length = 0;
5463             *actual_message_length = 0;
5464             nx_packet_data_extract_offset(packet_ptr, topic_offset, topic_buffer,
5465                                           topic_length, (ULONG *)actual_topic_length);
5466             nx_packet_data_extract_offset(packet_ptr, message_offset, message_buffer,
5467                                           message_length, (ULONG *)actual_message_length);
5468             nx_packet_release(packet_ptr);
5469 
5470             tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5471             return(NXD_MQTT_SUCCESS);
5472         }
5473         nx_packet_release(packet_ptr);
5474     }
5475     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5476     return(NXD_MQTT_NO_MESSAGE);
5477 }
5478 
5479 /**************************************************************************/
5480 /*                                                                        */
5481 /*  FUNCTION                                               RELEASE        */
5482 /*                                                                        */
5483 /*    _nxde_mqtt_client_create                            PORTABLE C      */
5484 /*                                                           6.1          */
5485 /*  AUTHOR                                                                */
5486 /*                                                                        */
5487 /*    Yuxin Zhou, Microsoft Corporation                                   */
5488 /*                                                                        */
5489 /*  DESCRIPTION                                                           */
5490 /*                                                                        */
5491 /*    This function checks for errors in nxd_mqt_client_create call.      */
5492 /*                                                                        */
5493 /*                                                                        */
5494 /*  INPUT                                                                 */
5495 /*                                                                        */
5496 /*    client_ptr                            Pointer to MQTT Client        */
5497 /*    client_name                           Name string used in by the    */
5498 /*                                            client                      */
5499 /*    client_id                             Client ID used by the client  */
5500 /*    client_id_length                      Length of Client ID, in bytes */
5501 /*    ip_ptr                                Pointer to IP instance        */
5502 /*    pool_ptr                              Pointer to packet pool        */
5503 /*    stack_ptr                             Client thread's stack pointer */
5504 /*    stack_size                            Client thread's stack size    */
5505 /*    mqtt_thread_priority                  Priority for MQTT thread      */
5506 /*    memory_ptr                            Deprecated and not used       */
5507 /*    memory_size                           Deprecated and not used       */
5508 /*                                                                        */
5509 /*  OUTPUT                                                                */
5510 /*                                                                        */
5511 /*    status                                Completion status             */
5512 /*                                                                        */
5513 /*  CALLS                                                                 */
5514 /*                                                                        */
5515 /*    _nxd_mqtt_client_create               Actual client create call     */
5516 /*                                                                        */
5517 /*  CALLED BY                                                             */
5518 /*                                                                        */
5519 /*    Application Code                                                    */
5520 /*                                                                        */
5521 /*  RELEASE HISTORY                                                       */
5522 /*                                                                        */
5523 /*    DATE              NAME                      DESCRIPTION             */
5524 /*                                                                        */
5525 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5526 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5527 /*                                            resulting in version 6.1    */
5528 /*                                                                        */
5529 /**************************************************************************/
_nxde_mqtt_client_create(NXD_MQTT_CLIENT * client_ptr,CHAR * client_name,CHAR * client_id,UINT client_id_length,NX_IP * ip_ptr,NX_PACKET_POOL * pool_ptr,VOID * stack_ptr,ULONG stack_size,UINT mqtt_thread_priority,VOID * memory_ptr,ULONG memory_size)5530 UINT _nxde_mqtt_client_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name, CHAR *client_id, UINT client_id_length,
5531                               NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
5532                               VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority,
5533                               VOID *memory_ptr, ULONG memory_size)
5534 {
5535 
5536 
5537     /* Check for invalid input pointers.  */
5538     if ((client_ptr == NX_NULL) || (ip_ptr == NX_NULL) || (ip_ptr -> nx_ip_id != NX_IP_ID) ||
5539         (stack_ptr == NX_NULL) || (stack_size == 0) || (pool_ptr == NX_NULL))
5540     {
5541         return(NX_PTR_ERROR);
5542     }
5543 
5544     return(_nxd_mqtt_client_create(client_ptr, client_name, client_id, client_id_length, ip_ptr,
5545                                    pool_ptr, stack_ptr, stack_size, mqtt_thread_priority,
5546                                    memory_ptr, memory_size));
5547 }
5548 
5549 
5550 /**************************************************************************/
5551 /*                                                                        */
5552 /*  FUNCTION                                               RELEASE        */
5553 /*                                                                        */
5554 /*    _nxde_mqtt_client_connect                           PORTABLE C      */
5555 /*                                                           6.1          */
5556 /*  AUTHOR                                                                */
5557 /*                                                                        */
5558 /*    Yuxin Zhou, Microsoft Corporation                                   */
5559 /*                                                                        */
5560 /*  DESCRIPTION                                                           */
5561 /*                                                                        */
5562 /*    This function checks for errors in MQTT client stop call.           */
5563 /*                                                                        */
5564 /*                                                                        */
5565 /*  INPUT                                                                 */
5566 /*                                                                        */
5567 /*    client_ptr                            Pointer to MQTT Client        */
5568 /*    server_ip                             Server IP address structure   */
5569 /*    server_port                           Server port number, in host   */
5570 /*                                            byte order                  */
5571 /*    keepalive                             The MQTT keepalive timer      */
5572 /*    clean_session                         Clean session flag            */
5573 /*    wait_option                           Suspension option             */
5574 /*                                                                        */
5575 /*                                                                        */
5576 /*  OUTPUT                                                                */
5577 /*                                                                        */
5578 /*    status                                Completion status             */
5579 /*                                                                        */
5580 /*  CALLS                                                                 */
5581 /*                                                                        */
5582 /*    _nxd_mqtt_client_connect              Actual MQTT Client connect    */
5583 /*                                            call                        */
5584 /*                                                                        */
5585 /*  CALLED BY                                                             */
5586 /*                                                                        */
5587 /*    Application Code                                                    */
5588 /*                                                                        */
5589 /*  RELEASE HISTORY                                                       */
5590 /*                                                                        */
5591 /*    DATE              NAME                      DESCRIPTION             */
5592 /*                                                                        */
5593 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5594 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
5595 /*                                            corrected mqtt client state,*/
5596 /*                                            resulting in version 6.1    */
5597 /*                                                                        */
5598 /**************************************************************************/
_nxde_mqtt_client_connect(NXD_MQTT_CLIENT * client_ptr,NXD_ADDRESS * server_ip,UINT server_port,UINT keepalive,UINT clean_session,ULONG wait_option)5599 UINT _nxde_mqtt_client_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
5600                                UINT keepalive, UINT clean_session, ULONG wait_option)
5601 {
5602 
5603 UINT status;
5604 
5605     if (client_ptr == NX_NULL)
5606     {
5607         return(NX_PTR_ERROR);
5608     }
5609 
5610     if (server_ip == NX_NULL)
5611     {
5612         return(NX_PTR_ERROR);
5613     }
5614 
5615     /* Test for IP version flag. */
5616     if ((server_ip -> nxd_ip_version != 4) && (server_ip -> nxd_ip_version != 6))
5617     {
5618         return(NXD_MQTT_INVALID_PARAMETER);
5619     }
5620 
5621     if (server_port == 0)
5622     {
5623         return(NX_INVALID_PORT);
5624     }
5625 
5626     status = _nxd_mqtt_client_connect(client_ptr, server_ip, server_port, keepalive, clean_session, wait_option);
5627 
5628     return(status);
5629 }
5630 
5631 /**************************************************************************/
5632 /*                                                                        */
5633 /*  FUNCTION                                               RELEASE        */
5634 /*                                                                        */
5635 /*    _nxde_mqtt_client_secure_connect                    PORTABLE C      */
5636 /*                                                           6.1          */
5637 /*  AUTHOR                                                                */
5638 /*                                                                        */
5639 /*    Yuxin Zhou, Microsoft Corporation                                   */
5640 /*                                                                        */
5641 /*  DESCRIPTION                                                           */
5642 /*                                                                        */
5643 /*    This function checks for errors in MQTT client TLS secure connect.  */
5644 /*                                                                        */
5645 /*                                                                        */
5646 /*  INPUT                                                                 */
5647 /*                                                                        */
5648 /*    client_ptr                            Pointer to MQTT Client        */
5649 /*    server_ip                             Server IP address structure   */
5650 /*    server_port                           Server port number, in host   */
5651 /*                                            byte order                  */
5652 /*    tls_setup                             User-supplied callback        */
5653 /*                                            function to set up TLS      */
5654 /*                                            parameters.                 */
5655 /*    keepalive                             The MQTT keepalive timer      */
5656 /*    wait_option                           Suspension option             */
5657 /*                                                                        */
5658 /*                                                                        */
5659 /*  OUTPUT                                                                */
5660 /*                                                                        */
5661 /*    status                                Completion status             */
5662 /*                                                                        */
5663 /*  CALLS                                                                 */
5664 /*                                                                        */
5665 /*    _nxd_mqtt_client_connect              Actual MQTT Client connect    */
5666 /*                                            call                        */
5667 /*                                                                        */
5668 /*  CALLED BY                                                             */
5669 /*                                                                        */
5670 /*    Application Code                                                    */
5671 /*                                                                        */
5672 /*  RELEASE HISTORY                                                       */
5673 /*                                                                        */
5674 /*    DATE              NAME                      DESCRIPTION             */
5675 /*                                                                        */
5676 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5677 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
5678 /*                                            corrected mqtt client state,*/
5679 /*                                            resulting in version 6.1    */
5680 /*                                                                        */
5681 /**************************************************************************/
5682 #ifdef NX_SECURE_ENABLE
5683 
_nxde_mqtt_client_secure_connect(NXD_MQTT_CLIENT * client_ptr,NXD_ADDRESS * server_ip,UINT server_port,UINT (* tls_setup)(NXD_MQTT_CLIENT * client_ptr,NX_SECURE_TLS_SESSION *,NX_SECURE_X509_CERT *,NX_SECURE_X509_CERT *),UINT keepalive,UINT clean_session,ULONG wait_option)5684 UINT _nxde_mqtt_client_secure_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
5685                                       UINT (*tls_setup)(NXD_MQTT_CLIENT *client_ptr, NX_SECURE_TLS_SESSION *,
5686                                                         NX_SECURE_X509_CERT *, NX_SECURE_X509_CERT *),
5687                                       UINT keepalive, UINT clean_session, ULONG wait_option)
5688 {
5689 
5690 UINT status;
5691 
5692     if (client_ptr == NX_NULL)
5693     {
5694         return(NX_PTR_ERROR);
5695     }
5696 
5697     if (server_ip == NX_NULL)
5698     {
5699         return(NX_PTR_ERROR);
5700     }
5701 
5702     if (server_port == 0)
5703     {
5704         return(NX_INVALID_PORT);
5705     }
5706 
5707     status = _nxd_mqtt_client_secure_connect(client_ptr, server_ip, server_port, tls_setup,
5708                                              keepalive, clean_session, wait_option);
5709 
5710     return(status);
5711 }
5712 #endif /* NX_SECURE_ENABLE */
5713 
5714 
5715 /**************************************************************************/
5716 /*                                                                        */
5717 /*  FUNCTION                                               RELEASE        */
5718 /*                                                                        */
5719 /*    _nxde_mqtt_client_delete                            PORTABLE C      */
5720 /*                                                           6.1          */
5721 /*  AUTHOR                                                                */
5722 /*                                                                        */
5723 /*    Yuxin Zhou, Microsoft Corporation                                   */
5724 /*                                                                        */
5725 /*  DESCRIPTION                                                           */
5726 /*                                                                        */
5727 /*    This function checks for errors in MQTT client delete call.         */
5728 /*                                                                        */
5729 /*                                                                        */
5730 /*  INPUT                                                                 */
5731 /*                                                                        */
5732 /*    client_ptr                            Pointer to MQTT Client        */
5733 /*                                                                        */
5734 /*  OUTPUT                                                                */
5735 /*                                                                        */
5736 /*    status                                Completion status             */
5737 /*                                                                        */
5738 /*  CALLS                                                                 */
5739 /*                                                                        */
5740 /*    _nxd_mqtt_client_delete               Actual MQTT Client delete     */
5741 /*                                            call.                       */
5742 /*                                                                        */
5743 /*  CALLED BY                                                             */
5744 /*                                                                        */
5745 /*    Application Code                                                    */
5746 /*                                                                        */
5747 /*  RELEASE HISTORY                                                       */
5748 /*                                                                        */
5749 /*    DATE              NAME                      DESCRIPTION             */
5750 /*                                                                        */
5751 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5752 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5753 /*                                            resulting in version 6.1    */
5754 /*                                                                        */
5755 /**************************************************************************/
_nxde_mqtt_client_delete(NXD_MQTT_CLIENT * client_ptr)5756 UINT _nxde_mqtt_client_delete(NXD_MQTT_CLIENT *client_ptr)
5757 {
5758 
5759 UINT status;
5760 
5761     if (client_ptr == NX_NULL)
5762     {
5763         return(NX_PTR_ERROR);
5764     }
5765 
5766     status = _nxd_mqtt_client_delete(client_ptr);
5767 
5768     return(status);
5769 }
5770 
5771 
5772 
5773 /**************************************************************************/
5774 /*                                                                        */
5775 /*  FUNCTION                                               RELEASE        */
5776 /*                                                                        */
5777 /*    _nxde_mqtt_client_publish                           PORTABLE C      */
5778 /*                                                           6.1          */
5779 /*  AUTHOR                                                                */
5780 /*                                                                        */
5781 /*    Yuxin Zhou, Microsoft Corporation                                   */
5782 /*                                                                        */
5783 /*  DESCRIPTION                                                           */
5784 /*                                                                        */
5785 /*    This function performs error checking to the publish service.       */
5786 /*                                                                        */
5787 /*                                                                        */
5788 /*  INPUT                                                                 */
5789 /*                                                                        */
5790 /*    client_ptr                            Pointer to MQTT Client        */
5791 /*    topic_name                            Name of the topic             */
5792 /*    topic_name_length                     Length of the topic name      */
5793 /*    message                               Message string                */
5794 /*    message_length                        Length of the message,        */
5795 /*                                            in bytes                    */
5796 /*    retain                                The retain flag, whether      */
5797 /*                                            or not the broker should    */
5798 /*                                            store this message          */
5799 /*    QoS                                   Expected QoS level            */
5800 /*    wait_option                           Suspension option             */
5801 /*                                                                        */
5802 /*  OUTPUT                                                                */
5803 /*                                                                        */
5804 /*    status                                Completion status             */
5805 /*                                                                        */
5806 /*  CALLS                                                                 */
5807 /*                                                                        */
5808 /*                                                                        */
5809 /*  CALLED BY                                                             */
5810 /*                                                                        */
5811 /*    Application Code                                                    */
5812 /*                                                                        */
5813 /*  RELEASE HISTORY                                                       */
5814 /*                                                                        */
5815 /*    DATE              NAME                      DESCRIPTION             */
5816 /*                                                                        */
5817 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5818 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5819 /*                                            resulting in version 6.1    */
5820 /*                                                                        */
5821 /**************************************************************************/
_nxde_mqtt_client_publish(NXD_MQTT_CLIENT * client_ptr,CHAR * topic_name,UINT topic_name_length,CHAR * message,UINT message_length,UINT retain,UINT QoS,ULONG wait_option)5822 UINT _nxde_mqtt_client_publish(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length,
5823                                CHAR *message, UINT message_length, UINT retain, UINT QoS, ULONG wait_option)
5824 {
5825     /* Validate client_ptr */
5826     if (client_ptr == NX_NULL)
5827     {
5828         return(NX_PTR_ERROR);
5829     }
5830 
5831     /* Validate topic_name */
5832     if ((topic_name == NX_NULL) || (topic_name_length == 0))
5833     {
5834         return(NXD_MQTT_INVALID_PARAMETER);
5835     }
5836 
5837     /* Validate message length. */
5838     if (message && (message_length == 0))
5839     {
5840         return(NXD_MQTT_INVALID_PARAMETER);
5841     }
5842 
5843     /* Validate QoS value. */
5844     if (QoS > 3)
5845     {
5846         return(NXD_MQTT_INVALID_PARAMETER);
5847     }
5848 
5849     return(_nxd_mqtt_client_publish(client_ptr, topic_name, topic_name_length, message, message_length, retain, QoS, wait_option));
5850 }
5851 
5852 
5853 /**************************************************************************/
5854 /*                                                                        */
5855 /*  FUNCTION                                               RELEASE        */
5856 /*                                                                        */
5857 /*    _nxde_mqtt_client_subscribe                         PORTABLE C      */
5858 /*                                                           6.1          */
5859 /*  AUTHOR                                                                */
5860 /*                                                                        */
5861 /*    Yuxin Zhou, Microsoft Corporation                                   */
5862 /*                                                                        */
5863 /*  DESCRIPTION                                                           */
5864 /*                                                                        */
5865 /*    This function performs error checking to the subscribe service.     */
5866 /*                                                                        */
5867 /*                                                                        */
5868 /*  INPUT                                                                 */
5869 /*                                                                        */
5870 /*    client_ptr                            Pointer to MQTT Client        */
5871 /*    topic_name                            Pointer to the topic string   */
5872 /*                                            to subscribe to             */
5873 /*    topic_name_length                     Length of the topic string    */
5874 /*                                            in bytes                    */
5875 /*                                                                        */
5876 /*  OUTPUT                                                                */
5877 /*                                                                        */
5878 /*    client_ptr                            Pointer to MQTT Client        */
5879 /*    topic_name                            Pointer to the topic string   */
5880 /*                                            to subscribe to             */
5881 /*    topic_name_length                     Length of the topic string    */
5882 /*                                            in bytes                    */
5883 /*    QoS                                   Expected QoS level            */
5884 /*                                                                        */
5885 /*  CALLS                                                                 */
5886 /*                                                                        */
5887 /*    _nxd_mqtt_client_subscribe                                          */
5888 /*                                                                        */
5889 /*  CALLED BY                                                             */
5890 /*                                                                        */
5891 /*    Application Code                                                    */
5892 /*                                                                        */
5893 /*  RELEASE HISTORY                                                       */
5894 /*                                                                        */
5895 /*    DATE              NAME                      DESCRIPTION             */
5896 /*                                                                        */
5897 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5898 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5899 /*                                            resulting in version 6.1    */
5900 /*                                                                        */
5901 /**************************************************************************/
_nxde_mqtt_client_subscribe(NXD_MQTT_CLIENT * client_ptr,CHAR * topic_name,UINT topic_name_length,UINT QoS)5902 UINT _nxde_mqtt_client_subscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length, UINT QoS)
5903 {
5904 
5905 
5906     /* Validate client_ptr */
5907     if (client_ptr == NX_NULL)
5908     {
5909         return(NX_PTR_ERROR);
5910     }
5911 
5912     /* Validate topic_name */
5913     if ((topic_name == NX_NULL) || (topic_name_length == 0))
5914     {
5915         return(NXD_MQTT_INVALID_PARAMETER);
5916     }
5917 
5918     /* Validate QoS value. */
5919     if (QoS > 2)
5920     {
5921         return(NXD_MQTT_INVALID_PARAMETER);
5922     }
5923 
5924     return(_nxd_mqtt_client_subscribe(client_ptr, topic_name, topic_name_length, QoS));
5925 }
5926 
5927 
5928 
5929 
5930 /**************************************************************************/
5931 /*                                                                        */
5932 /*  FUNCTION                                               RELEASE        */
5933 /*                                                                        */
5934 /*    _nxde_mqtt_client_unsubscribe                       PORTABLE C      */
5935 /*                                                           6.1          */
5936 /*  AUTHOR                                                                */
5937 /*                                                                        */
5938 /*    Yuxin Zhou, Microsoft Corporation                                   */
5939 /*                                                                        */
5940 /*  DESCRIPTION                                                           */
5941 /*                                                                        */
5942 /*    This function performs error checking to the unsubscribe service.   */
5943 /*                                                                        */
5944 /*                                                                        */
5945 /*  INPUT                                                                 */
5946 /*                                                                        */
5947 /*    client_ptr                            Pointer to MQTT Client        */
5948 /*    topic_name                            Pointer to the topic string   */
5949 /*                                            to unsubscribe              */
5950 /*    topic_name_length                     Length of the topic string    */
5951 /*                                            in bytes                    */
5952 /*                                                                        */
5953 /*  OUTPUT                                                                */
5954 /*                                                                        */
5955 /*    status                                Completion status             */
5956 /*                                                                        */
5957 /*  CALLS                                                                 */
5958 /*                                                                        */
5959 /*                                                                        */
5960 /*  CALLED BY                                                             */
5961 /*                                                                        */
5962 /*    Application Code                                                    */
5963 /*                                                                        */
5964 /*  RELEASE HISTORY                                                       */
5965 /*                                                                        */
5966 /*    DATE              NAME                      DESCRIPTION             */
5967 /*                                                                        */
5968 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5969 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5970 /*                                            resulting in version 6.1    */
5971 /*                                                                        */
5972 /**************************************************************************/
_nxde_mqtt_client_unsubscribe(NXD_MQTT_CLIENT * client_ptr,CHAR * topic_name,UINT topic_name_length)5973 UINT _nxde_mqtt_client_unsubscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length)
5974 {
5975     /* Validate client_ptr */
5976     if (client_ptr == NX_NULL)
5977     {
5978         return(NX_PTR_ERROR);
5979     }
5980 
5981     /* Validate topic_name */
5982     if ((topic_name == NX_NULL) || (topic_name_length == 0))
5983     {
5984         return(NXD_MQTT_INVALID_PARAMETER);
5985     }
5986 
5987     return(_nxd_mqtt_client_unsubscribe(client_ptr, topic_name, topic_name_length));
5988 }
5989 
5990 
5991 /**************************************************************************/
5992 /*                                                                        */
5993 /*  FUNCTION                                               RELEASE        */
5994 /*                                                                        */
5995 /*    _nxde_mqtt_client_disconnect                        PORTABLE C      */
5996 /*                                                           6.1          */
5997 /*  AUTHOR                                                                */
5998 /*                                                                        */
5999 /*    Yuxin Zhou, Microsoft Corporation                                   */
6000 /*                                                                        */
6001 /*  DESCRIPTION                                                           */
6002 /*                                                                        */
6003 /*    This function checks for errors in MQTT client disconnect call.     */
6004 /*                                                                        */
6005 /*                                                                        */
6006 /*  INPUT                                                                 */
6007 /*                                                                        */
6008 /*    client_ptr                            Pointer to MQTT Client        */
6009 /*                                                                        */
6010 /*  OUTPUT                                                                */
6011 /*                                                                        */
6012 /*    status                                Completion status             */
6013 /*                                                                        */
6014 /*  CALLS                                                                 */
6015 /*                                                                        */
6016 /*    _nxd_mqtt_client_disconnect           Actual MQTT Client disconnect */
6017 /*                                            call                        */
6018 /*                                                                        */
6019 /*  CALLED BY                                                             */
6020 /*                                                                        */
6021 /*    Application Code                                                    */
6022 /*                                                                        */
6023 /*  RELEASE HISTORY                                                       */
6024 /*                                                                        */
6025 /*    DATE              NAME                      DESCRIPTION             */
6026 /*                                                                        */
6027 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6028 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6029 /*                                            resulting in version 6.1    */
6030 /*                                                                        */
6031 /**************************************************************************/
_nxde_mqtt_client_disconnect(NXD_MQTT_CLIENT * client_ptr)6032 UINT _nxde_mqtt_client_disconnect(NXD_MQTT_CLIENT *client_ptr)
6033 {
6034     /* Validate client_ptr */
6035     if (client_ptr == NX_NULL)
6036     {
6037         return(NX_PTR_ERROR);
6038     }
6039 
6040     return(_nxd_mqtt_client_disconnect(client_ptr));
6041 }
6042 
6043 
6044 /**************************************************************************/
6045 /*                                                                        */
6046 /*  FUNCTION                                               RELEASE        */
6047 /*                                                                        */
6048 /*    _nxde_mqtt_client_message_get                       PORTABLE C      */
6049 /*                                                           6.1          */
6050 /*  AUTHOR                                                                */
6051 /*                                                                        */
6052 /*    Yuxin Zhou, Microsoft Corporation                                   */
6053 /*                                                                        */
6054 /*  DESCRIPTION                                                           */
6055 /*                                                                        */
6056 /*    This function checks for errors in MQTT client message get call.    */
6057 /*                                                                        */
6058 /*                                                                        */
6059 /*  INPUT                                                                 */
6060 /*                                                                        */
6061 /*    client_ptr                            Pointer to MQTT Client        */
6062 /*    topic_buffer                          Pointer to the topic buffer   */
6063 /*                                            where topic is copied to    */
6064 /*    topic_buffer_size                     Size of the topic buffer.     */
6065 /*    actual_topic_length                   Number of bytes copied into   */
6066 /*                                            topic_buffer                */
6067 /*    message_buffer                        Pointer to the buffer where   */
6068 /*                                            message is copied to        */
6069 /*    message_buffer_size                   Size of the message_buffer    */
6070 /*    actual_message_length                 Number of bytes copied into   */
6071 /*                                            the message buffer.         */
6072 /*                                                                        */
6073 /*  OUTPUT                                                                */
6074 /*                                                                        */
6075 /*    status                                Completion status             */
6076 /*                                                                        */
6077 /*  CALLS                                                                 */
6078 /*                                                                        */
6079 /*    _nxd_mqtt_client_message_get                                        */
6080 /*                                                                        */
6081 /*                                                                        */
6082 /*  CALLED BY                                                             */
6083 /*                                                                        */
6084 /*    Application Code                                                    */
6085 /*                                                                        */
6086 /*  RELEASE HISTORY                                                       */
6087 /*                                                                        */
6088 /*    DATE              NAME                      DESCRIPTION             */
6089 /*                                                                        */
6090 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6091 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6092 /*                                            resulting in version 6.1    */
6093 /*                                                                        */
6094 /**************************************************************************/
_nxde_mqtt_client_message_get(NXD_MQTT_CLIENT * client_ptr,UCHAR * topic_buffer,UINT topic_buffer_size,UINT * actual_topic_length,UCHAR * message_buffer,UINT message_buffer_size,UINT * actual_message_length)6095 UINT _nxde_mqtt_client_message_get(NXD_MQTT_CLIENT *client_ptr, UCHAR *topic_buffer, UINT topic_buffer_size, UINT *actual_topic_length,
6096                                    UCHAR *message_buffer, UINT message_buffer_size, UINT *actual_message_length)
6097 {
6098 
6099     /* Validate client_ptr */
6100     if (client_ptr == NX_NULL)
6101     {
6102         return(NX_PTR_ERROR);
6103     }
6104 
6105     /* Topic and topic_length can be NULL if caller does not care the topic string. */
6106 
6107     /* Validate message.  Message_length can be NULL if caller does not care message length. */
6108     if ((message_buffer == NX_NULL) || (topic_buffer == NX_NULL))
6109     {
6110         return(NXD_MQTT_INVALID_PARAMETER);
6111     }
6112 
6113 
6114     return(_nxd_mqtt_client_message_get(client_ptr, topic_buffer, topic_buffer_size, actual_topic_length,
6115                                         message_buffer, message_buffer_size, actual_message_length));
6116 }
6117 
6118 
6119 /**************************************************************************/
6120 /*                                                                        */
6121 /*  FUNCTION                                               RELEASE        */
6122 /*                                                                        */
6123 /*    _nxde_mqtt_client_receive_notify_set                PORTABLE C      */
6124 /*                                                           6.1          */
6125 /*  AUTHOR                                                                */
6126 /*                                                                        */
6127 /*    Yuxin Zhou, Microsoft Corporation                                   */
6128 /*                                                                        */
6129 /*  DESCRIPTION                                                           */
6130 /*                                                                        */
6131 /*    This function checks for errors in MQTT client publish notify call. */
6132 /*                                                                        */
6133 /*                                                                        */
6134 /*  INPUT                                                                 */
6135 /*                                                                        */
6136 /*    client_ptr                            Pointer to MQTT Client        */
6137 /*    receive_notify                        User-supplied callback        */
6138 /*                                            function, which is invoked  */
6139 /*                                            upon receiving a publish    */
6140 /*                                            message.                    */
6141 /*                                                                        */
6142 /*  OUTPUT                                                                */
6143 /*                                                                        */
6144 /*    status                                Completion status             */
6145 /*                                                                        */
6146 /*  CALLS                                                                 */
6147 /*                                                                        */
6148 /*    _nxd_mqtt_client_receive_notify_set                                 */
6149 /*                                                                        */
6150 /*                                                                        */
6151 /*  CALLED BY                                                             */
6152 /*                                                                        */
6153 /*    Application Code                                                    */
6154 /*                                                                        */
6155 /*  RELEASE HISTORY                                                       */
6156 /*                                                                        */
6157 /*    DATE              NAME                      DESCRIPTION             */
6158 /*                                                                        */
6159 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6160 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6161 /*                                            resulting in version 6.1    */
6162 /*                                                                        */
6163 /**************************************************************************/
_nxde_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* receive_notify)(NXD_MQTT_CLIENT * client_ptr,UINT message_count))6164 UINT _nxde_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT *client_ptr,
6165                                           VOID (*receive_notify)(NXD_MQTT_CLIENT *client_ptr, UINT message_count))
6166 {
6167     /* Validate client_ptr */
6168     if (client_ptr == NX_NULL)
6169     {
6170         return(NX_PTR_ERROR);
6171     }
6172 
6173     if (receive_notify == NX_NULL)
6174     {
6175         return(NX_PTR_ERROR);
6176     }
6177 
6178     return(_nxd_mqtt_client_receive_notify_set(client_ptr, receive_notify));
6179 }
6180 
6181 
6182 
6183 /**************************************************************************/
6184 /*                                                                        */
6185 /*  FUNCTION                                               RELEASE        */
6186 /*                                                                        */
6187 /*    _nxd_mqtt_client_disconnect_notify_set              PORTABLE C      */
6188 /*                                                           6.1          */
6189 /*  AUTHOR                                                                */
6190 /*                                                                        */
6191 /*    Yuxin Zhou, Microsoft Corporation                                   */
6192 /*                                                                        */
6193 /*  DESCRIPTION                                                           */
6194 /*                                                                        */
6195 /*    This function sets the notify function for the disconnect event.    */
6196 /*                                                                        */
6197 /*  INPUT                                                                 */
6198 /*                                                                        */
6199 /*    client_ptr                            Pointer to MQTT Client        */
6200 /*    disconnect_notify                     The notify function to be     */
6201 /*                                            used when the client is     */
6202 /*                                            disconnected from the       */
6203 /*                                            server.                     */
6204 /*                                                                        */
6205 /*  OUTPUT                                                                */
6206 /*                                                                        */
6207 /*    status                                Completion status             */
6208 /*                                                                        */
6209 /*  CALLS                                                                 */
6210 /*                                                                        */
6211 /*    None                                                                */
6212 /*                                                                        */
6213 /*  CALLED BY                                                             */
6214 /*                                                                        */
6215 /*    Application Code                                                    */
6216 /*                                                                        */
6217 /*  RELEASE HISTORY                                                       */
6218 /*                                                                        */
6219 /*    DATE              NAME                      DESCRIPTION             */
6220 /*                                                                        */
6221 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6222 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6223 /*                                            resulting in version 6.1    */
6224 /*                                                                        */
6225 /**************************************************************************/
_nxd_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* disconnect_notify)(NXD_MQTT_CLIENT *))6226 UINT _nxd_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT *client_ptr, VOID (*disconnect_notify)(NXD_MQTT_CLIENT *))
6227 {
6228 
6229     client_ptr -> nxd_mqtt_disconnect_notify = disconnect_notify;
6230 
6231     return(NXD_MQTT_SUCCESS);
6232 }
6233 
6234 
6235 /**************************************************************************/
6236 /*                                                                        */
6237 /*  FUNCTION                                               RELEASE        */
6238 /*                                                                        */
6239 /*    _nxde_mqtt_client_disconnect_notify_set             PORTABLE C      */
6240 /*                                                           6.1          */
6241 /*  AUTHOR                                                                */
6242 /*                                                                        */
6243 /*    Yuxin Zhou, Microsoft Corporation                                   */
6244 /*                                                                        */
6245 /*  DESCRIPTION                                                           */
6246 /*                                                                        */
6247 /*    This function checks for errors in setting MQTT client disconnect   */
6248 /*    callback function.                                                  */
6249 /*                                                                        */
6250 /*  INPUT                                                                 */
6251 /*                                                                        */
6252 /*    client_ptr                            Pointer to MQTT Client        */
6253 /*    disconnect_callback                   The callback function to be   */
6254 /*                                            used when an on-going       */
6255 /*                                            connection is disconnected. */
6256 /*                                                                        */
6257 /*  OUTPUT                                                                */
6258 /*                                                                        */
6259 /*    status                                Completion status             */
6260 /*                                                                        */
6261 /*  CALLS                                                                 */
6262 /*                                                                        */
6263 /*    _nxd_mqtt_client_disconnect_notify_set                              */
6264 /*                                                                        */
6265 /*  CALLED BY                                                             */
6266 /*                                                                        */
6267 /*    Application Code                                                    */
6268 /*                                                                        */
6269 /*  RELEASE HISTORY                                                       */
6270 /*                                                                        */
6271 /*    DATE              NAME                      DESCRIPTION             */
6272 /*                                                                        */
6273 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6274 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6275 /*                                            resulting in version 6.1    */
6276 /*                                                                        */
6277 /**************************************************************************/
_nxde_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* disconnect_notify)(NXD_MQTT_CLIENT *))6278 UINT _nxde_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT *client_ptr, VOID (*disconnect_notify)(NXD_MQTT_CLIENT *))
6279 {
6280 
6281     /* Validate client_ptr */
6282     if (client_ptr == NX_NULL)
6283     {
6284         return(NX_PTR_ERROR);
6285     }
6286     return(_nxd_mqtt_client_disconnect_notify_set(client_ptr, disconnect_notify));
6287 }
6288 
6289 
6290 #ifdef NXD_MQTT_CLOUD_ENABLE
6291 /**************************************************************************/
6292 /*                                                                        */
6293 /*  FUNCTION                                               RELEASE        */
6294 /*                                                                        */
6295 /*    _nxd_mqtt_client_cloud_create                       PORTABLE C      */
6296 /*                                                           6.1          */
6297 /*  AUTHOR                                                                */
6298 /*                                                                        */
6299 /*    Yuxin Zhou, Microsoft Corporation                                   */
6300 /*                                                                        */
6301 /*  DESCRIPTION                                                           */
6302 /*                                                                        */
6303 /*    This function creates mqtt client running on cloud helper thread.   */
6304 /*                                                                        */
6305 /*                                                                        */
6306 /*  INPUT                                                                 */
6307 /*                                                                        */
6308 /*    client_ptr                            Pointer to MQTT Client        */
6309 /*    client_name                           Name string used in by the    */
6310 /*                                            client                      */
6311 /*    client_id                             Client ID used by the client  */
6312 /*    client_id_length                      Length of Client ID, in bytes */
6313 /*    ip_ptr                                Pointer to IP instance        */
6314 /*    pool_ptr                              Pointer to packet pool        */
6315 /*    cloud_ptr                             Pointer to Cloud instance     */
6316 /*                                                                        */
6317 /*  OUTPUT                                                                */
6318 /*                                                                        */
6319 /*    status                                Completion status             */
6320 /*                                                                        */
6321 /*  CALLS                                                                 */
6322 /*                                                                        */
6323 /*    _nxd_mqtt_client_create               Actual client create call     */
6324 /*                                                                        */
6325 /*  CALLED BY                                                             */
6326 /*                                                                        */
6327 /*    Application Code                                                    */
6328 /*                                                                        */
6329 /*  RELEASE HISTORY                                                       */
6330 /*                                                                        */
6331 /*    DATE              NAME                      DESCRIPTION             */
6332 /*                                                                        */
6333 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6334 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
6335 /*                                            corrected mqtt client state,*/
6336 /*                                            resulting in version 6.1    */
6337 /*                                                                        */
6338 /**************************************************************************/
_nxd_mqtt_client_cloud_create(NXD_MQTT_CLIENT * client_ptr,CHAR * client_name,CHAR * client_id,UINT client_id_length,NX_IP * ip_ptr,NX_PACKET_POOL * pool_ptr,NX_CLOUD * cloud_ptr)6339 UINT _nxd_mqtt_client_cloud_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name, CHAR *client_id, UINT client_id_length,
6340                                    NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr, NX_CLOUD *cloud_ptr)
6341 {
6342 
6343 UINT    status;
6344 
6345 
6346     /* Check for invalid input pointers.  */
6347     if ((client_ptr == NX_NULL) || (ip_ptr == NX_NULL) || (ip_ptr -> nx_ip_id != NX_IP_ID) ||
6348         (pool_ptr == NX_NULL) || (cloud_ptr == NX_NULL) || (cloud_ptr -> nx_cloud_id != NX_CLOUD_ID))
6349     {
6350         return(NX_PTR_ERROR);
6351     }
6352 
6353     /* Create MQTT client.  */
6354     status = _nxd_mqtt_client_create_internal(client_ptr, client_name, client_id, client_id_length, ip_ptr,
6355                                               pool_ptr, NX_NULL, 0, 0);
6356 
6357     /* Check status.  */
6358     if (status)
6359     {
6360         return(status);
6361     }
6362 
6363     /* Save the cloud pointer.  */
6364     client_ptr -> nxd_mqtt_client_cloud_ptr = cloud_ptr;
6365 
6366     /* Save the mutex pointer.  */
6367     client_ptr -> nxd_mqtt_client_mutex_ptr = &(cloud_ptr -> nx_cloud_mutex);
6368 
6369     /* Register MQTT on cloud helper.  */
6370     status = nx_cloud_module_register(client_ptr -> nxd_mqtt_client_cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module), client_name, NX_CLOUD_MODULE_MQTT_EVENT,
6371                                       _nxd_mqtt_client_event_process, client_ptr);
6372 
6373     /* Determine if an error occurred.  */
6374     if (status != NX_SUCCESS)
6375     {
6376 
6377         /* Delete internal resource created in _nxd_mqtt_client_create_internal().  */
6378 
6379         /* Delete socket.  */
6380         nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));
6381 
6382         return(NXD_MQTT_INTERNAL_ERROR);
6383     }
6384 
6385     /* Update state.  */
6386     client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;
6387 
6388     return(NXD_MQTT_SUCCESS);
6389 }
6390 #endif /* NXD_MQTT_CLOUD_ENABLE */
6391 
6392 #ifdef NXD_MQTT_OVER_WEBSOCKET
6393 /**************************************************************************/
6394 /*                                                                        */
6395 /*  FUNCTION                                               RELEASE        */
6396 /*                                                                        */
6397 /*    _nxd_mqtt_client_websocket_connection_status_callback               */
6398 /*                                                        PORTABLE C      */
6399 /*                                                           6.2.0        */
6400 /*  AUTHOR                                                                */
6401 /*                                                                        */
6402 /*    Yuxin Zhou, Microsoft Corporation                                   */
6403 /*                                                                        */
6404 /*  DESCRIPTION                                                           */
6405 /*                                                                        */
6406 /*    This function is the websocket connection status callback.          */
6407 /*                                                                        */
6408 /*  INPUT                                                                 */
6409 /*                                                                        */
6410 /*    websocket_client_ptr                  Pointer to websocket client   */
6411 /*    context                               Pointer to MQTT client        */
6412 /*    status                                Websocket connection status   */
6413 /*                                                                        */
6414 /*  OUTPUT                                                                */
6415 /*                                                                        */
6416 /*    None                                                                */
6417 /*                                                                        */
6418 /*  CALLS                                                                 */
6419 /*                                                                        */
6420 /*    _nxd_mqtt_client_connect_packet_send                                */
6421 /*    _nxd_mqtt_client_connection_end                                     */
6422 /*                                                                        */
6423 /*  CALLED BY                                                             */
6424 /*                                                                        */
6425 /*    _nxd_mqtt_client_event_process                                      */
6426 /*                                                                        */
6427 /*  RELEASE HISTORY                                                       */
6428 /*                                                                        */
6429 /*    DATE              NAME                      DESCRIPTION             */
6430 /*                                                                        */
6431 /*  10-31-2022     Yuxin Zhou               Initial Version 6.2.0         */
6432 /*                                                                        */
6433 /**************************************************************************/
_nxd_mqtt_client_websocket_connection_status_callback(NX_WEBSOCKET_CLIENT * websocket_client_ptr,VOID * context,UINT status)6434 VOID _nxd_mqtt_client_websocket_connection_status_callback(NX_WEBSOCKET_CLIENT *websocket_client_ptr, VOID *context, UINT status)
6435 {
6436 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)context;
6437 
6438 
6439     NX_PARAMETER_NOT_USED(websocket_client_ptr);
6440 
6441     if (status == NX_SUCCESS)
6442     {
6443 
6444         /* Start to send MQTT connect packet.  */
6445         status = _nxd_mqtt_client_connect_packet_send(client_ptr, NX_NO_WAIT);
6446     }
6447 
6448     /* If an error occurs.  */
6449     if (status)
6450     {
6451 
6452         /* End connection. */
6453         _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
6454 
6455         /* Check callback function.  */
6456         if (client_ptr -> nxd_mqtt_connect_notify)
6457         {
6458             client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
6459         }
6460     }
6461 }
6462 
6463 /**************************************************************************/
6464 /*                                                                        */
6465 /*  FUNCTION                                               RELEASE        */
6466 /*                                                                        */
6467 /*    _nxd_mqtt_client_websocket_set                      PORTABLE C      */
6468 /*                                                           6.2.0        */
6469 /*  AUTHOR                                                                */
6470 /*                                                                        */
6471 /*    Yuxin Zhou, Microsoft Corporation                                   */
6472 /*                                                                        */
6473 /*  DESCRIPTION                                                           */
6474 /*                                                                        */
6475 /*    This function sets the websocket.                                   */
6476 /*                                                                        */
6477 /*  INPUT                                                                 */
6478 /*                                                                        */
6479 /*    client_ptr                            Pointer to MQTT Client        */
6480 /*    host                                  Host used by the client       */
6481 /*    host_length                           Length of host, in bytes      */
6482 /*    uri_path                              URI path used by the client   */
6483 /*    uri_path_length                       Length of uri path, in bytes  */
6484 /*                                                                        */
6485 /*  OUTPUT                                                                */
6486 /*                                                                        */
6487 /*    status                                Completion status             */
6488 /*                                                                        */
6489 /*  CALLS                                                                 */
6490 /*                                                                        */
6491 /*    None                                                                */
6492 /*                                                                        */
6493 /*  CALLED BY                                                             */
6494 /*                                                                        */
6495 /*    Application Code                                                    */
6496 /*                                                                        */
6497 /*  RELEASE HISTORY                                                       */
6498 /*                                                                        */
6499 /*    DATE              NAME                      DESCRIPTION             */
6500 /*                                                                        */
6501 /*  10-31-2022     Yuxin Zhou               Initial Version 6.2.0         */
6502 /*                                                                        */
6503 /**************************************************************************/
_nxd_mqtt_client_websocket_set(NXD_MQTT_CLIENT * client_ptr,UCHAR * host,UINT host_length,UCHAR * uri_path,UINT uri_path_length)6504 UINT _nxd_mqtt_client_websocket_set(NXD_MQTT_CLIENT *client_ptr, UCHAR *host, UINT host_length, UCHAR *uri_path, UINT uri_path_length)
6505 {
6506 UINT status;
6507 
6508     /* Obtain the mutex. */
6509     status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
6510 
6511     if (status != TX_SUCCESS)
6512     {
6513         return(NXD_MQTT_MUTEX_FAILURE);
6514     }
6515 
6516     /* Set the host info.  */
6517     client_ptr -> nxd_mqtt_client_websocket_host = host;
6518     client_ptr -> nxd_mqtt_client_websocket_host_length = host_length;
6519     client_ptr -> nxd_mqtt_client_websocket_uri_path = uri_path;
6520     client_ptr -> nxd_mqtt_client_websocket_uri_path_length = uri_path_length;
6521 
6522     /* Create WebSocket.  */
6523     status = nx_websocket_client_create(&client_ptr -> nxd_mqtt_client_websocket, (UCHAR *)"",
6524                                         client_ptr -> nxd_mqtt_client_ip_ptr,
6525                                         client_ptr -> nxd_mqtt_client_packet_pool_ptr);
6526 
6527     /* Check status.  */
6528     if (status)
6529     {
6530         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
6531         return(status);
6532     }
6533 
6534     client_ptr -> nxd_mqtt_client_use_websocket = NX_TRUE;
6535 
6536     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
6537     return(NXD_MQTT_SUCCESS);
6538 }
6539 
6540 
6541 /**************************************************************************/
6542 /*                                                                        */
6543 /*  FUNCTION                                               RELEASE        */
6544 /*                                                                        */
6545 /*    _nxde_mqtt_client_websocket_set                     PORTABLE C      */
6546 /*                                                           6.2.0        */
6547 /*  AUTHOR                                                                */
6548 /*                                                                        */
6549 /*    Yuxin Zhou, Microsoft Corporation                                   */
6550 /*                                                                        */
6551 /*  DESCRIPTION                                                           */
6552 /*                                                                        */
6553 /*    This function checks for errors in setting MQTT client websocket.   */
6554 /*                                                                        */
6555 /*  INPUT                                                                 */
6556 /*                                                                        */
6557 /*    client_ptr                            Pointer to MQTT Client        */
6558 /*    host                                  Host used by the client       */
6559 /*    host_length                           Length of host, in bytes      */
6560 /*    uri_path                              URI path used by the client   */
6561 /*    uri_path_length                       Length of uri path, in bytes  */
6562 /*                                                                        */
6563 /*  OUTPUT                                                                */
6564 /*                                                                        */
6565 /*    status                                Completion status             */
6566 /*                                                                        */
6567 /*  CALLS                                                                 */
6568 /*                                                                        */
6569 /*    _nxd_mqtt_client_websocket_set                                      */
6570 /*                                                                        */
6571 /*  CALLED BY                                                             */
6572 /*                                                                        */
6573 /*    Application Code                                                    */
6574 /*                                                                        */
6575 /*  RELEASE HISTORY                                                       */
6576 /*                                                                        */
6577 /*    DATE              NAME                      DESCRIPTION             */
6578 /*                                                                        */
6579 /*  10-31-2022     Yuxin Zhou               Initial Version 6.2.0         */
6580 /*                                                                        */
6581 /**************************************************************************/
_nxde_mqtt_client_websocket_set(NXD_MQTT_CLIENT * client_ptr,UCHAR * host,UINT host_length,UCHAR * uri_path,UINT uri_path_length)6582 UINT _nxde_mqtt_client_websocket_set(NXD_MQTT_CLIENT *client_ptr, UCHAR *host, UINT host_length, UCHAR *uri_path, UINT uri_path_length)
6583 {
6584 
6585     /* Validate the parameters.  */
6586     if ((client_ptr == NX_NULL) || (host == NX_NULL) || (host_length == 0) ||
6587         (uri_path == NX_NULL) || (uri_path_length == 0))
6588     {
6589         return(NX_PTR_ERROR);
6590     }
6591 
6592     return(_nxd_mqtt_client_websocket_set(client_ptr, host, host_length, uri_path, uri_path_length));
6593 }
6594 #endif /* NXD_MQTT_OVER_WEBSOCKET */
6595