1 /**************************************************************************/
2 /*                                                                        */
3 /*       Copyright (c) Microsoft Corporation. All rights reserved.        */
4 /*                                                                        */
5 /*       This software is licensed under the Microsoft Software License   */
6 /*       Terms for Microsoft Azure RTOS. Full text of the license can be  */
7 /*       found in the LICENSE file at https://aka.ms/AzureRTOS_EULA       */
8 /*       and in the root directory of this software.                      */
9 /*                                                                        */
10 /**************************************************************************/
11 
12 
13 /**************************************************************************/
14 /**************************************************************************/
15 /**                                                                       */
16 /** NetX Component                                                        */
17 /**                                                                       */
18 /**   MQTT (MQTT)                                                         */
19 /**                                                                       */
20 /**************************************************************************/
21 /**************************************************************************/
22 
23 #define NXD_MQTT_CLIENT_SOURCE_CODE
24 
25 
26 /* Force error checking to be disabled in this module */
27 
28 #ifndef NX_DISABLE_ERROR_CHECKING
29 #define NX_DISABLE_ERROR_CHECKING
30 #endif
31 
32 
33 /* Include necessary system files.  */
34 
35 #include    "tx_api.h"
36 #include    "nx_api.h"
37 #include    "nx_ip.h"
38 #include    "nxd_mqtt_client.h"
39 
/* Internal event flag bit masks and forward declarations of file-local helpers.  */
41 
/* Event bit masks.  Every individual event owns exactly one bit, so several
   events can be OR-ed together in a single ULONG; MQTT_ALL_EVENTS has all 32
   low bits set and therefore covers every event defined here.  */
#define MQTT_ALL_EVENTS               ((ULONG)0xFFFFFFFF)
#define MQTT_TIMEOUT_EVENT            ((ULONG)1 << 0)
#define MQTT_PACKET_RECEIVE_EVENT     ((ULONG)1 << 1)
#define MQTT_START_EVENT              ((ULONG)1 << 2)
#define MQTT_DELETE_EVENT             ((ULONG)1 << 3)
#define MQTT_PING_TIMEOUT_EVENT       ((ULONG)1 << 4)
#define MQTT_NETWORK_DISCONNECT_EVENT ((ULONG)1 << 5)
#define MQTT_TCP_ESTABLISH_EVENT      ((ULONG)1 << 6)
50 
51 static UINT _nxd_mqtt_client_create_internal(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
52                                              CHAR *client_id, UINT client_id_length,
53                                              NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
54                                              VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority);
55 static UINT _nxd_mqtt_packet_allocate(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, ULONG wait_option);
56 static UINT _nxd_mqtt_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UINT wait_option);
57 static UINT _nxd_mqtt_packet_receive(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, UINT wait_option);
58 static UINT _nxd_mqtt_copy_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET **new_packet_ptr,
59                                            USHORT packet_id, UCHAR set_duplicate_flag, UINT wait_option);
60 static VOID _nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr);
61 static VOID _nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr);
62 static UINT _nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option);
63 static UINT _nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option);
64 
65 /**************************************************************************/
66 /*                                                                        */
67 /*  FUNCTION                                               RELEASE        */
68 /*                                                                        */
69 /*    _nxd_mqtt_client_set_fixed_header                   PORTABLE C      */
70 /*                                                           6.1          */
71 /*  AUTHOR                                                                */
72 /*                                                                        */
73 /*    Yuxin Zhou, Microsoft Corporation                                   */
74 /*                                                                        */
75 /*  DESCRIPTION                                                           */
76 /*                                                                        */
/*    This function writes the fixed header field in the outgoing         */
78 /*    MQTT packet.                                                        */
79 /*                                                                        */
80 /*    This function follows the logic outlined in 2.2 in MQTT             */
81 /*    specification.                                                      */
82 /*                                                                        */
83 /*  INPUT                                                                 */
84 /*                                                                        */
85 /*    client_ptr                            Pointer to MQTT Client        */
86 /*    packet_ptr                            Outgoing MQTT packet          */
87 /*    control_header                        Control byte                  */
88 /*    length                                Remaining length in bytes     */
89 /*    wait_option                           Wait option                   */
90 /*                                                                        */
91 /*  OUTPUT                                                                */
92 /*                                                                        */
93 /*    status                                                              */
94 /*                                                                        */
95 /*  CALLS                                                                 */
96 /*                                                                        */
97 /*    nx_packet_data_append                 Append packet data            */
98 /*                                                                        */
99 /*  CALLED BY                                                             */
100 /*                                                                        */
101 /*    _nxd_mqtt_client_sub_unsub                                          */
102 /*    _nxd_mqtt_client_connect                                            */
103 /*    _nxd_mqtt_client_publish                                            */
104 /*                                                                        */
105 /*  RELEASE HISTORY                                                       */
106 /*                                                                        */
107 /*    DATE              NAME                      DESCRIPTION             */
108 /*                                                                        */
109 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
110 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
111 /*                                            resulting in version 6.1    */
112 /*                                                                        */
113 /**************************************************************************/
UINT _nxd_mqtt_client_set_fixed_header(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UCHAR control_header, UINT length, UINT wait_option)
{
/* One control byte followed by at most four remaining-length bytes.  */
UCHAR fixed_header[5];
UINT  header_size = 0;
UCHAR encoded_byte;
UINT  ret;

    /* Purpose: build the MQTT fixed header (control byte + variable-length
       "remaining length" field) and append it to packet_ptr.

       Inputs:
         client_ptr      supplies the packet pool used by the append.
         packet_ptr      packet that receives the header bytes.
         control_header  first byte of the fixed header.
         length          remaining length to encode, 0 .. 0x0FFFFFFF.
         wait_option     passed through to nx_packet_data_append.

       Returns the status of nx_packet_data_append, or NXD_MQTT_INTERNAL_ERROR
       when length cannot be encoded (nothing is appended in that case).  */

    /* Each length byte carries seven value bits, so four bytes can express at
       most 28 bits (0x0FFFFFFF = 268,435,455).  A larger value would need a
       fifth length byte and overrun fixed_header[], so refuse it up front.  */
    if (length > 0x0FFFFFFF)
    {
        return(NXD_MQTT_INTERNAL_ERROR);
    }

    fixed_header[header_size] = control_header;
    header_size++;

    /* Emit the length least-significant group first; bit 7 of a byte is set
       when at least one more length byte follows.  A length of zero still
       produces a single 0x00 byte.  */
    do
    {
        encoded_byte = (UCHAR)(length & 0x7F);
        length = length >> 7;

        if (length != 0)
        {
            encoded_byte = (UCHAR)(encoded_byte | 0x80);
        }

        fixed_header[header_size] = encoded_byte;
        header_size++;
    } while (length != 0);

    ret = nx_packet_data_append(packet_ptr, fixed_header, header_size,
                                client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);

    return(ret);
}
144 
145 
146 
147 /**************************************************************************/
148 /*                                                                        */
149 /*  FUNCTION                                               RELEASE        */
150 /*                                                                        */
151 /*    _nxd_mqtt_read_remaining_length                     PORTABLE C      */
152 /*                                                           6.1          */
153 /*  AUTHOR                                                                */
154 /*                                                                        */
155 /*    Yuxin Zhou, Microsoft Corporation                                   */
156 /*                                                                        */
157 /*  DESCRIPTION                                                           */
158 /*                                                                        */
/*    This function parses the remaining length field in the incoming     */
160 /*    MQTT packet.                                                        */
161 /*                                                                        */
162 /*    This function follows the logic outlined in 2.2.3 in MQTT           */
163 /*    specification                                                       */
164 /*                                                                        */
165 /*  INPUT                                                                 */
166 /*                                                                        */
167 /*    packet_ptr                            Incoming MQTT packet.         */
168 /*    remaining_length                      remaining length in bytes,    */
169 /*                                            this is the return value.   */
/*    offset_ptr                            Pointer to offset of the      */
171 /*                                            remaining data              */
172 /*                                                                        */
173 /*  OUTPUT                                                                */
174 /*                                                                        */
175 /*    status                                                              */
176 /*                                                                        */
177 /*  CALLS                                                                 */
178 /*                                                                        */
179 /*    None                                                                */
180 /*                                                                        */
181 /*  CALLED BY                                                             */
182 /*                                                                        */
183 /*    _nxd_mqtt_process_publish                                           */
184 /*    _nxd_mqtt_client_message_get                                        */
185 /*    _nxd_mqtt_process_sub_unsub_ack                                     */
186 /*                                                                        */
187 /*  RELEASE HISTORY                                                       */
188 /*                                                                        */
189 /*    DATE              NAME                      DESCRIPTION             */
190 /*                                                                        */
191 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
192 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
193 /*                                            resulting in version 6.1    */
194 /*                                                                        */
195 /**************************************************************************/
UINT _nxd_mqtt_read_remaining_length(NX_PACKET *packet_ptr, UINT *remaining_length, ULONG *offset_ptr)
{
UCHAR  length_bytes[4] = {0};
ULONG  available;
UINT   decoded = 0;
UINT   shift = 0;
UINT   consumed = 0;
UCHAR  current;

    /* Decode the variable-length "remaining length" field that starts at
       packet offset 1.  On success *remaining_length receives the decoded
       value and *offset_ptr the offset of the first byte after the field
       (1 + number of length bytes).  Neither output is written on failure.  */

    /* Pull out up to four candidate length bytes; 'available' reports how
       many were actually copied.  */
    if (nx_packet_data_extract_offset(packet_ptr, 1, length_bytes, sizeof(length_bytes), &available))
    {

        /* Nothing readable past the control byte yet.  */
        return(NXD_MQTT_PARTIAL_PACKET);
    }

    for (;;)
    {

        /* Ran out of copied bytes while a continuation was still expected.  */
        if (consumed >= available)
        {

            /* Four bytes consumed and still continuing means the field is
               longer than allowed; fewer means more data has to arrive.  */
            return((consumed == 4) ? NXD_MQTT_INTERNAL_ERROR : NXD_MQTT_PARTIAL_PACKET);
        }

        current = length_bytes[consumed];
        consumed++;

        /* Low seven bits are payload, least-significant group first.  */
        decoded += ((UINT)(current & 0x7F)) << shift;
        shift += 7;

        /* Bit 7 clear marks the final length byte.  */
        if ((current & 0x80) == 0)
        {
            break;
        }
    }

    /* Control byte + length bytes + announced payload must fit in what has
       been received so far; otherwise wait for more data.  */
    if ((1 + consumed + decoded) > packet_ptr -> nx_packet_length)
    {
        return(NXD_MQTT_PARTIAL_PACKET);
    }

    *remaining_length = decoded;
    *offset_ptr = (1 + consumed);

    return(NXD_MQTT_SUCCESS);
}
243 
244 
245 
246 /**************************************************************************/
247 /*                                                                        */
248 /*  FUNCTION                                               RELEASE        */
249 /*                                                                        */
250 /*    _nxd_mqtt_client_sub_unsub                          PORTABLE C      */
251 /*                                                           6.2.0        */
252 /*  AUTHOR                                                                */
253 /*                                                                        */
254 /*    Yuxin Zhou, Microsoft Corporation                                   */
255 /*                                                                        */
256 /*  DESCRIPTION                                                           */
257 /*                                                                        */
258 /*    This function sends a subscribe or unsubscribe message to the       */
259 /*    broker.                                                             */
260 /*                                                                        */
261 /*                                                                        */
262 /*  INPUT                                                                 */
263 /*                                                                        */
264 /*    client_ptr                            Pointer to MQTT Client        */
265 /*    op                                    Subscribe or Unsubscribe      */
266 /*    topic_name                            Pointer to the topic string   */
267 /*                                            to subscribe to             */
268 /*    topic_name_length                     Length of the topic string    */
269 /*                                            in bytes                    */
270 /*    packet_id_ptr                         Pointer to packet id that     */
271 /*                                            will be filled with         */
272 /*                                            assigned packet id for      */
273 /*                                            sub/unsub message           */
274 /*    QoS                                   Expected QoS level            */
275 /*                                                                        */
276 /*  OUTPUT                                                                */
277 /*                                                                        */
278 /*    status                                Completion status             */
279 /*                                                                        */
280 /*  CALLS                                                                 */
281 /*                                                                        */
282 /*    tx_mutex_get                                                        */
283 /*    _nxd_mqtt_packet_allocate                                           */
284 /*    _nxd_mqtt_client_set_fixed_header                                   */
285 /*    _nxd_mqtt_client_append_message                                     */
286 /*    tx_mutex_put                                                        */
287 /*    _nxd_mqtt_packet_send                                               */
288 /*    nx_packet_release                                                   */
289 /*    _nxd_mqtt_copy_transmit_packet                                      */
290 /*                                                                        */
291 /*  CALLED BY                                                             */
292 /*                                                                        */
293 /*    _nxd_mqtt_client_subscribe                                          */
294 /*    _nxd_mqtt_client_unsubscribe                                        */
295 /*                                                                        */
296 /*  RELEASE HISTORY                                                       */
297 /*                                                                        */
298 /*    DATE              NAME                      DESCRIPTION             */
299 /*                                                                        */
300 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
301 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
302 /*                                            resulting in version 6.1    */
303 /*  11-09-2020     Yuxin Zhou               Modified comment(s), and      */
304 /*                                            added packet id parameter,  */
305 /*                                            resulting in version 6.1.2  */
306 /*  07-29-2022     Spencer McDonough        Modified comment(s),          */
307 /*                                            improved internal logic,    */
308 /*                                            resulting in version 6.1.12 */
309 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
310 /*                                            the logic of sending packet,*/
311 /*                                            resulting in version 6.2.0  */
312 /*                                                                        */
313 /**************************************************************************/
UINT _nxd_mqtt_client_sub_unsub(NXD_MQTT_CLIENT *client_ptr, UINT op,
                                CHAR *topic_name, UINT topic_name_length,
                                USHORT *packet_id_ptr, UINT QoS)
{
NX_PACKET *message_ptr;
NX_PACKET *retransmit_copy_ptr;
UCHAR      scratch[2];
UINT       is_subscribe;
UINT       remaining_length;
UINT       ret;

    /* Build a SUBSCRIBE or UNSUBSCRIBE message for one topic, queue a copy
       of it on the client's transmit queue, advance the packet identifier,
       and send the message.  The message is assembled and queued while the
       client mutex is held; the mutex is released before the send.  */

    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* The request is only valid while the client is in the connected state.  */
    if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(NXD_MQTT_NOT_CONNECTED);
    }

    ret = _nxd_mqtt_packet_allocate(client_ptr, &message_ptr, NX_WAIT_FOREVER);
    if (ret)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(ret);
    }

    /* Only a SUBSCRIBE request carries the trailing QoS byte.  */
    is_subscribe = (op == ((MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4) | 0x02));

    /* Remaining length = 2 bytes of packet ID + the topic with the 2 extra
       bytes counted alongside it (presumably the length prefix written by
       _nxd_mqtt_client_append_message) + 1 QoS byte for SUBSCRIBE.  */
    remaining_length = 2 + (2 + topic_name_length);
    if (is_subscribe)
    {
        remaining_length++;
    }

    /* Control byte and remaining length field.  */
    ret = _nxd_mqtt_client_set_fixed_header(client_ptr, message_ptr, (UCHAR)op, remaining_length, NX_WAIT_FOREVER);
    if (ret)
    {
        goto build_failed;
    }

    /* Packet ID, most significant byte first.  The caller's copy is filled
       in here, before the remaining appends are attempted.  */
    scratch[0] = (UCHAR)(client_ptr -> nxd_mqtt_client_packet_identifier >> 8);
    scratch[1] = (UCHAR)(client_ptr -> nxd_mqtt_client_packet_identifier & 0xFF);

    if (packet_id_ptr)
    {
        *packet_id_ptr = (USHORT)(client_ptr -> nxd_mqtt_client_packet_identifier & 0xFFFF);
    }

    ret = nx_packet_data_append(message_ptr, scratch, 2, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_WAIT_FOREVER);
    if (ret)
    {
        goto build_failed;
    }

    /* Topic name.  */
    ret = _nxd_mqtt_client_append_message(client_ptr, message_ptr, topic_name, topic_name_length, NX_WAIT_FOREVER);
    if (ret)
    {
        goto build_failed;
    }

    if (is_subscribe)
    {

        /* QoS occupies the two low bits of the final byte.  */
        scratch[0] = QoS & 0x3;

        ret = nx_packet_data_append(message_ptr, scratch, 1, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_WAIT_FOREVER);
        if (ret)
        {
            goto build_failed;
        }
    }

    /* Keep a copy of the message for retransmission.  */
    if (_nxd_mqtt_copy_transmit_packet(client_ptr, message_ptr, &retransmit_copy_ptr,
                                       (USHORT)(client_ptr -> nxd_mqtt_client_packet_identifier),
                                       NX_FALSE, NX_WAIT_FOREVER))
    {
        goto build_failed;
    }

    /* Link the copy at the tail of the transmit queue.  */
    if (client_ptr -> message_transmit_queue_head != NX_NULL)
    {
        client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = retransmit_copy_ptr;
    }
    else
    {
        client_ptr -> message_transmit_queue_head = retransmit_copy_ptr;
    }
    client_ptr -> message_transmit_queue_tail = retransmit_copy_ptr;

    /* Advance the 16-bit packet identifier, skipping zero on wrap-around
       (MQTT-2.3.1-1).  */
    client_ptr -> nxd_mqtt_client_packet_identifier = (client_ptr -> nxd_mqtt_client_packet_identifier + 1) & 0xFFFF;
    if (client_ptr -> nxd_mqtt_client_packet_identifier == 0)
    {
        client_ptr -> nxd_mqtt_client_packet_identifier = 1;
    }

    /* Push the timeout out by one keepalive interval from now.  */
    client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;

    /* Done with client state; the send happens without the mutex held.  */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    /* Send the subscribe/unsubscribe message to the server.  */
    if (_nxd_mqtt_packet_send(client_ptr, message_ptr, NX_WAIT_FOREVER))
    {

        /* The packet is still ours when the send fails.  */
        nx_packet_release(message_ptr);

        ret = NXD_MQTT_COMMUNICATION_FAILURE;
    }

    return(ret);

build_failed:

    /* Shared exit for every failure while the message is being assembled:
       drop the mutex first, then give the partially built packet back.  */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
    nx_packet_release(message_ptr);

    return(NXD_MQTT_PACKET_POOL_FAILURE);
}
483 
484 
485 /**************************************************************************/
486 /*                                                                        */
487 /*  FUNCTION                                               RELEASE        */
488 /*                                                                        */
489 /*    _nxd_mqtt_packet_allocate                           PORTABLE C      */
490 /*                                                           6.2.0        */
491 /*  AUTHOR                                                                */
492 /*                                                                        */
493 /*    Yuxin Zhou, Microsoft Corporation                                   */
494 /*                                                                        */
495 /*  DESCRIPTION                                                           */
496 /*                                                                        */
497 /*    This function allocates a packet for transmitting MQTT message.     */
498 /*    Special care has to be taken for accommodating IPv4/IPv6 header,    */
/*    and possibly TLS record if TLS is being used. Any allocation        */
/*    failure is reported as NXD_MQTT_PACKET_POOL_FAILURE.                */
501 /*                                                                        */
502 /*                                                                        */
503 /*  INPUT                                                                 */
504 /*                                                                        */
505 /*    client_ptr                            Pointer to MQTT Client        */
506 /*    packet_ptr                            Allocated packet to be        */
507 /*                                            returned to the caller.     */
508 /*                                                                        */
509 /*  OUTPUT                                                                */
510 /*                                                                        */
511 /*    status                                Completion status             */
512 /*                                                                        */
513 /*  CALLS                                                                 */
514 /*                                                                        */
515 /*    nx_secure_tls_packet_allocate         Allocate packet for MQTT      */
516 /*                                            over TLS socket             */
517 /*    nx_packet_allocate                    Allocate a packet for MQTT    */
518 /*                                            over regular TCP socket     */
/*    nx_websocket_client_packet_allocate   Allocate packet for MQTT      */
/*                                            over WebSocket              */
520 /*                                                                        */
521 /*  CALLED BY                                                             */
522 /*                                                                        */
523 /*    _nxd_mqtt_process_publish                                           */
524 /*    _nxd_mqtt_client_connect                                            */
525 /*    _nxd_mqtt_client_publish                                            */
526 /*    _nxd_mqtt_client_subscribe                                          */
527 /*    _nxd_mqtt_client_unsubscribe                                        */
528 /*    _nxd_mqtt_client_send_simple_message                                */
529 /*                                                                        */
530 /*  RELEASE HISTORY                                                       */
531 /*                                                                        */
532 /*    DATE              NAME                      DESCRIPTION             */
533 /*                                                                        */
534 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
535 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
536 /*                                            resulting in version 6.1    */
537 /*  07-29-2022     Spencer McDonough        Modified comment(s), and      */
538 /*                                            improved internal logic,    */
539 /*                                            resulting in version 6.1.12 */
540 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
541 /*                                            mqtt over websocket,        */
542 /*                                            resulting in version 6.2.0  */
543 /*                                                                        */
544 /**************************************************************************/
_nxd_mqtt_packet_allocate(NXD_MQTT_CLIENT * client_ptr,NX_PACKET ** packet_ptr,ULONG wait_option)545 static UINT _nxd_mqtt_packet_allocate(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, ULONG wait_option)
546 {
547 UINT status = NXD_MQTT_SUCCESS;
548 
549 #ifdef NXD_MQTT_OVER_WEBSOCKET
550     if (client_ptr -> nxd_mqtt_client_use_websocket)
551     {
552 
553         /* Use WebSocket packet allocate since it is able to count for WebSocket-related header space */
554         status = nx_websocket_client_packet_allocate(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, wait_option);
555     }
556     else
557 #endif /* NXD_MQTT_OVER_WEBSOCKET */
558 #ifdef NX_SECURE_ENABLE
559     if (client_ptr -> nxd_mqtt_client_use_tls)
560     {
561         /* Use TLS packet allocate.  The TLS packet allocate is able to count for
562            TLS-related header space including crypto initial vector area. */
563         status = nx_secure_tls_packet_allocate(&client_ptr -> nxd_mqtt_tls_session, client_ptr -> nxd_mqtt_client_packet_pool_ptr,
564                                                packet_ptr, wait_option);
565     }
566     /* Allocate a packet  */
567     else
568     {
569 #endif /* NX_SECURE_ENABLE */
570         if (client_ptr -> nxd_mqtt_client_socket.nx_tcp_socket_connect_ip.nxd_ip_version == NX_IP_VERSION_V4)
571         {
572             status = nx_packet_allocate(client_ptr -> nxd_mqtt_client_packet_pool_ptr, packet_ptr, NX_IPv4_TCP_PACKET,
573                                         wait_option);
574         }
575         else
576         {
577             status = nx_packet_allocate(client_ptr -> nxd_mqtt_client_packet_pool_ptr, packet_ptr, NX_IPv6_TCP_PACKET,
578                                         wait_option);
579         }
580 #ifdef NX_SECURE_ENABLE
581     }
582 #endif /* NX_SECURE_ENABLE */
583 
584     if (status != NX_SUCCESS)
585     {
586         return(NXD_MQTT_PACKET_POOL_FAILURE);
587     }
588 
589     return(NXD_MQTT_SUCCESS);
590 }
591 
592 /**************************************************************************/
593 /*                                                                        */
594 /*  FUNCTION                                               RELEASE        */
595 /*                                                                        */
596 /*    _nxd_mqtt_packet_send                               PORTABLE C      */
597 /*                                                           6.2.0        */
598 /*  AUTHOR                                                                */
599 /*                                                                        */
600 /*    Yuxin Zhou, Microsoft Corporation                                   */
601 /*                                                                        */
602 /*  DESCRIPTION                                                           */
603 /*                                                                        */
604 /*    This function sends out a packet.                                   */
605 /*                                                                        */
606 /*                                                                        */
607 /*  INPUT                                                                 */
608 /*                                                                        */
609 /*    client_ptr                            Pointer to MQTT Client        */
610 /*    packet_ptr                            Pointer to packet             */
611 /*    wait_option                           Timeout value                 */
612 /*                                                                        */
613 /*  OUTPUT                                                                */
614 /*                                                                        */
615 /*    status                                Completion status             */
616 /*                                                                        */
617 /*  CALLS                                                                 */
618 /*                                                                        */
619 /*    nx_websocket_client_send                                            */
620 /*    nx_secure_tls_session_send                                          */
621 /*    nx_tcp_socket_send                                                  */
622 /*                                                                        */
623 /*  CALLED BY                                                             */
624 /*                                                                        */
625 /*                                                                        */
626 /*  RELEASE HISTORY                                                       */
627 /*                                                                        */
628 /*    DATE              NAME                      DESCRIPTION             */
629 /*                                                                        */
630 /*  10-31-2022     Bo Chen                  Initial Version 6.2.0         */
631 /*                                                                        */
632 /**************************************************************************/
static UINT _nxd_mqtt_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UINT wait_option)
{

    /* Hand the packet to the first transport that is both compiled in and
       enabled on this client (WebSocket, then TLS), falling back to the raw
       TCP socket.  The status of the underlying send call is returned as-is.  */

#ifdef NXD_MQTT_OVER_WEBSOCKET
    if (client_ptr -> nxd_mqtt_client_use_websocket)
    {

        /* The packet is sent as a single, final binary frame.  */
        return(nx_websocket_client_send(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, NX_WEBSOCKET_OPCODE_BINARY_FRAME, NX_TRUE, wait_option));
    }
#endif /* NXD_MQTT_OVER_WEBSOCKET */

#ifdef NX_SECURE_ENABLE
    if (client_ptr -> nxd_mqtt_client_use_tls)
    {
        return(nx_secure_tls_session_send(&(client_ptr -> nxd_mqtt_tls_session), packet_ptr, wait_option));
    }
#endif /* NX_SECURE_ENABLE */

    return(nx_tcp_socket_send(&client_ptr -> nxd_mqtt_client_socket, packet_ptr, wait_option));
}
658 
659 /**************************************************************************/
660 /*                                                                        */
661 /*  FUNCTION                                               RELEASE        */
662 /*                                                                        */
663 /*    _nxd_mqtt_packet_receive                            PORTABLE C      */
664 /*                                                           6.2.0        */
665 /*  AUTHOR                                                                */
666 /*                                                                        */
667 /*    Yuxin Zhou, Microsoft Corporation                                   */
668 /*                                                                        */
669 /*  DESCRIPTION                                                           */
670 /*                                                                        */
671 /*    This function receives a packet.                                    */
672 /*                                                                        */
673 /*                                                                        */
674 /*  INPUT                                                                 */
675 /*                                                                        */
676 /*    client_ptr                            Pointer to MQTT Client        */
677 /*    packet_ptr                            Pointer to packet             */
678 /*    wait_option                           Timeout value                 */
679 /*                                                                        */
680 /*  OUTPUT                                                                */
681 /*                                                                        */
682 /*    status                                Completion status             */
683 /*                                                                        */
684 /*  CALLS                                                                 */
685 /*                                                                        */
686 /*    nx_websocket_client_receive                                         */
687 /*    nx_secure_tls_session_receive                                       */
688 /*    nx_tcp_socket_receive                                               */
689 /*                                                                        */
690 /*  CALLED BY                                                             */
691 /*                                                                        */
692 /*                                                                        */
693 /*  RELEASE HISTORY                                                       */
694 /*                                                                        */
695 /*    DATE              NAME                      DESCRIPTION             */
696 /*                                                                        */
697 /*  10-31-2022     Bo Chen                  Initial Version 6.2.0         */
698 /*                                                                        */
699 /**************************************************************************/
static UINT _nxd_mqtt_packet_receive(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, UINT wait_option)
{
UINT status = NXD_MQTT_SUCCESS;
#ifdef NXD_MQTT_OVER_WEBSOCKET
UINT op_code = 0;
#endif /* NXD_MQTT_OVER_WEBSOCKET*/

    /* Receive from the first transport that is both compiled in and enabled on
       this client (WebSocket, then TLS), falling back to the raw TCP socket.
       Apart from the WebSocket opcode check below, the status of the underlying
       receive call is returned unchanged.  */

#ifdef NXD_MQTT_OVER_WEBSOCKET
    if (client_ptr -> nxd_mqtt_client_use_websocket)
    {
        status = nx_websocket_client_receive(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, &op_code, wait_option);
        if ((status == NX_SUCCESS) && (op_code != NX_WEBSOCKET_OPCODE_BINARY_FRAME))
        {

            /* Only binary frames are accepted.  A packet was handed to us on
               success, and the caller only sees an error status from here on,
               so release the packet now instead of leaking it, and clear the
               caller's pointer so it cannot be used after the release.  */
            nx_packet_release(*packet_ptr);
            *packet_ptr = NX_NULL;

            return(NX_INVALID_PACKET);
        }
    }
    else
#endif /* NXD_MQTT_OVER_WEBSOCKET */

#ifdef NX_SECURE_ENABLE
    if (client_ptr -> nxd_mqtt_client_use_tls)
    {
        status = nx_secure_tls_session_receive(&(client_ptr -> nxd_mqtt_tls_session), packet_ptr, wait_option);
    }
    else
#endif /* NX_SECURE_ENABLE */
    {
        status = nx_tcp_socket_receive(&(client_ptr -> nxd_mqtt_client_socket), packet_ptr, wait_option);
    }

    return(status);
}
732 
733 /**************************************************************************/
734 /*                                                                        */
735 /*  FUNCTION                                               RELEASE        */
736 /*                                                                        */
737 /*    _nxd_mqtt_tcp_establish_notify                      PORTABLE C      */
738 /*                                                           6.1          */
739 /*  AUTHOR                                                                */
740 /*                                                                        */
741 /*    Yuxin Zhou, Microsoft Corporation                                   */
742 /*                                                                        */
743 /*  DESCRIPTION                                                           */
744 /*                                                                        */
745 /*    This internal function is installed as TCP connection establish     */
746 /*    callback function.                                                  */
747 /*                                                                        */
748 /*  INPUT                                                                 */
749 /*                                                                        */
750 /*    socket_ptr                            The socket that receives      */
751 /*                                           the message.                 */
752 /*                                                                        */
753 /*  OUTPUT                                                                */
754 /*                                                                        */
755 /*    None                                                                */
756 /*                                                                        */
757 /*  CALLS                                                                 */
758 /*                                                                        */
759 /*    tx_event_flags_set                                                  */
760 /*                                                                        */
761 /*  CALLED BY                                                             */
762 /*                                                                        */
763 /*    _nxd_mqtt_thread_entry                                              */
764 /*    _nxd_mqtt_client_event_process                                      */
765 /*                                                                        */
766 /*  RELEASE HISTORY                                                       */
767 /*                                                                        */
768 /*    DATE              NAME                      DESCRIPTION             */
769 /*                                                                        */
770 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
771 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
772 /*                                            resulting in version 6.1    */
773 /*                                                                        */
774 /**************************************************************************/
_nxd_mqtt_tcp_establish_notify(NX_TCP_SOCKET * socket_ptr)775 static VOID _nxd_mqtt_tcp_establish_notify(NX_TCP_SOCKET *socket_ptr)
776 {
777 NXD_MQTT_CLIENT *client_ptr;
778 
779     client_ptr = (NXD_MQTT_CLIENT *)socket_ptr -> nx_tcp_socket_reserved_ptr;
780 
781     if (&(client_ptr -> nxd_mqtt_client_socket) == socket_ptr)
782     {
783 
784         /* Set the event flag. */
785 #ifndef NXD_MQTT_CLOUD_ENABLE
786         tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_TCP_ESTABLISH_EVENT, TX_OR);
787 #else
788         nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_TCP_ESTABLISH_EVENT);
789 #endif /* NXD_MQTT_CLOUD_ENABLE */
790     }
791 }
792 
793 /**************************************************************************/
794 /*                                                                        */
795 /*  FUNCTION                                               RELEASE        */
796 /*                                                                        */
797 /*    _nxd_mqtt_receive_callback                          PORTABLE C      */
798 /*                                                           6.1          */
799 /*  AUTHOR                                                                */
800 /*                                                                        */
801 /*    Yuxin Zhou, Microsoft Corporation                                   */
802 /*                                                                        */
803 /*  DESCRIPTION                                                           */
804 /*                                                                        */
805 /*    This internal function is installed as TCP receive callback         */
806 /*    function.  On receiving a TCP message, the callback function        */
807 /*    sets an event flag to trigger MQTT client to process received       */
808 /*    message.                                                            */
809 /*                                                                        */
810 /*                                                                        */
811 /*  INPUT                                                                 */
812 /*                                                                        */
813 /*    socket_ptr                            The socket that receives      */
814 /*                                           the message.                 */
815 /*                                                                        */
816 /*  OUTPUT                                                                */
817 /*                                                                        */
818 /*    None                                                                */
819 /*                                                                        */
820 /*  CALLS                                                                 */
821 /*                                                                        */
822 /*    tx_event_flags_set                                                  */
823 /*                                                                        */
824 /*  CALLED BY                                                             */
825 /*                                                                        */
826 /*    _nxd_mqtt_thread_entry                                              */
827 /*    _nxd_mqtt_client_event_process                                      */
828 /*                                                                        */
829 /*  RELEASE HISTORY                                                       */
830 /*                                                                        */
831 /*    DATE              NAME                      DESCRIPTION             */
832 /*                                                                        */
833 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
834 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
835 /*                                            resulting in version 6.1    */
836 /*                                                                        */
837 /**************************************************************************/
_nxd_mqtt_receive_callback(NX_TCP_SOCKET * socket_ptr)838 static VOID _nxd_mqtt_receive_callback(NX_TCP_SOCKET *socket_ptr)
839 {
840 NXD_MQTT_CLIENT *client_ptr;
841 
842     client_ptr = (NXD_MQTT_CLIENT *)socket_ptr -> nx_tcp_socket_reserved_ptr;
843 
844     if (&(client_ptr -> nxd_mqtt_client_socket) == socket_ptr)
845     {
846         /* Set the event flag. */
847 #ifndef NXD_MQTT_CLOUD_ENABLE
848         tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_PACKET_RECEIVE_EVENT, TX_OR);
849 #else
850         nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PACKET_RECEIVE_EVENT);
851 #endif /* NXD_MQTT_CLOUD_ENABLE */
852     }
853 }
854 
855 /**************************************************************************/
856 /*                                                                        */
857 /*  FUNCTION                                               RELEASE        */
858 /*                                                                        */
859 /*    _nxd_mqtt_copy_transmit_packet                      PORTABLE C      */
860 /*                                                           6.1.8        */
861 /*  AUTHOR                                                                */
862 /*                                                                        */
863 /*    Yuxin Zhou, Microsoft Corporation                                   */
864 /*                                                                        */
865 /*  DESCRIPTION                                                           */
866 /*                                                                        */
867 /*    This internal function saves a transmit packet.                     */
868 /*    A transmit packet is allocated to store QoS 1 and 2 messages.       */
869 /*    Upon a message being properly acknowledged, the packet will         */
870 /*    be released.                                                        */
871 /*                                                                        */
872 /*                                                                        */
873 /*  INPUT                                                                 */
874 /*                                                                        */
875 /*    client_ptr                            Pointer to MQTT Client        */
876 /*    packet_ptr                            Pointer to the MQTT message   */
877 /*                                            packet to be saved          */
878 /*    new_packet_ptr                        Return a copied packet        */
879 /*    packet_id                             Current packet ID             */
880 /*    set_duplicate_flag                    Set duplicate flag for fixed  */
881 /*                                            header or not               */
882 /*    wait_option                           Timeout value                 */
883 /*                                                                        */
884 /*  OUTPUT                                                                */
885 /*                                                                        */
886 /*    status                                                              */
887 /*                                                                        */
888 /*  CALLS                                                                 */
889 /*                                                                        */
890 /*    nx_packet_copy                                                      */
891 /*                                                                        */
892 /*  CALLED BY                                                             */
893 /*                                                                        */
894 /*    _nxd_mqtt_client_sub_unsub                                          */
895 /*    _nxd_mqtt_process_publish                                           */
896 /*                                                                        */
897 /*  RELEASE HISTORY                                                       */
898 /*                                                                        */
899 /*    DATE              NAME                      DESCRIPTION             */
900 /*                                                                        */
901 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
902 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
903 /*                                            resulting in version 6.1    */
904 /*  08-02-2021     Yuxin Zhou               Modified comment(s),          */
905 /*                                            supported maximum transmit  */
906 /*                                            queue depth,                */
907 /*                                            resulting in version 6.1.8  */
908 /*                                                                        */
909 /**************************************************************************/
static UINT _nxd_mqtt_copy_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET **new_packet_ptr,
                                           USHORT packet_id, UCHAR set_duplicate_flag, UINT wait_option)
{
NX_PACKET *copy_ptr;

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH

    /* Refuse to copy once the transmit queue has reached its configured depth.  */
    if (client_ptr -> message_transmit_queue_depth >= NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH)
    {
        return(NX_TX_QUEUE_DEPTH);
    }
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */

    /* Duplicate the packet out of the client's packet pool.  Any copy failure
       is reported as a packet pool failure.  */
    if (nx_packet_copy(packet_ptr, new_packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option) != NX_SUCCESS)
    {
        return(NXD_MQTT_PACKET_POOL_FAILURE);
    }

    copy_ptr = *new_packet_ptr;

    /* Record the packet ID, in host byte order, at the very start of the
       copy's data area (nx_packet_data_start, not the prepend pointer).
       NOTE(review): presumably this is headroom ahead of the MQTT message
       itself - confirm against the code that reads it back.  */
    *((USHORT *)(copy_ptr -> nx_packet_data_start)) = packet_id;

    if (set_duplicate_flag)
    {

        /* Mark the copy as a duplicate in the first byte at the prepend
           pointer (the fixed header).  */
        *(copy_ptr -> nx_packet_prepend_ptr) = (UCHAR)(*(copy_ptr -> nx_packet_prepend_ptr) | MQTT_PUBLISH_DUP_FLAG);
    }

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH

    /* Account for the new copy in the transmit queue depth.  This function
       does not itself link the copy into the transmit queue.  */
    client_ptr -> message_transmit_queue_depth++;
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */

    return(NXD_MQTT_SUCCESS);
}
950 
951 /**************************************************************************/
952 /*                                                                        */
953 /*  FUNCTION                                               RELEASE        */
954 /*                                                                        */
955 /*    _nxd_mqtt_release_transmit_packet                   PORTABLE C      */
956 /*                                                           6.1.8        */
957 /*  AUTHOR                                                                */
958 /*                                                                        */
959 /*    Yuxin Zhou, Microsoft Corporation                                   */
960 /*                                                                        */
961 /*  DESCRIPTION                                                           */
962 /*                                                                        */
963 /*    This internal function releases a transmit packet.                  */
964 /*    A transmit packet is allocated to store QoS 1 and 2 messages.       */
965 /*    Upon a message being properly acknowledged, the packet can          */
966 /*    be released.                                                        */
967 /*                                                                        */
968 /*                                                                        */
969 /*  INPUT                                                                 */
970 /*                                                                        */
971 /*    client_ptr                            Pointer to MQTT Client        */
972 /*    packet_ptr                            Pointer to the MQTT message   */
973 /*                                            packet to be removed        */
974 /*    previous_packet_ptr                   Pointer to the previous packet*/
975 /*                                            or NULL if none exists      */
976 /*                                                                        */
977 /*  OUTPUT                                                                */
978 /*                                                                        */
979 /*    None                                                                */
980 /*                                                                        */
981 /*  CALLS                                                                 */
982 /*                                                                        */
/*    nx_packet_release                                                   */
984 /*                                                                        */
985 /*  CALLED BY                                                             */
986 /*                                                                        */
987 /*    _nxd_mqtt_thread_entry                                              */
988 /*    _nxd_mqtt_client_event_process                                      */
989 /*                                                                        */
990 /*  RELEASE HISTORY                                                       */
991 /*                                                                        */
992 /*    DATE              NAME                      DESCRIPTION             */
993 /*                                                                        */
994 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
995 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
996 /*                                            resulting in version 6.1    */
997 /*  08-02-2021     Yuxin Zhou               Modified comment(s),          */
998 /*                                            supported maximum transmit  */
999 /*                                            queue depth,                */
1000 /*                                            resulting in version 6.1.8  */
1001 /*                                                                        */
1002 /**************************************************************************/
_nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,NX_PACKET * previous_packet_ptr)1003 static VOID _nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr)
1004 {
1005 
1006     if (previous_packet_ptr)
1007     {
1008         previous_packet_ptr -> nx_packet_queue_next = packet_ptr -> nx_packet_queue_next;
1009     }
1010     else
1011     {
1012         client_ptr -> message_transmit_queue_head = packet_ptr -> nx_packet_queue_next;
1013     }
1014 
1015     if (packet_ptr == client_ptr -> message_transmit_queue_tail)
1016     {
1017         client_ptr -> message_transmit_queue_tail = previous_packet_ptr;
1018     }
1019     nx_packet_release(packet_ptr);
1020 
1021 #ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
1022     client_ptr -> message_transmit_queue_depth--;
1023 #endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */
1024 }
1025 
1026 /**************************************************************************/
1027 /*                                                                        */
1028 /*  FUNCTION                                               RELEASE        */
1029 /*                                                                        */
1030 /*    _nxd_mqtt_release_receive_packet                    PORTABLE C      */
1031 /*                                                           6.1          */
1032 /*  AUTHOR                                                                */
1033 /*                                                                        */
1034 /*    Yuxin Zhou, Microsoft Corporation                                   */
1035 /*                                                                        */
1036 /*  DESCRIPTION                                                           */
1037 /*                                                                        */
/*    This internal function removes a packet from the receive queue.     */
/*    It only unlinks the packet and updates the queue depth; it does     */
/*    not call nx_packet_release on the packet.                           */
/*    A receive packet is allocated to store QoS 1 and 2 messages.        */
/*    Upon a message being properly acknowledged, the packet can          */
/*    be released.                                                        */
1042 /*                                                                        */
1043 /*                                                                        */
1044 /*  INPUT                                                                 */
1045 /*                                                                        */
1046 /*    client_ptr                            Pointer to MQTT Client        */
1047 /*    packet_ptr                            Pointer to the MQTT message   */
1048 /*                                            packet to be removed        */
1049 /*    previous_packet_ptr                   Pointer to the previous packet*/
1050 /*                                            or NULL if none exists      */
1051 /*                                                                        */
1052 /*  OUTPUT                                                                */
1053 /*                                                                        */
1054 /*    None                                                                */
1055 /*                                                                        */
1056 /*  CALLS                                                                 */
1057 /*                                                                        */
1058 /*    None                                                                */
1059 /*                                                                        */
1060 /*  CALLED BY                                                             */
1061 /*                                                                        */
1062 /*    _nxd_mqtt_thread_entry                                              */
1063 /*    _nxd_mqtt_client_event_process                                      */
1064 /*                                                                        */
1065 /*  RELEASE HISTORY                                                       */
1066 /*                                                                        */
1067 /*    DATE              NAME                      DESCRIPTION             */
1068 /*                                                                        */
1069 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1070 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1071 /*                                            resulting in version 6.1    */
1072 /*                                                                        */
1073 /**************************************************************************/
_nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,NX_PACKET * previous_packet_ptr)1074 static VOID _nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr)
1075 {
1076 
1077     if (previous_packet_ptr)
1078     {
1079         previous_packet_ptr -> nx_packet_queue_next = packet_ptr -> nx_packet_queue_next;
1080     }
1081     else
1082     {
1083         client_ptr -> message_receive_queue_head = packet_ptr -> nx_packet_queue_next;
1084     }
1085 
1086     if (packet_ptr == client_ptr -> message_receive_queue_tail)
1087     {
1088         client_ptr -> message_receive_queue_tail = previous_packet_ptr;
1089     }
1090 
1091     client_ptr -> message_receive_queue_depth--;
1092 }
1093 
1094 /**************************************************************************/
1095 /*                                                                        */
1096 /*  FUNCTION                                               RELEASE        */
1097 /*                                                                        */
1098 /*    _nxd_mqtt_process_connack                           PORTABLE C      */
1099 /*                                                           6.1          */
1100 /*  AUTHOR                                                                */
1101 /*                                                                        */
1102 /*    Yuxin Zhou, Microsoft Corporation                                   */
1103 /*                                                                        */
1104 /*  DESCRIPTION                                                           */
1105 /*                                                                        */
1106 /*    This internal function processes a CONNACK message from the broker. */
1107 /*                                                                        */
1108 /*                                                                        */
1109 /*  INPUT                                                                 */
1110 /*                                                                        */
1111 /*    client_ptr                            Pointer to MQTT Client        */
1112 /*    packet_ptr                            Pointer to the packet         */
1113 /*    wait_option                           Timeout value                 */
1114 /*                                                                        */
1115 /*  OUTPUT                                                                */
1116 /*                                                                        */
1117 /*    status                                                              */
1118 /*                                                                        */
1119 /*  CALLS                                                                 */
1120 /*                                                                        */
1121 /*    [nxd_mqtt_connect_notify]             User supplied connect         */
1122 /*                                            callback function           */
1123 /*    tx_mutex_get                                                        */
1124 /*    tx_mutex_put                                                        */
1125 /*    _nxd_mqtt_client_retransmit_message                                 */
1126 /*    _nxd_mqtt_client_connection_end                                     */
1127 /*                                                                        */
1128 /*                                                                        */
1129 /*  CALLED BY                                                             */
1130 /*                                                                        */
1131 /*    _nxd_mqtt_packet_receive_process                                    */
1132 /*                                                                        */
1133 /*  RELEASE HISTORY                                                       */
1134 /*                                                                        */
1135 /*    DATE              NAME                      DESCRIPTION             */
1136 /*                                                                        */
1137 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1138 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1139 /*                                            resulting in version 6.1    */
1140 /*                                                                        */
1141 /**************************************************************************/
_nxd_mqtt_process_connack(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,ULONG wait_option)1142 static UINT _nxd_mqtt_process_connack(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, ULONG wait_option)
1143 {
1144 
1145 UINT    ret = NXD_MQTT_COMMUNICATION_FAILURE;
1146 MQTT_PACKET_CONNACK *connack_packet_ptr = (MQTT_PACKET_CONNACK *)(packet_ptr -> nx_packet_prepend_ptr);
1147 
1148 
1149     /* Check the length.  */
1150     if ((packet_ptr -> nx_packet_length != sizeof(MQTT_PACKET_CONNACK)) ||
1151         (connack_packet_ptr -> mqtt_connack_packet_header >> 4 != MQTT_CONTROL_PACKET_TYPE_CONNACK))
1152     {
1153         /* Invalid packet length.  Free the packet and process error. */
1154         ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1155     }
1156     else
1157     {
1158 
1159         /* Check remaining length.  */
1160         if (connack_packet_ptr -> mqtt_connack_packet_remaining_length != 2)
1161         {
1162             ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1163         }
1164         /* Follow MQTT-3.2.2-1 rule.  */
1165         else if ((client_ptr -> nxd_mqtt_clean_session) && (connack_packet_ptr -> mqtt_connack_packet_ack_flags & MQTT_CONNACK_CONNECT_FLAGS_SP))
1166         {
1167 
1168             /* Client requested clean session, and server responded with Session Present.  This is a violation. */
1169             ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1170         }
1171         else if (connack_packet_ptr -> mqtt_connack_packet_return_code >  MQTT_CONNACK_CONNECT_RETURN_CODE_NOT_AUTHORIZED)
1172         {
1173             ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1174         }
1175         else if (connack_packet_ptr -> mqtt_connack_packet_return_code > 0)
1176         {
1177 
1178             /* Pass the server return code to the application. */
1179             ret = (UINT)(NXD_MQTT_ERROR_CONNECT_RETURN_CODE + connack_packet_ptr -> mqtt_connack_packet_return_code);
1180         }
1181         else
1182         {
1183             ret = NXD_MQTT_SUCCESS;
1184 
1185             /* Obtain mutex before we modify client control block. */
1186             tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
1187 
1188             client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_CONNECTED;
1189 
1190             /* Initialize the packet identification field. */
1191             client_ptr -> nxd_mqtt_client_packet_identifier = NXD_MQTT_INITIAL_PACKET_ID_VALUE;
1192 
1193             /* Prevent packet identifier from being zero. MQTT-2.3.1-1 */
1194             if(client_ptr -> nxd_mqtt_client_packet_identifier == 0)
1195                 client_ptr -> nxd_mqtt_client_packet_identifier = 1;
1196 
1197             /* Release mutex */
1198             tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1199         }
1200     }
1201 
1202     /* Check callback function.  */
1203     if (client_ptr -> nxd_mqtt_connect_notify)
1204     {
1205         client_ptr -> nxd_mqtt_connect_notify(client_ptr, ret, client_ptr -> nxd_mqtt_connect_context);
1206     }
1207 
1208     if (ret == NXD_MQTT_SUCCESS)
1209     {
1210 
1211         /* If client doesn't start with Clean Session, and there are un-acked PUBLISH messages,
1212            we shall re-publish these messages. */
1213         if ((client_ptr -> nxd_mqtt_clean_session != NX_TRUE) && (client_ptr -> message_transmit_queue_head))
1214         {
1215 
1216             tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
1217 
1218             /* There are messages from the previous session that has not been acknowledged. */
1219             ret = _nxd_mqtt_client_retransmit_message(client_ptr, wait_option);
1220 
1221             /* Release mutex */
1222             tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1223         }
1224     }
1225     else
1226     {
1227 
1228         /* End connection. */
1229         _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
1230     }
1231 
1232     return(ret);
1233 }
1234 
1235 /**************************************************************************/
1236 /*                                                                        */
1237 /*  FUNCTION                                               RELEASE        */
1238 /*                                                                        */
1239 /*    _nxd_mqtt_process_publish_packet                    PORTABLE C      */
1240 /*                                                           6.1          */
1241 /*  AUTHOR                                                                */
1242 /*                                                                        */
1243 /*    Yuxin Zhou, Microsoft Corporation                                   */
1244 /*                                                                        */
1245 /*  DESCRIPTION                                                           */
1246 /*                                                                        */
1247 /*    This internal function processes a packet and parses topic and      */
1248 /*    message.                                                            */
1249 /*                                                                        */
1250 /*                                                                        */
1251 /*  INPUT                                                                 */
1252 /*                                                                        */
1253 /*    packet_ptr                            Pointer to the packet         */
1254 /*    topic_offset_ptr                      Return topic offset           */
1255 /*    topic_length_ptr                      Return topic length           */
1256 /*    message_offset_ptr                    Return message offset         */
1257 /*    message_length_ptr                    Return message length         */
1258 /*                                                                        */
1259 /*  OUTPUT                                                                */
1260 /*                                                                        */
1261 /*    Status                                                              */
1262 /*                                                                        */
1263 /*  CALLS                                                                 */
1264 /*                                                                        */
1265 /*    _nxd_mqtt_read_remaining_length                                     */
1266 /*    nx_packet_data_extract_offset                                       */
1267 /*                                                                        */
1268 /*                                                                        */
1269 /*  CALLED BY                                                             */
1270 /*                                                                        */
1271 /*    _nxd_mqtt_process_publish                                           */
1272 /*                                                                        */
1273 /*  RELEASE HISTORY                                                       */
1274 /*                                                                        */
1275 /*    DATE              NAME                      DESCRIPTION             */
1276 /*                                                                        */
1277 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1278 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1279 /*                                            resulting in version 6.1    */
1280 /*                                                                        */
1281 /**************************************************************************/
_nxd_mqtt_process_publish_packet(NX_PACKET * packet_ptr,ULONG * topic_offset_ptr,USHORT * topic_length_ptr,ULONG * message_offset_ptr,ULONG * message_length_ptr)1282 UINT _nxd_mqtt_process_publish_packet(NX_PACKET *packet_ptr, ULONG *topic_offset_ptr, USHORT *topic_length_ptr,
1283                                       ULONG *message_offset_ptr, ULONG *message_length_ptr)
1284 {
1285 UCHAR  QoS;
1286 UINT   remaining_length = 0;
1287 UINT   topic_length;
1288 ULONG  offset;
1289 UCHAR  bytes[2];
1290 ULONG  bytes_copied;
1291 
1292 
1293     QoS = (UCHAR)((*(packet_ptr -> nx_packet_prepend_ptr) & MQTT_PUBLISH_QOS_LEVEL_FIELD) >> 1);
1294 
1295     if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1296     {
1297         return(NXD_MQTT_INVALID_PACKET);
1298     }
1299 
1300     if (remaining_length < 2)
1301     {
1302         return(NXD_MQTT_INVALID_PACKET);
1303     }
1304 
1305     /* Get topic length fields. */
1306     if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1307         (bytes_copied != sizeof(bytes)))
1308     {
1309         return(NXD_MQTT_INVALID_PACKET);
1310     }
1311 
1312     topic_length = (UINT)(*(bytes) << 8) | (*(bytes + 1));
1313 
1314     if (topic_length > remaining_length - 2u)
1315     {
1316         return(NXD_MQTT_INVALID_PACKET);
1317     }
1318 
1319     *topic_offset_ptr = offset + 2;
1320     *topic_length_ptr = (USHORT)topic_length;
1321 
1322     remaining_length = remaining_length - topic_length - 2;
1323     if ((QoS == 1) || (QoS == 2))
1324     {
1325         offset += 2 + 2 + topic_length;
1326 
1327         if (remaining_length < 2)
1328         {
1329             return(NXD_MQTT_INVALID_PACKET);
1330         }
1331         remaining_length = remaining_length - 2;
1332     }
1333     else
1334     {
1335         offset += 2 + topic_length;
1336     }
1337 
1338     *message_offset_ptr = offset;
1339     *message_length_ptr = (ULONG)remaining_length;
1340 
1341     /* Return */
1342     return(NXD_MQTT_SUCCESS);
1343 }
1344 /**************************************************************************/
1345 /*                                                                        */
1346 /*  FUNCTION                                               RELEASE        */
1347 /*                                                                        */
1348 /*    _nxd_mqtt_process_publish                           PORTABLE C      */
1349 /*                                                           6.2.0        */
1350 /*  AUTHOR                                                                */
1351 /*                                                                        */
1352 /*    Yuxin Zhou, Microsoft Corporation                                   */
1353 /*                                                                        */
1354 /*  DESCRIPTION                                                           */
1355 /*                                                                        */
/*    This internal function processes a publish message from the broker. */
1357 /*                                                                        */
1358 /*                                                                        */
1359 /*  INPUT                                                                 */
1360 /*                                                                        */
1361 /*    client_ptr                            Pointer to MQTT Client        */
1362 /*    packet_ptr                            Pointer to the packet         */
1363 /*                                                                        */
1364 /*  OUTPUT                                                                */
1365 /*                                                                        */
1366 /*    NX_TRUE - packet is consumed                                        */
1367 /*    NX_FALSE - packet is not consumed                                   */
1368 /*                                                                        */
1369 /*  CALLS                                                                 */
1370 /*                                                                        */
1371 /*    [receive_notify]                      User supplied receive         */
1372 /*                                            callback function           */
1373 /*    _nxd_mqtt_packet_allocate                                           */
1374 /*    _nxd_mqtt_packet_send                                               */
1375 /*    nx_packet_release                                                   */
1376 /*    nx_secure_tls_session_send                                          */
1377 /*    _nxd_mqtt_process_publish_packet                                    */
1378 /*    _nxd_mqtt_copy_transmit_packet                                      */
1379 /*                                                                        */
1380 /*                                                                        */
1381 /*  CALLED BY                                                             */
1382 /*                                                                        */
1383 /*    _nxd_mqtt_packet_receive_process                                    */
1384 /*                                                                        */
1385 /*  RELEASE HISTORY                                                       */
1386 /*                                                                        */
1387 /*    DATE              NAME                      DESCRIPTION             */
1388 /*                                                                        */
1389 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1390 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1391 /*                                            resulting in version 6.1    */
1392 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
1393 /*                                            the logic of sending packet,*/
1394 /*                                            resulting in version 6.2.0  */
1395 /*                                                                        */
1396 /**************************************************************************/
_nxd_mqtt_process_publish(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1397 static UINT _nxd_mqtt_process_publish(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1398 {
1399 MQTT_PACKET_PUBLISH_RESPONSE *pubresp_ptr;
1400 UINT                          status;
1401 USHORT                        packet_id = 0;
1402 UCHAR                         QoS;
1403 UINT                          enqueue_message = 0;
1404 NX_PACKET                    *transmit_packet_ptr;
1405 UINT                          remaining_length = 0;
1406 UINT                          packet_consumed = NX_FALSE;
1407 UCHAR                         fixed_header;
1408 USHORT                        transmit_packet_id;
1409 UINT                          topic_length;
1410 ULONG                         offset;
1411 UCHAR                         bytes[2];
1412 ULONG                         bytes_copied;
1413 
1414     QoS = (UCHAR)((*(packet_ptr -> nx_packet_prepend_ptr) & MQTT_PUBLISH_QOS_LEVEL_FIELD) >> 1);
1415 
1416     if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1417     {
1418         return(NX_FALSE);
1419     }
1420 
1421     if (remaining_length < 2)
1422     {
1423         return(NXD_MQTT_INVALID_PACKET);
1424     }
1425 
1426     /* Get topic length fields. */
1427     if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1428         (bytes_copied != sizeof(bytes)))
1429     {
1430         return(NXD_MQTT_INVALID_PACKET);
1431     }
1432 
1433     topic_length = (UINT)(*(bytes) << 8) | (*(bytes + 1));
1434 
1435     if (topic_length > remaining_length - 2u)
1436     {
1437         return(NXD_MQTT_INVALID_PACKET);
1438     }
1439 
1440     if (QoS == 0)
1441     {
1442         enqueue_message = 1;
1443     }
1444     else
1445     {
1446         /* QoS 1 or QoS 2 messages. */
1447         /* Get packet id fields. */
1448         if (nx_packet_data_extract_offset(packet_ptr, offset + 2 + topic_length, &bytes, sizeof(bytes), &bytes_copied))
1449         {
1450             return(NXD_MQTT_INVALID_PACKET);
1451         }
1452 
1453         packet_id = (USHORT)(((*bytes) << 8) | (*(bytes + 1)));
1454 
1455         /* Look for an existing transmit packets with the same packet id */
1456         transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1457 
1458         while (transmit_packet_ptr)
1459         {
1460             fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1461             transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1462             if ((transmit_packet_id == packet_id) &&
1463                 ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_PUBREC << 4)))
1464             {
1465 
1466                 /* Found a packet containing the packet_id */
1467                 break;
1468             }
1469             transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1470         }
1471 
1472         if (transmit_packet_ptr)
1473         {
1474             /* This published data is already in our system.  No need to deliver this message to the application. */
1475             enqueue_message = 0;
1476         }
1477         else
1478         {
1479             enqueue_message = 1;
1480         }
1481     }
1482 
1483     if (enqueue_message)
1484     {
1485         if (packet_ptr -> nx_packet_length > (offset + remaining_length))
1486         {
1487 
1488             /* This packet contains multiple messages. */
1489             if (nx_packet_copy(packet_ptr, &packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_NO_WAIT))
1490             {
1491 
1492                 /* No packet is available. */
1493                 return(NX_FALSE);
1494             }
1495         }
1496         else
1497         {
1498             packet_consumed = NX_TRUE;
1499         }
1500 
1501         /* Increment the queue depth counter. */
1502         client_ptr -> message_receive_queue_depth++;
1503 
1504         if (client_ptr -> message_receive_queue_head == NX_NULL)
1505         {
1506             client_ptr -> message_receive_queue_head = packet_ptr;
1507         }
1508         else
1509         {
1510             client_ptr -> message_receive_queue_tail -> nx_packet_queue_next = packet_ptr;
1511         }
1512         client_ptr -> message_receive_queue_tail = packet_ptr;
1513 
1514         /* Invoke the user-defined receive notify function if it is set. */
1515         if (client_ptr -> nxd_mqtt_client_receive_notify)
1516         {
1517             (*(client_ptr -> nxd_mqtt_client_receive_notify))(client_ptr, client_ptr -> message_receive_queue_depth);
1518         }
1519     }
1520 
1521     /* If the message QoS level is 0, we are done. */
1522     if (QoS == 0)
1523     {
1524         /* Return */
1525         return(packet_consumed);
1526     }
1527 
1528     /* Send out proper ACKs for QoS 1 and 2 messages. */
1529     /* Allocate a new packet so we can send out a response. */
1530     status = _nxd_mqtt_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
1531     if (status)
1532     {
1533         /* Packet allocation fails. */
1534         return(packet_consumed);
1535     }
1536 
1537     /* Fill in the packet ID */
1538     pubresp_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)(packet_ptr -> nx_packet_prepend_ptr);
1539     pubresp_ptr -> mqtt_publish_response_packet_remaining_length = 2;
1540     pubresp_ptr -> mqtt_publish_response_packet_packet_identifier_msb = (UCHAR)(packet_id >> 8);
1541     pubresp_ptr -> mqtt_publish_response_packet_packet_identifier_lsb = (UCHAR)(packet_id & 0xFF);
1542 
1543     if (QoS == 1)
1544     {
1545 
1546         pubresp_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBACK << 4;
1547     }
1548     else
1549     {
1550         pubresp_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBREC << 4;
1551     }
1552 
1553     packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + sizeof(MQTT_PACKET_PUBLISH_RESPONSE);
1554     packet_ptr -> nx_packet_length = sizeof(MQTT_PACKET_PUBLISH_RESPONSE);
1555 
1556     if (QoS == 2)
1557     {
1558 
1559         /* Copy packet for checking duplicate publish packet. */
1560         if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &transmit_packet_ptr,
1561                                            packet_id, NX_FALSE, NX_WAIT_FOREVER))
1562         {
1563 
1564             /* Release the packet. */
1565             nx_packet_release(packet_ptr);
1566             return(packet_consumed);
1567         }
1568         if (client_ptr -> message_transmit_queue_head == NX_NULL)
1569         {
1570             client_ptr -> message_transmit_queue_head = transmit_packet_ptr;
1571         }
1572         else
1573         {
1574             client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = transmit_packet_ptr;
1575         }
1576         client_ptr -> message_transmit_queue_tail = transmit_packet_ptr;
1577     }
1578 
1579     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1580 
1581     /* Send packet to server.  */
1582     status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER);
1583 
1584     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
1585     if (status)
1586     {
1587 
1588         /* Release the packet. */
1589         nx_packet_release(packet_ptr);
1590     }
1591     else
1592     {
1593         /* Update the timeout value. */
1594         client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
1595     }
1596 
1597     /* Return */
1598     return(packet_consumed);
1599 }
1600 
1601 /**************************************************************************/
1602 /*                                                                        */
1603 /*  FUNCTION                                               RELEASE        */
1604 /*                                                                        */
1605 /*    _nxd_mqtt_process_publish_response                  PORTABLE C      */
1606 /*                                                           6.2.0        */
1607 /*  AUTHOR                                                                */
1608 /*                                                                        */
1609 /*    Yuxin Zhou, Microsoft Corporation                                   */
1610 /*                                                                        */
1611 /*  DESCRIPTION                                                           */
1612 /*                                                                        */
/*    This internal function processes publish response messages.         */
1614 /*    Publish Response messages are: PUBACK, PUBREC, PUBREL               */
1615 /*                                                                        */
1616 /*  INPUT                                                                 */
1617 /*                                                                        */
1618 /*    client_ptr                            Pointer to MQTT Client        */
1619 /*    packet_ptr                            Pointer to the packet         */
1620 /*                                                                        */
1621 /*  OUTPUT                                                                */
1622 /*                                                                        */
1623 /*    Status                                                              */
1624 /*                                                                        */
1625 /*  CALLS                                                                 */
1626 /*                                                                        */
1627 /*    [nxd_mqtt_client_receive_notify]      User supplied publish         */
1628 /*                                            callback function           */
1629 /*    _nxd_mqtt_release_transmit_packet                                   */
1630 /*    _nxd_mqtt_packet_send                                               */
1631 /*                                                                        */
1632 /*                                                                        */
1633 /*  CALLED BY                                                             */
1634 /*                                                                        */
1635 /*    _nxd_mqtt_packet_receive_process                                    */
1636 /*                                                                        */
1637 /*  RELEASE HISTORY                                                       */
1638 /*                                                                        */
1639 /*    DATE              NAME                      DESCRIPTION             */
1640 /*                                                                        */
1641 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1642 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
1643 /*                                            added ack receive notify,   */
1644 /*                                            resulting in version 6.1    */
1645 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
1646 /*                                            the logic of sending packet,*/
1647 /*                                            resulting in version 6.2.0  */
1648 /*                                                                        */
1649 /**************************************************************************/
_nxd_mqtt_process_publish_response(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1650 static UINT _nxd_mqtt_process_publish_response(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1651 {
1652 MQTT_PACKET_PUBLISH_RESPONSE *response_ptr;
1653 USHORT                        packet_id;
1654 NX_PACKET                    *previous_packet_ptr;
1655 NX_PACKET                    *transmit_packet_ptr;
1656 NX_PACKET                    *response_packet;
1657 UINT                          ret;
1658 UCHAR                         fixed_header;
1659 USHORT                        transmit_packet_id;
1660 
1661     response_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)(packet_ptr -> nx_packet_prepend_ptr);
1662 
1663     /* Validate the packet. */
1664     if (response_ptr -> mqtt_publish_response_packet_remaining_length != 2)
1665     {
1666         /* Invalid remaining_length value. Return 1 so the caller can release
1667            the packet. */
1668 
1669         return(1);
1670     }
1671 
1672     packet_id = (USHORT)((response_ptr -> mqtt_publish_response_packet_packet_identifier_msb << 8) |
1673                          (response_ptr -> mqtt_publish_response_packet_packet_identifier_lsb));
1674 
1675     /* Search all the outstanding transmitted packets for a match. */
1676     previous_packet_ptr = NX_NULL;
1677     transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1678     while (transmit_packet_ptr)
1679     {
1680         fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1681         transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1682         if (transmit_packet_id == packet_id)
1683         {
1684 
1685             /* Found the matching packet id */
1686             if (((response_ptr -> mqtt_publish_response_packet_header) >> 4) == MQTT_CONTROL_PACKET_TYPE_PUBACK)
1687             {
1688 
1689                 /* PUBACK is the response to a PUBLISH packet with QoS Level 1*/
1690                 /* Therefore we verify that packet contains PUBLISH packet with QoS level 1*/
1691                 if ((fixed_header & 0xF6) == ((MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4) | MQTT_PUBLISH_QOS_LEVEL_1))
1692                 {
1693 
1694                     /* Check ack notify function.  */
1695                     if (client_ptr -> nxd_mqtt_ack_receive_notify)
1696                     {
1697 
1698                         /* Call notify function. Note: user routine should not release the packet.  */
1699                         client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_PUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1700                     }
1701 
1702                     /* QoS Level1 message receives an ACK. */
1703                     /* This message can be released. */
1704                     _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1705 
1706                     /* Return with value 1, so the caller will release packet_ptr */
1707                     return(1);
1708                 }
1709             }
1710             else if (((response_ptr -> mqtt_publish_response_packet_header) >> 4) == MQTT_CONTROL_PACKET_TYPE_PUBREL)
1711             {
1712 
1713                 /* QoS 2 publish Release received, part 2. */
1714                 /* Therefore we verify that packet contains PUBLISH packet with QoS level 2*/
1715                 if ((fixed_header & 0xF6) == (MQTT_CONTROL_PACKET_TYPE_PUBREC << 4))
1716                 {
1717 
1718                     /* QoS Level2 message receives an ACK. */
1719                     /* This message can be released. */
1720                     /* Send PUBCOMP */
1721 
1722                     /* Allocate a packet to send the response. */
1723                     ret = _nxd_mqtt_packet_allocate(client_ptr, &response_packet, NX_WAIT_FOREVER);
1724                     if (ret)
1725                     {
1726                         return(1);
1727                     }
1728 
1729                     if (4u > ((ULONG)(response_packet -> nx_packet_data_end) - (ULONG)(response_packet -> nx_packet_append_ptr)))
1730                     {
1731                         nx_packet_release(response_packet);
1732 
1733                         /* Packet buffer is too small to hold the message. */
1734                         return(NX_SIZE_ERROR);
1735                     }
1736 
1737                     response_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)response_packet -> nx_packet_prepend_ptr;
1738 
1739                     response_ptr ->  mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBCOMP << 4;
1740                     response_ptr ->  mqtt_publish_response_packet_remaining_length = 2;
1741 
1742                     /* Fill in packet ID */
1743                     response_packet -> nx_packet_prepend_ptr[3] = packet_ptr -> nx_packet_prepend_ptr[3];
1744                     response_packet -> nx_packet_prepend_ptr[4] = packet_ptr -> nx_packet_prepend_ptr[4];
1745                     response_packet -> nx_packet_append_ptr = response_packet -> nx_packet_prepend_ptr + 4;
1746                     response_packet -> nx_packet_length = 4;
1747 
1748                     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1749 
1750                     /* Send packet to server.  */
1751                     ret = _nxd_mqtt_packet_send(client_ptr, response_packet, NX_WAIT_FOREVER);
1752 
1753                     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
1754 
1755                     /* Update the timeout value. */
1756                     client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
1757 
1758                     if (ret)
1759                     {
1760                         nx_packet_release(response_packet);
1761                     }
1762 
1763                     /* Check ack notify function.  */
1764                     if (client_ptr -> nxd_mqtt_ack_receive_notify)
1765                     {
1766 
1767                         /* Call notify function. Note: user routine should not release the packet.  */
1768                         client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_PUBREL, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1769                     }
1770 
1771                     /* This packet can be released. */
1772                     _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1773 
1774                     /* Return with value 1, so the caller will release packet_ptr */
1775                     return(1);
1776                 }
1777             }
1778         }
1779 
1780         /* Move on to the next packet */
1781         previous_packet_ptr = transmit_packet_ptr;
1782         transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1783     }
1784 
1785     /* nothing is found.  Return 1 to release the packet.*/
1786     return(1);
1787 }
1788 
1789 /**************************************************************************/
1790 /*                                                                        */
1791 /*  FUNCTION                                               RELEASE        */
1792 /*                                                                        */
1793 /*    _nxd_mqtt_process_sub_unsub_ack                     PORTABLE C      */
1794 /*                                                           6.1          */
1795 /*  AUTHOR                                                                */
1796 /*                                                                        */
1797 /*    Yuxin Zhou, Microsoft Corporation                                   */
1798 /*                                                                        */
1799 /*  DESCRIPTION                                                           */
1800 /*                                                                        */
1801 /*    This internal function processes an ACK message for a subscribe     */
1802 /*    or unsubscribe request.                                             */
1803 /*                                                                        */
1804 /*  INPUT                                                                 */
1805 /*                                                                        */
1806 /*    client_ptr                            Pointer to MQTT Client        */
1807 /*    packet_ptr                            Pointer to the packet         */
1808 /*                                                                        */
1809 /*  OUTPUT                                                                */
1810 /*                                                                        */
1811 /*    Status                                                              */
1812 /*                                                                        */
1813 /*  CALLS                                                                 */
1814 /*                                                                        */
1815 /*    [nxd_mqtt_client_receive_notify]      User supplied publish         */
1816 /*                                            callback function           */
1817 /*    _nxd_mqtt_release_transmit_packet                                   */
1818 /*                                          Release the memory block      */
1819 /*    _nxd_mqtt_read_remaining_length       Skip the remaining length     */
1820 /*                                            field                       */
1821 /*                                                                        */
1822 /*  CALLED BY                                                             */
1823 /*                                                                        */
1824 /*    _nxd_mqtt_packet_receive_process                                    */
1825 /*                                                                        */
1826 /*  RELEASE HISTORY                                                       */
1827 /*                                                                        */
1828 /*    DATE              NAME                      DESCRIPTION             */
1829 /*                                                                        */
1830 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1831 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
1832 /*                                            added ack receive notify,   */
1833 /*                                            resulting in version 6.1    */
1834 /*                                                                        */
1835 /**************************************************************************/
_nxd_mqtt_process_sub_unsub_ack(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1836 static UINT _nxd_mqtt_process_sub_unsub_ack(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1837 {
1838 
1839 USHORT     packet_id;
1840 NX_PACKET *previous_packet_ptr;
1841 NX_PACKET *transmit_packet_ptr;
1842 UCHAR      response_header;
1843 UCHAR      fixed_header;
1844 USHORT     transmit_packet_id;
1845 UINT       remaining_length;
1846 ULONG      offset;
1847 UCHAR      bytes[2];
1848 ULONG      bytes_copied;
1849 
1850 
1851     response_header = *(packet_ptr -> nx_packet_prepend_ptr);
1852 
1853     if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1854     {
1855 
1856         /* Unable to process the sub/unsub ack.  Simply return and release the packet. */
1857         return(NXD_MQTT_INVALID_PACKET);
1858     }
1859 
1860     /* Get packet id fields. */
1861     if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1862         (bytes_copied != sizeof(bytes)))
1863     {
1864         return(NXD_MQTT_INVALID_PACKET);
1865     }
1866 
1867     packet_id = (USHORT)(((*bytes) << 8) | (*(bytes + 1)));
1868 
1869     /* Search all the outstanding transmitted packets for a match. */
1870     previous_packet_ptr = NX_NULL;
1871     transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1872     while (transmit_packet_ptr)
1873     {
1874         fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1875         transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1876         if (transmit_packet_id == packet_id)
1877         {
1878 
1879             /* Found the matching packet id */
1880             if (((response_header >> 4) == MQTT_CONTROL_PACKET_TYPE_SUBACK) &&
1881                 ((fixed_header >> 4) == MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE))
1882             {
1883                 /* Validate the packet. */
1884                 if (remaining_length != 3)
1885                 {
1886                     /* Invalid remaining_length value. */
1887                     return(1);
1888                 }
1889 
1890                 /* Check ack notify function.  */
1891                 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1892                 {
1893 
1894                     /* Call notify function. Note: user routine should not release the packet.  */
1895                     client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_SUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1896                 }
1897 
1898                 /* Release the transmit packet. */
1899                 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1900 
1901                 return(1);
1902             }
1903             else if (((response_header >> 4) == MQTT_CONTROL_PACKET_TYPE_UNSUBACK) &&
1904                      ((fixed_header >> 4) == MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE))
1905             {
1906                 /* Validate the packet. */
1907                 if (remaining_length != 2)
1908                 {
1909                     /* Invalid remaining_length value. */
1910                     return(1);
1911                 }
1912 
1913                 /* Check ack notify function.  */
1914                 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1915                 {
1916 
1917                     /* Call notify function. Note: user routine should not release the packet.  */
1918                     client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_UNSUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1919                 }
1920 
1921                 /* Unsubscribe succeeded. */
1922                 /* Release the transmit packet. */
1923                 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1924 
1925                 return(1);
1926             }
1927         }
1928 
1929         /* Move on to the next packet */
1930         previous_packet_ptr = transmit_packet_ptr;
1931         transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1932     }
1933     return(1);
1934 }
1935 
1936 
1937 /**************************************************************************/
1938 /*                                                                        */
1939 /*  FUNCTION                                               RELEASE        */
1940 /*                                                                        */
1941 /*    _nxd_mqtt_process_pingresp                          PORTABLE C      */
1942 /*                                                           6.1          */
1943 /*  AUTHOR                                                                */
1944 /*                                                                        */
1945 /*    Yuxin Zhou, Microsoft Corporation                                   */
1946 /*                                                                        */
1947 /*  DESCRIPTION                                                           */
1948 /*                                                                        */
1949 /*    This internal function processes a PINGRESP message.                */
1950 /*                                                                        */
1951 /*  INPUT                                                                 */
1952 /*                                                                        */
1953 /*    client_ptr                            Pointer to MQTT Client        */
1954 /*    packet_ptr                            Pointer to the packet         */
1955 /*                                                                        */
1956 /*  OUTPUT                                                                */
1957 /*                                                                        */
1958 /*    Status                                                              */
1959 /*                                                                        */
1960 /*  CALLS                                                                 */
1961 /*                                                                        */
1962 /*    None                                                                */
1963 /*                                                                        */
1964 /*                                                                        */
1965 /*  CALLED BY                                                             */
1966 /*                                                                        */
1967 /*    _nxd_mqtt_packet_receive_process                                    */
1968 /*                                                                        */
1969 /*  RELEASE HISTORY                                                       */
1970 /*                                                                        */
1971 /*    DATE              NAME                      DESCRIPTION             */
1972 /*                                                                        */
1973 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1974 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1975 /*                                            resulting in version 6.1    */
1976 /*                                                                        */
1977 /**************************************************************************/
_nxd_mqtt_process_pingresp(NXD_MQTT_CLIENT * client_ptr)1978 static VOID _nxd_mqtt_process_pingresp(NXD_MQTT_CLIENT *client_ptr)
1979 {
1980 
1981 
1982     /* If there is an outstanding ping, mark it as responded. */
1983     if (client_ptr -> nxd_mqtt_ping_not_responded == NX_TRUE)
1984     {
1985         client_ptr -> nxd_mqtt_ping_not_responded = NX_FALSE;
1986 
1987         client_ptr -> nxd_mqtt_ping_sent_time = 0;
1988     }
1989 
1990     return;
1991 }
1992 
1993 
1994 /**************************************************************************/
1995 /*                                                                        */
1996 /*  FUNCTION                                               RELEASE        */
1997 /*                                                                        */
1998 /*    _nxd_mqtt_process_disconnect                        PORTABLE C      */
1999 /*                                                           6.1          */
2000 /*  AUTHOR                                                                */
2001 /*                                                                        */
2002 /*    Yuxin Zhou, Microsoft Corporation                                   */
2003 /*                                                                        */
2004 /*  DESCRIPTION                                                           */
2005 /*                                                                        */
2006 /*    This internal function processes a DISCONNECT message.              */
2007 /*                                                                        */
2008 /*  INPUT                                                                 */
2009 /*                                                                        */
2010 /*    client_ptr                            Pointer to MQTT Client        */
2011 /*    packet_ptr                            Pointer to the packet         */
2012 /*                                                                        */
2013 /*  OUTPUT                                                                */
2014 /*                                                                        */
2015 /*    None                                                                */
2016 /*                                                                        */
2017 /*  CALLS                                                                 */
2018 /*                                                                        */
2019 /*   nx_secure_tls_session_send                                           */
2020 /*   nx_tcp_socket_disconnect                                             */
2021 /*   nx_tcp_client_socket_unbind                                          */
2022 /*   _nxd_mqtt_release_transmit_packet                                    */
2023 /*   _nxd_mqtt_release_receive_packet                                     */
2024 /*   _nxd_mqtt_client_connection_end                                      */
2025 /*                                                                        */
2026 /*  CALLED BY                                                             */
2027 /*                                                                        */
2028 /*   _nxd_mqtt_packet_receive_process                                     */
2029 /*                                                                        */
2030 /*  RELEASE HISTORY                                                       */
2031 /*                                                                        */
2032 /*    DATE              NAME                      DESCRIPTION             */
2033 /*                                                                        */
2034 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2035 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2036 /*                                            resulting in version 6.1    */
2037 /*                                                                        */
2038 /**************************************************************************/
_nxd_mqtt_process_disconnect(NXD_MQTT_CLIENT * client_ptr)2039 static VOID _nxd_mqtt_process_disconnect(NXD_MQTT_CLIENT *client_ptr)
2040 {
2041 NX_PACKET  *previous = NX_NULL;
2042 NX_PACKET  *current;
2043 NX_PACKET  *next;
2044 UINT        disconnect_callback = NX_FALSE;
2045 UINT        status;
2046 UCHAR       fixed_header;
2047 
2048     if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
2049     {
2050         /* State changes from CONNECTED TO IDLE.  Call disconnect notify callback
2051            if the function is set. */
2052         disconnect_callback = NX_TRUE;
2053     }
2054     else if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTING)
2055     {
2056 
2057         /* If state isn't CONNECTED or CONNECTING, just return. */
2058         return;
2059     }
2060 
2061     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2062 
2063     /* End connection. */
2064     _nxd_mqtt_client_connection_end(client_ptr, NXD_MQTT_SOCKET_TIMEOUT);
2065 
2066     status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
2067 
2068     /* Free up sub/unsub packets on the transmit queue. */
2069     current = client_ptr -> message_transmit_queue_head;
2070 
2071     while (current)
2072     {
2073         next = current -> nx_packet_queue_next;
2074         fixed_header = *(current -> nx_packet_prepend_ptr);
2075 
2076         if (((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4)) ||
2077             ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE << 4)))
2078         {
2079             _nxd_mqtt_release_transmit_packet(client_ptr, current, previous);
2080         }
2081         else
2082         {
2083             previous = current;
2084         }
2085         current = next;
2086     }
2087 
2088     /* If a callback notification is defined, call it now. */
2089     if ((disconnect_callback == NX_TRUE) && (client_ptr -> nxd_mqtt_disconnect_notify))
2090     {
2091         client_ptr -> nxd_mqtt_disconnect_notify(client_ptr);
2092     }
2093 
2094     /* If a connect callback notification is defined and is still in connecting stage, call it now. */
2095     if ((disconnect_callback == NX_FALSE) && (client_ptr -> nxd_mqtt_connect_notify))
2096     {
2097         client_ptr -> nxd_mqtt_connect_notify(client_ptr, NXD_MQTT_CONNECT_FAILURE, client_ptr -> nxd_mqtt_connect_context);
2098     }
2099 
2100     if (status == TX_SUCCESS)
2101     {
2102         /* Remove all the packets in the receive queue. */
2103         while (client_ptr -> message_receive_queue_head)
2104         {
2105             _nxd_mqtt_release_receive_packet(client_ptr, client_ptr -> message_receive_queue_head, NX_NULL);
2106         }
2107         client_ptr -> message_receive_queue_depth = 0;
2108 
2109         /* Clear the MQTT_PACKET_RECEIVE_EVENT */
2110 #ifndef NXD_MQTT_CLOUD_ENABLE
2111         tx_event_flags_set(&client_ptr -> nxd_mqtt_events, ~MQTT_PACKET_RECEIVE_EVENT, TX_AND);
2112 #else
2113         nx_cloud_module_event_clear(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PACKET_RECEIVE_EVENT);
2114 #endif /* NXD_MQTT_CLOUD_ENABLE */
2115     }
2116 
2117     /* Clear flags if keep alive is enabled. */
2118     if (client_ptr -> nxd_mqtt_keepalive)
2119     {
2120         client_ptr -> nxd_mqtt_ping_not_responded = 0;
2121         client_ptr -> nxd_mqtt_ping_sent_time = 0;
2122     }
2123 
2124     /* Clean up the information when disconnecting. */
2125     client_ptr -> nxd_mqtt_client_username = NX_NULL;
2126     client_ptr -> nxd_mqtt_client_password = NX_NULL;
2127     client_ptr -> nxd_mqtt_client_will_topic = NX_NULL;
2128     client_ptr -> nxd_mqtt_client_will_message = NX_NULL;
2129     client_ptr -> nxd_mqtt_client_will_qos_retain = 0;
2130 
2131     /* Release current processing packet. */
2132     if (client_ptr -> nxd_mqtt_client_processing_packet)
2133     {
2134         nx_packet_release(client_ptr -> nxd_mqtt_client_processing_packet);
2135         client_ptr -> nxd_mqtt_client_processing_packet = NX_NULL;
2136     }
2137 
2138     return;
2139 }
2140 
2141 
2142 /**************************************************************************/
2143 /*                                                                        */
2144 /*  FUNCTION                                               RELEASE        */
2145 /*                                                                        */
2146 /*    _nxd_mqtt_packet_receive_process                    PORTABLE C      */
2147 /*                                                           6.2.0        */
2148 /*  AUTHOR                                                                */
2149 /*                                                                        */
2150 /*    Yuxin Zhou, Microsoft Corporation                                   */
2151 /*                                                                        */
2152 /*  DESCRIPTION                                                           */
2153 /*                                                                        */
2154 /*    This internal function processes MQTT messages.                     */
2155 /*    NOTE: MQTT Mutex is NOT obtained on entering this function.         */
2156 /*    Therefore it shouldn't hold the mutex when it exits this function.  */
2157 /*                                                                        */
2158 /*  INPUT                                                                 */
2159 /*                                                                        */
2160 /*    client_ptr                            Pointer to MQTT Client        */
2161 /*                                                                        */
2162 /*  OUTPUT                                                                */
2163 /*                                                                        */
2164 /*    None                                                                */
2165 /*                                                                        */
2166 /*  CALLS                                                                 */
2167 /*                                                                        */
2168 /*    nx_secure_tls_session_receive                                       */
2169 /*    nx_tcp_socket_receive                                               */
2170 /*    _nxd_mqtt_process_publish                                           */
2171 /*    _nxd_mqtt_process_publish_response                                  */
2172 /*    _nxd_mqtt_process_sub_unsub_ack                                     */
2173 /*    _nxd_mqtt_process_pingresp                                          */
2174 /*    _nxd_mqtt_process_disconnect                                        */
2175 /*    nx_packet_release                                                   */
2176 /*                                                                        */
2177 /*  CALLED BY                                                             */
2178 /*                                                                        */
2179 /*    _nxd_mqtt_client_event_process                                      */
2180 /*                                                                        */
2181 /*  RELEASE HISTORY                                                       */
2182 /*                                                                        */
2183 /*    DATE              NAME                      DESCRIPTION             */
2184 /*                                                                        */
2185 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2186 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2187 /*                                            resulting in version 6.1    */
2188 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
2189 /*                                            mqtt over websocket,        */
2190 /*                                            resulting in version 6.2.0  */
2191 /*                                                                        */
2192 /**************************************************************************/
_nxd_mqtt_packet_receive_process(NXD_MQTT_CLIENT * client_ptr)2193 static VOID _nxd_mqtt_packet_receive_process(NXD_MQTT_CLIENT *client_ptr)
2194 {
2195 NX_PACKET *packet_ptr;
2196 NX_PACKET *previous_packet_ptr;
2197 UINT       status;
2198 UCHAR      packet_type;
2199 UINT       remaining_length;
2200 UINT       packet_consumed;
2201 ULONG      offset;
2202 ULONG      bytes_copied;
2203 ULONG      packet_length;
2204 
2205     for (;;)
2206     {
2207 
2208         /* Release the mutex. */
2209         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2210 
2211         /* Make a receive call. */
2212         status = _nxd_mqtt_packet_receive(client_ptr, &packet_ptr, NX_NO_WAIT);
2213 
2214         if (status != NX_SUCCESS)
2215         {
2216             if ((status != NX_NO_PACKET) && (status != NX_CONTINUE))
2217             {
2218 
2219                 /* Network issue. Close the MQTT session. */
2220 #ifndef NXD_MQTT_CLOUD_ENABLE
2221                 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_NETWORK_DISCONNECT_EVENT, TX_OR);
2222 #else
2223                 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_NETWORK_DISCONNECT_EVENT);
2224 #endif /* NXD_MQTT_CLOUD_ENABLE */
2225             }
2226 
2227             break;
2228         }
2229 
2230         /* Obtain the mutex. */
2231         tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
2232 
2233         /* Is there a packet waiting for processing? */
2234         if (client_ptr -> nxd_mqtt_client_processing_packet)
2235         {
2236 
2237             /* Yes. Link received packet to existing one. */
2238             if (client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last)
2239             {
2240                 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last -> nx_packet_next = packet_ptr;
2241             }
2242             else
2243             {
2244                 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_next = packet_ptr;
2245             }
2246             if (packet_ptr -> nx_packet_last)
2247             {
2248                 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last = packet_ptr -> nx_packet_last;
2249             }
2250             else
2251             {
2252                 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last = packet_ptr;
2253             }
2254             client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_length += packet_ptr -> nx_packet_length;
2255 
2256             /* Start to process existing packet. */
2257             packet_ptr = client_ptr -> nxd_mqtt_client_processing_packet;
2258             client_ptr -> nxd_mqtt_client_processing_packet = NX_NULL;
2259         }
2260 
2261         /* Check notify function.  */
2262         if (client_ptr -> nxd_mqtt_packet_receive_notify)
2263         {
2264 
2265             /* Call notify function. Return NX_TRUE if the packet has been consumed.  */
2266             if (client_ptr -> nxd_mqtt_packet_receive_notify(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_packet_receive_context) == NX_TRUE)
2267             {
2268                 continue;
2269             }
2270         }
2271 
2272         packet_consumed = NX_FALSE;
2273         while (packet_ptr)
2274         {
2275             /* Parse the incoming packet. */
2276             status = _nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset);
2277             if (status == NXD_MQTT_PARTIAL_PACKET)
2278             {
2279 
2280                 /* We only have partial MQTT message.
2281                  * Put it to waiting list for more packets. */
2282                 client_ptr -> nxd_mqtt_client_processing_packet = packet_ptr;
2283                 packet_consumed = NX_TRUE;
2284                 break;
2285             }
2286             else if (status)
2287             {
2288 
2289                 /* Invalid packet. */
2290                 break;
2291             }
2292 
2293             /* Get packet type. */
2294             if (nx_packet_data_extract_offset(packet_ptr, 0, &packet_type, 1, &bytes_copied))
2295             {
2296 
2297                 /* Unable to read packet type. */
2298                 break;
2299             }
2300 
2301             /* Right shift 4 bits to get the packet type. */
2302             packet_type = packet_type >> 4;
2303 
2304             /* Process based on packet type. */
2305             switch (packet_type)
2306             {
2307             case MQTT_CONTROL_PACKET_TYPE_CONNECT:
2308                 /* Client does not accept connections.  Nothing needs to be done. */
2309                 break;
2310             case MQTT_CONTROL_PACKET_TYPE_CONNACK:
2311                 _nxd_mqtt_process_connack(client_ptr, packet_ptr, NX_NO_WAIT);
2312                 break;
2313 
2314             case MQTT_CONTROL_PACKET_TYPE_PUBLISH:
2315                 packet_consumed = _nxd_mqtt_process_publish(client_ptr, packet_ptr);
2316                 break;
2317 
2318             case MQTT_CONTROL_PACKET_TYPE_PUBACK:
2319             case MQTT_CONTROL_PACKET_TYPE_PUBREL:
2320                 _nxd_mqtt_process_publish_response(client_ptr, packet_ptr);
2321                 break;
2322 
2323             case MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE:
2324             case MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE:
2325                 /* Client should not process subscribe or unsubscribe message. */
2326                 break;
2327 
2328             case MQTT_CONTROL_PACKET_TYPE_SUBACK:
2329             case MQTT_CONTROL_PACKET_TYPE_UNSUBACK:
2330                 _nxd_mqtt_process_sub_unsub_ack(client_ptr, packet_ptr);
2331                 break;
2332 
2333 
2334             case MQTT_CONTROL_PACKET_TYPE_PINGREQ:
2335                 /* Client is not supposed to receive ping req.  Ignore it. */
2336                 break;
2337 
2338             case MQTT_CONTROL_PACKET_TYPE_PINGRESP:
2339                 _nxd_mqtt_process_pingresp(client_ptr);
2340                 break;
2341 
2342             case MQTT_CONTROL_PACKET_TYPE_DISCONNECT:
2343                 _nxd_mqtt_process_disconnect(client_ptr);
2344                 break;
2345 
2346             /* Publisher sender message type for QoS 2. Not supported. */
2347             case MQTT_CONTROL_PACKET_TYPE_PUBREC:
2348             case MQTT_CONTROL_PACKET_TYPE_PUBCOMP:
2349             default:
2350                 /* Unknown type. */
2351                 break;
2352             }
2353 
2354             if (packet_consumed)
2355             {
2356                 break;
2357             }
2358 
2359             /* Trim current packet. */
2360             offset += remaining_length;
2361             packet_length = packet_ptr -> nx_packet_length;
2362             if (packet_length > offset)
2363             {
2364 
2365                 /* Multiple MQTT message in one packet. */
2366                 packet_length = packet_ptr -> nx_packet_length - offset;
2367                 while ((ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr) <= offset)
2368                 {
2369                     offset -= (ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr);
2370 
2371                     /* Current packet can be released. */
2372                     previous_packet_ptr = packet_ptr;
2373                     packet_ptr = packet_ptr -> nx_packet_next;
2374                     previous_packet_ptr -> nx_packet_next = NX_NULL;
2375                     nx_packet_release(previous_packet_ptr);
2376                     if (packet_ptr == NX_NULL)
2377                     {
2378 
2379                         /* Invalid packet. */
2380                         break;
2381                     }
2382                 }
2383 
2384                 if (packet_ptr)
2385                 {
2386 
2387                     /* Adjust current packet. */
2388                     packet_ptr -> nx_packet_prepend_ptr = packet_ptr -> nx_packet_prepend_ptr + offset;
2389                     packet_ptr -> nx_packet_length = packet_length;
2390                 }
2391             }
2392             else
2393             {
2394 
2395                 /* All messages in current packet is processed. */
2396                 break;
2397             }
2398         }
2399 
2400         if (!packet_consumed)
2401         {
2402             nx_packet_release(packet_ptr);
2403         }
2404     }
2405 
2406     /* No more data in the receive queue.  Return. */
2407 
2408     return;
2409 }
2410 
2411 
2412 /**************************************************************************/
2413 /*                                                                        */
2414 /*  FUNCTION                                               RELEASE        */
2415 /*                                                                        */
2416 /*    _nxd_mqtt_tcp_establish_process                     PORTABLE C      */
2417 /*                                                           6.2.0        */
2418 /*  AUTHOR                                                                */
2419 /*                                                                        */
2420 /*    Yuxin Zhou, Microsoft Corporation                                   */
2421 /*                                                                        */
2422 /*  DESCRIPTION                                                           */
2423 /*                                                                        */
2424 /*    This function processes MQTT TCP connection establish event.        */
2425 /*                                                                        */
2426 /*  INPUT                                                                 */
2427 /*                                                                        */
2428 /*    client_ptr                            Pointer to MQTT Client        */
2429 /*                                                                        */
2430 /*  OUTPUT                                                                */
2431 /*                                                                        */
2432 /*    None                                                                */
2433 /*                                                                        */
2434 /*  CALLS                                                                 */
2435 /*                                                                        */
/*    nx_secure_tls_session_start                                         */
/*    nx_websocket_client_connect                                         */
/*    _nxd_mqtt_client_connection_end                                     */
/*    _nxd_mqtt_client_connect_packet_send                                */
2439 /*                                                                        */
2440 /*  CALLED BY                                                             */
2441 /*                                                                        */
2442 /*    _nxd_mqtt_client_event_process                                      */
2443 /*                                                                        */
2444 /*  RELEASE HISTORY                                                       */
2445 /*                                                                        */
2446 /*    DATE              NAME                      DESCRIPTION             */
2447 /*                                                                        */
2448 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2449 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2450 /*                                            resulting in version 6.1    */
2451 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
2452 /*                                            mqtt over websocket,        */
2453 /*                                            resulting in version 6.2.0  */
2454 /*                                                                        */
2455 /**************************************************************************/
_nxd_mqtt_tcp_establish_process(NXD_MQTT_CLIENT * client_ptr)2456 static VOID _nxd_mqtt_tcp_establish_process(NXD_MQTT_CLIENT *client_ptr)
2457 {
2458 UINT       status;
2459 
2460 
2461     /* TCP connection is established.  */
2462 
2463     /* If TLS is enabled, start TLS */
2464 #ifdef NX_SECURE_ENABLE
2465     if (client_ptr -> nxd_mqtt_client_use_tls)
2466     {
2467         status = nx_secure_tls_session_start(&(client_ptr -> nxd_mqtt_tls_session), &(client_ptr -> nxd_mqtt_client_socket), NX_NO_WAIT);
2468 
2469         if (status != NX_CONTINUE)
2470         {
2471 
2472             /* End connection. */
2473             _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2474 
2475             /* Check callback function.  */
2476             if (client_ptr -> nxd_mqtt_connect_notify)
2477             {
2478                 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2479             }
2480 
2481             return;
2482         }
2483 
2484         /* TLS in progress.  */
2485         client_ptr -> nxd_mqtt_tls_in_progress = NX_TRUE;
2486 
2487         return;
2488     }
2489 #endif /* NX_SECURE_ENABLE */
2490 
2491 #ifdef NXD_MQTT_OVER_WEBSOCKET
2492 
2493     /* If using websocket, start websocket connection.  */
2494     if (client_ptr -> nxd_mqtt_client_use_websocket)
2495     {
2496         status = nx_websocket_client_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_client_socket),
2497                                              client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
2498                                              client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
2499                                              (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
2500                                              NX_NO_WAIT);
2501 
2502         if (status != NX_IN_PROGRESS)
2503         {
2504 
2505             /* End connection. */
2506             _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2507 
2508             /* Check callback function.  */
2509             if (client_ptr -> nxd_mqtt_connect_notify)
2510             {
2511                 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2512             }
2513 
2514             return;
2515         }
2516 
2517         return;
2518     }
2519 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2520 
2521     /* Start to send MQTT connect packet.  */
2522     status = _nxd_mqtt_client_connect_packet_send(client_ptr, NX_NO_WAIT);
2523 
2524     /* Check status.  */
2525     if (status)
2526     {
2527 
2528         /* End connection. */
2529         _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2530 
2531         /* Check callback function.  */
2532         if (client_ptr -> nxd_mqtt_connect_notify)
2533         {
2534             client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2535         }
2536     }
2537 }
2538 
2539 
2540 #ifdef NX_SECURE_ENABLE
2541 /**************************************************************************/
2542 /*                                                                        */
2543 /*  FUNCTION                                               RELEASE        */
2544 /*                                                                        */
2545 /*    _nxd_mqtt_tls_establish_process                     PORTABLE C      */
2546 /*                                                           6.2.0        */
2547 /*  AUTHOR                                                                */
2548 /*                                                                        */
2549 /*    Yuxin Zhou, Microsoft Corporation                                   */
2550 /*                                                                        */
2551 /*  DESCRIPTION                                                           */
2552 /*                                                                        */
2553 /*    This function processes TLS connection establish event.             */
2554 /*                                                                        */
2555 /*  INPUT                                                                 */
2556 /*                                                                        */
2557 /*    client_ptr                            Pointer to MQTT Client        */
2558 /*                                                                        */
2559 /*  OUTPUT                                                                */
2560 /*                                                                        */
2561 /*    None                                                                */
2562 /*                                                                        */
2563 /*  CALLS                                                                 */
2564 /*                                                                        */
/*    _nx_secure_tls_handshake_process                                    */
/*    nx_websocket_client_secure_connect                                  */
/*    _nxd_mqtt_client_connect_packet_send                                */
/*    _nxd_mqtt_client_connection_end                                     */
2568 /*                                                                        */
2569 /*  CALLED BY                                                             */
2570 /*                                                                        */
2571 /*    _nxd_mqtt_client_event_process                                      */
2572 /*                                                                        */
2573 /*  RELEASE HISTORY                                                       */
2574 /*                                                                        */
2575 /*    DATE              NAME                      DESCRIPTION             */
2576 /*                                                                        */
2577 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2578 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2579 /*                                            resulting in version 6.1    */
2580 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
2581 /*                                            mqtt over websocket,        */
2582 /*                                            resulting in version 6.2.0  */
2583 /*                                                                        */
2584 /**************************************************************************/
_nxd_mqtt_tls_establish_process(NXD_MQTT_CLIENT * client_ptr)2585 static VOID _nxd_mqtt_tls_establish_process(NXD_MQTT_CLIENT *client_ptr)
2586 {
2587 UINT       status;
2588 
2589 
2590     /* Directly call handshake process for async mode. */
2591     status = _nx_secure_tls_handshake_process(&(client_ptr -> nxd_mqtt_tls_session), NX_NO_WAIT);
2592     if (status == NX_SUCCESS)
2593     {
2594 
2595         /* TLS session established.   */
2596         client_ptr -> nxd_mqtt_tls_in_progress = NX_FALSE;
2597 
2598 #ifdef NXD_MQTT_OVER_WEBSOCKET
2599 
2600         /* If using websocket, start websocket connection.  */
2601         if (client_ptr -> nxd_mqtt_client_use_websocket)
2602         {
2603             status = nx_websocket_client_secure_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_tls_session),
2604                                                         client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
2605                                                         client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
2606                                                         (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
2607                                                         NX_NO_WAIT);
2608 
2609             if (status == NX_IN_PROGRESS)
2610             {
2611                 return;
2612             }
2613         }
2614         else
2615 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2616         {
2617 
2618             /* Start to send MQTT connect packet.  */
2619             status = _nxd_mqtt_client_connect_packet_send(client_ptr, NX_NO_WAIT);
2620         }
2621     }
2622     else if (status == NX_CONTINUE)
2623     {
2624         return;
2625     }
2626 
2627     /* Check status.  */
2628     if (status)
2629     {
2630 
2631         /* End connection. */
2632         _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2633 
2634         /* Check callback function.  */
2635         if (client_ptr -> nxd_mqtt_connect_notify)
2636         {
2637             client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2638         }
2639     }
2640 
2641     return;
2642 }
2643 #endif /* NX_SECURE_ENABLE */
2644 
2645 /**************************************************************************/
2646 /*                                                                        */
2647 /*  FUNCTION                                               RELEASE        */
2648 /*                                                                        */
2649 /*    _nxd_mqtt_client_append_message                     PORTABLE C      */
2650 /*                                                           6.1          */
2651 /*  AUTHOR                                                                */
2652 /*                                                                        */
2653 /*    Yuxin Zhou, Microsoft Corporation                                   */
2654 /*                                                                        */
2655 /*  DESCRIPTION                                                           */
2656 /*                                                                        */
2657 /*    This function writes the message length and message in the outgoing */
2658 /*    MQTT packet.                                                        */
2659 /*                                                                        */
2660 /*  INPUT                                                                 */
2661 /*                                                                        */
2662 /*    client_ptr                            Pointer to MQTT Client        */
2663 /*    packet_ptr                            Outgoing MQTT packet          */
2664 /*    message                               Pointer to the message        */
2665 /*    length                                Length of the message         */
2666 /*    wait_option                           Wait option                   */
2667 /*                                                                        */
2668 /*  OUTPUT                                                                */
2669 /*                                                                        */
2670 /*    status                                                              */
2671 /*                                                                        */
2672 /*  CALLS                                                                 */
2673 /*                                                                        */
2674 /*    nx_packet_data_append                 Append packet data            */
2675 /*                                                                        */
2676 /*  CALLED BY                                                             */
2677 /*                                                                        */
2678 /*    _nxd_mqtt_client_sub_unsub                                          */
2679 /*    _nxd_mqtt_client_connect                                            */
2680 /*    _nxd_mqtt_client_publish                                            */
2681 /*                                                                        */
2682 /*  RELEASE HISTORY                                                       */
2683 /*                                                                        */
2684 /*    DATE              NAME                      DESCRIPTION             */
2685 /*                                                                        */
2686 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2687 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2688 /*                                            resulting in version 6.1    */
2689 /*                                                                        */
2690 /**************************************************************************/
UINT _nxd_mqtt_client_append_message(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, CHAR *message, UINT length, ULONG wait_option)
{
UCHAR length_field[2];
UINT  status;

    /* Build the two-byte length prefix, most significant byte first.  Only
       the low 16 bits of length are represented in the prefix.  */
    length_field[1] = length & 0xFF;
    length_field[0] = (length >> 8) & 0xFF;

    /* The prefix is always written, even for an empty message.  */
    status = nx_packet_data_append(packet_ptr, length_field, 2, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);

    /* Append the message bytes only when the prefix went in and there is
       something to copy.  */
    if ((!status) && (length))
    {
        status = nx_packet_data_append(packet_ptr, message, length,
                                       client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
    }

    /* Status of the last append attempted (zero on success).  */
    return(status);
}
2716 
2717 
2718 /**************************************************************************/
2719 /*                                                                        */
2720 /*  FUNCTION                                               RELEASE        */
2721 /*                                                                        */
2722 /*    _nxd_mqtt_client_connection_end                     PORTABLE C      */
2723 /*                                                           6.2.0        */
2724 /*  AUTHOR                                                                */
2725 /*                                                                        */
2726 /*    Yuxin Zhou, Microsoft Corporation                                   */
2727 /*                                                                        */
2728 /*  DESCRIPTION                                                           */
2729 /*                                                                        */
2730 /*    This function is used to end the MQTT connection.                   */
2731 /*                                                                        */
2732 /*  INPUT                                                                 */
2733 /*                                                                        */
2734 /*    client_ptr                            Pointer to MQTT Client        */
2735 /*    wait_option                           Wait option                   */
2736 /*                                                                        */
2737 /*  OUTPUT                                                                */
2738 /*                                                                        */
2739 /*    None                                                                */
2740 /*                                                                        */
2741 /*  CALLS                                                                 */
2742 /*                                                                        */
/*    tx_mutex_get                          Obtain client mutex           */
/*    tx_mutex_put                          Release client mutex          */
/*    nx_websocket_client_disconnect        Disconnect websocket          */
/*    nx_secure_tls_session_end             End TLS session               */
/*    nx_secure_tls_session_delete          Delete TLS session            */
/*    nx_tcp_socket_disconnect              Close TCP connection          */
/*    nx_tcp_client_socket_unbind           Unbind TCP socket             */
/*    tx_timer_delete                       Delete timer                  */
2748 /*                                                                        */
2749 /*  CALLED BY                                                             */
2750 /*                                                                        */
/*    _nxd_mqtt_client_connect                                            */
/*    _nxd_mqtt_process_disconnect                                        */
/*    _nxd_mqtt_tcp_establish_process                                     */
/*    _nxd_mqtt_tls_establish_process                                     */
2753 /*                                                                        */
2754 /*  RELEASE HISTORY                                                       */
2755 /*                                                                        */
2756 /*    DATE              NAME                      DESCRIPTION             */
2757 /*                                                                        */
2758 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2759 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2760 /*                                            resulting in version 6.1    */
2761 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
2762 /*                                            mqtt over websocket,        */
2763 /*                                            resulting in version 6.2.0  */
2764 /*                                                                        */
2765 /**************************************************************************/
_nxd_mqtt_client_connection_end(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)2766 VOID _nxd_mqtt_client_connection_end(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
2767 {
2768 
2769     /* Obtain the mutex. */
2770     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
2771 
2772     /* Mark the session as terminated. */
2773     client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;
2774 
2775     /* Release the mutex. */
2776     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2777 
2778 #ifdef NXD_MQTT_OVER_WEBSOCKET
2779     if (client_ptr -> nxd_mqtt_client_use_websocket)
2780     {
2781         nx_websocket_client_disconnect(&(client_ptr -> nxd_mqtt_client_websocket), wait_option);
2782     }
2783 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2784 
2785 #ifdef NX_SECURE_ENABLE
2786     if (client_ptr -> nxd_mqtt_client_use_tls)
2787     {
2788         nx_secure_tls_session_end(&(client_ptr -> nxd_mqtt_tls_session), wait_option);
2789         nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
2790     }
2791 #endif
2792     nx_tcp_socket_disconnect(&(client_ptr -> nxd_mqtt_client_socket), wait_option);
2793     nx_tcp_client_socket_unbind(&(client_ptr -> nxd_mqtt_client_socket));
2794 
2795     /* Disable timer if timer has been started. */
2796     if (client_ptr -> nxd_mqtt_keepalive)
2797     {
2798          tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
2799     }
2800 }
2801 
2802 
2803 static UINT _nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT *client_ptr, UCHAR header_value);
2804 
2805 
2806 /* MQTT internal function */
2807 
2808 /**************************************************************************/
2809 /*                                                                        */
2810 /*  FUNCTION                                               RELEASE        */
2811 /*                                                                        */
2812 /*    _nxd_mqtt_periodic_timer_entry                      PORTABLE C      */
2813 /*                                                           6.1          */
2814 /*  AUTHOR                                                                */
2815 /*                                                                        */
2816 /*    Yuxin Zhou, Microsoft Corporation                                   */
2817 /*                                                                        */
2818 /*  DESCRIPTION                                                           */
2819 /*                                                                        */
2820 /*    This internal function is passed to MQTT client socket create call. */
2821 /*    This callback function notifies MQTT client thread when the TCP     */
2822 /*    connection is lost.                                                 */
2823 /*                                                                        */
2824 /*                                                                        */
2825 /*  INPUT                                                                 */
2826 /*                                                                        */
2827 /*    socket_ptr                            Pointer to TCP socket that    */
2828 /*                                            disconnected.               */
2829 /*                                                                        */
2830 /*  OUTPUT                                                                */
2831 /*                                                                        */
2832 /*    None                                                                */
2833 /*                                                                        */
2834 /*  CALLS                                                                 */
2835 /*                                                                        */
2836 /*    None                                                                */
2837 /*                                                                        */
2838 /*  CALLED BY                                                             */
2839 /*                                                                        */
2840 /*    TCP socket disconnect callback                                      */
2841 /*                                                                        */
2842 /*  RELEASE HISTORY                                                       */
2843 /*                                                                        */
2844 /*    DATE              NAME                      DESCRIPTION             */
2845 /*                                                                        */
2846 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2847 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2848 /*                                            resulting in version 6.1    */
2849 /*                                                                        */
2850 /**************************************************************************/
_nxd_mqtt_periodic_timer_entry(ULONG client)2851 static VOID _nxd_mqtt_periodic_timer_entry(ULONG client)
2852 {
2853 /* Check if it is time to send out a ping message. */
2854 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)client;
2855 
2856     /* If an outstanding ping response has not been received, and the client exceeds the time waiting for ping response,
2857        the client shall disconnect from the server. */
2858     if (client_ptr -> nxd_mqtt_ping_not_responded)
2859     {
2860         /* If current time is greater than the ping timeout */
2861         if ((tx_time_get() - client_ptr -> nxd_mqtt_ping_sent_time) >= client_ptr -> nxd_mqtt_ping_timeout)
2862         {
2863             /* Ping timed out.  Need to terminate the connection. */
2864 #ifndef NXD_MQTT_CLOUD_ENABLE
2865             tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_PING_TIMEOUT_EVENT, TX_OR);
2866 #else
2867             nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PING_TIMEOUT_EVENT);
2868 #endif /* NXD_MQTT_CLOUD_ENABLE */
2869 
2870             return;
2871         }
2872     }
2873 
2874     /* About to timeout? */
2875     if ((client_ptr -> nxd_mqtt_timeout - tx_time_get()) <= client_ptr -> nxd_mqtt_timer_value)
2876     {
2877         /* Set the flag so the MQTT thread can send the ping. */
2878 #ifndef NXD_MQTT_CLOUD_ENABLE
2879         tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_TIMEOUT_EVENT, TX_OR);
2880 #else
2881         nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_TIMEOUT_EVENT);
2882 #endif /* NXD_MQTT_CLOUD_ENABLE */
2883     }
2884 
2885     /* If keepalive is not enabled, just return. */
2886     return;
2887 }
2888 
2889 
2890 
2891 /**************************************************************************/
2892 /*                                                                        */
2893 /*  FUNCTION                                               RELEASE        */
2894 /*                                                                        */
2895 /*    _nxd_mqtt_client_event_process                      PORTABLE C      */
2896 /*                                                           6.1          */
2897 /*  AUTHOR                                                                */
2898 /*                                                                        */
2899 /*    Yuxin Zhou, Microsoft Corporation                                   */
2900 /*                                                                        */
2901 /*  DESCRIPTION                                                           */
2902 /*                                                                        */
2903 /*    This internal function serves as the entry point for the MQTT       */
2904 /*    client thread.                                                      */
2905 /*                                                                        */
2906 /*  INPUT                                                                 */
2907 /*                                                                        */
2908 /*    mqtt_client                           Pointer to MQTT Client        */
2909 /*                                                                        */
2910 /*  OUTPUT                                                                */
2911 /*                                                                        */
2912 /*    None                                                                */
2913 /*                                                                        */
2914 /*  CALLS                                                                 */
2915 /*                                                                        */
2916 /*    _nxd_mqtt_release_transmit_packet                                   */
2917 /*    _nxd_mqtt_release_receive_packet                                    */
2918 /*    _nxd_mqtt_send_simple_message                                       */
2919 /*    _nxd_mqtt_process_disconnect                                        */
2920 /*    _nxd_mqtt_packet_receive_process                                    */
2921 /*    tx_timer_delete                                                     */
2922 /*    tx_event_flags_delete                                               */
2923 /*    nx_tcp_socket_delete                                                */
2924 /*    tx_timer_delete                                                     */
2925 /*    tx_mutex_delete                                                     */
2926 /*                                                                        */
2927 /*  CALLED BY                                                             */
2928 /*                                                                        */
2929 /*   _nxd_mqtt_client_create                                              */
2930 /*                                                                        */
2931 /*  RELEASE HISTORY                                                       */
2932 /*                                                                        */
2933 /*    DATE              NAME                      DESCRIPTION             */
2934 /*                                                                        */
2935 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2936 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
2937 /*                                            corrected mqtt client state,*/
2938 /*                                            resulting in version 6.1    */
2939 /*                                                                        */
2940 /**************************************************************************/
_nxd_mqtt_client_event_process(VOID * mqtt_client,ULONG common_events,ULONG module_own_events)2941 static VOID _nxd_mqtt_client_event_process(VOID *mqtt_client, ULONG common_events, ULONG module_own_events)
2942 {
2943 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)mqtt_client;
2944 
2945 
2946     /* Obtain the mutex. */
2947     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
2948 
2949     /* Process common events.  */
2950     NX_PARAMETER_NOT_USED(common_events);
2951 
2952     if (module_own_events & MQTT_TIMEOUT_EVENT)
2953     {
2954         /* Send out PING only if the client is connected. */
2955         if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
2956         {
2957             _nxd_mqtt_send_simple_message(client_ptr, MQTT_CONTROL_PACKET_TYPE_PINGREQ);
2958         }
2959     }
2960 
2961     if (module_own_events & MQTT_TCP_ESTABLISH_EVENT)
2962     {
2963         _nxd_mqtt_tcp_establish_process(client_ptr);
2964     }
2965 
2966     if (module_own_events & MQTT_PACKET_RECEIVE_EVENT)
2967     {
2968 #ifdef NX_SECURE_ENABLE
2969         /* TLS in progress on async mode.  */
2970         if (client_ptr -> nxd_mqtt_tls_in_progress)
2971         {
2972             _nxd_mqtt_tls_establish_process(client_ptr);
2973         }
2974         else
2975 #endif /* NX_SECURE_ENABLE */
2976 
2977         _nxd_mqtt_packet_receive_process(client_ptr);
2978     }
2979 
2980     if (module_own_events & MQTT_PING_TIMEOUT_EVENT)
2981     {
2982         /* The server/broker didn't respond to our ping request message. Disconnect from the server. */
2983         _nxd_mqtt_process_disconnect(client_ptr);
2984     }
2985     if (module_own_events & MQTT_NETWORK_DISCONNECT_EVENT)
2986     {
2987         /* The server closed TCP socket. We shall go through the disconnect code path. */
2988         _nxd_mqtt_process_disconnect(client_ptr);
2989     }
2990 
2991     if (module_own_events & MQTT_DELETE_EVENT)
2992     {
2993 
2994         /* Stop the client and disconnect from the server. */
2995         if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
2996         {
2997             _nxd_mqtt_process_disconnect(client_ptr);
2998         }
2999 
3000         /* Delete the timer. Check first if it is already deleted. */
3001         if ((client_ptr -> nxd_mqtt_timer).tx_timer_id != 0)
3002             tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
3003 
3004 #ifndef NXD_MQTT_CLOUD_ENABLE
3005         /* Delete the event flag. Check first if it is already deleted. */
3006         if ((client_ptr -> nxd_mqtt_events).tx_event_flags_group_id != 0)
3007             tx_event_flags_delete(&client_ptr -> nxd_mqtt_events);
3008 #endif /* NXD_MQTT_CLOUD_ENABLE */
3009 
3010         /* Release all the messages on the receive queue. */
3011         while (client_ptr -> message_receive_queue_head)
3012         {
3013             _nxd_mqtt_release_receive_packet(client_ptr, client_ptr -> message_receive_queue_head, NX_NULL);
3014         }
3015         client_ptr -> message_receive_queue_depth = 0;
3016 
3017         /* Delete all the messages sitting in the receive and transmit queue. */
3018         while (client_ptr -> message_transmit_queue_head)
3019         {
3020             _nxd_mqtt_release_transmit_packet(client_ptr, client_ptr -> message_transmit_queue_head, NX_NULL);
3021         }
3022 
3023         /* Release mutex */
3024         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3025 
3026 #ifndef NXD_MQTT_CLOUD_ENABLE
3027         /* Delete the mutex. */
3028         tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);
3029 #endif /* NXD_MQTT_CLOUD_ENABLE */
3030 
3031         /* Deleting the socket, (the socket ID is cleared); this signals it is ok to delete this thread. */
3032         nx_tcp_socket_delete(&client_ptr -> nxd_mqtt_client_socket);
3033     }
3034     else
3035     {
3036 
3037         /* Release mutex */
3038         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3039     }
3040 }
3041 
3042 
3043 #ifndef NXD_MQTT_CLOUD_ENABLE
3044 /**************************************************************************/
3045 /*                                                                        */
3046 /*  FUNCTION                                               RELEASE        */
3047 /*                                                                        */
3048 /*    _nxd_mqtt_thread_entry                              PORTABLE C      */
3049 /*                                                           6.1          */
3050 /*  AUTHOR                                                                */
3051 /*                                                                        */
3052 /*    Yuxin Zhou, Microsoft Corporation                                   */
3053 /*                                                                        */
3054 /*  DESCRIPTION                                                           */
3055 /*                                                                        */
3056 /*    This internal function serves as the entry point for the MQTT       */
3057 /*    client thread.                                                      */
3058 /*                                                                        */
3059 /*  INPUT                                                                 */
3060 /*                                                                        */
3061 /*    mqtt_client                           Pointer to MQTT Client        */
3062 /*                                                                        */
3063 /*  OUTPUT                                                                */
3064 /*                                                                        */
3065 /*    None                                                                */
3066 /*                                                                        */
3067 /*  CALLS                                                                 */
3068 /*                                                                        */
3069 /*    tx_event_flags_get                                                  */
3070 /*    _nxd_mqtt_client_event_process                                      */
3071 /*                                                                        */
3072 /*  CALLED BY                                                             */
3073 /*                                                                        */
3074 /*   _nxd_mqtt_client_create                                              */
3075 /*                                                                        */
3076 /*  RELEASE HISTORY                                                       */
3077 /*                                                                        */
3078 /*    DATE              NAME                      DESCRIPTION             */
3079 /*                                                                        */
3080 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3081 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3082 /*                                            resulting in version 6.1    */
3083 /*                                                                        */
3084 /**************************************************************************/
_nxd_mqtt_thread_entry(ULONG mqtt_client)3085 static VOID _nxd_mqtt_thread_entry(ULONG mqtt_client)
3086 {
3087 NXD_MQTT_CLIENT *client_ptr;
3088 ULONG            events;
3089 
3090     client_ptr = (NXD_MQTT_CLIENT *)mqtt_client;
3091 
3092     /* Loop to process events on the MQTT client */
3093     for (;;)
3094     {
3095 
3096         tx_event_flags_get(&client_ptr -> nxd_mqtt_events, MQTT_ALL_EVENTS, TX_OR_CLEAR, &events, TX_WAIT_FOREVER);
3097 
3098         /* Call the event processing routine.  */
3099         _nxd_mqtt_client_event_process(client_ptr, NX_NULL, events);
3100 
3101         if (events & MQTT_DELETE_EVENT)
3102         {
3103             break;
3104         }
3105     }
3106 }
3107 #endif /* NXD_MQTT_CLOUD_ENABLE */
3108 
3109 
3110 /**************************************************************************/
3111 /*                                                                        */
3112 /*  FUNCTION                                               RELEASE        */
3113 /*                                                                        */
3114 /*    _mqtt_client_disconnect_callback                    PORTABLE C      */
3115 /*                                                           6.1          */
3116 /*  AUTHOR                                                                */
3117 /*                                                                        */
3118 /*    Yuxin Zhou, Microsoft Corporation                                   */
3119 /*                                                                        */
3120 /*  DESCRIPTION                                                           */
3121 /*                                                                        */
3122 /*    This internal function is passed to MQTT client socket create call. */
3123 /*    This callback function notifies MQTT client thread when the TCP     */
3124 /*    connection is lost.                                                 */
3125 /*                                                                        */
3126 /*                                                                        */
3127 /*  INPUT                                                                 */
3128 /*                                                                        */
3129 /*    socket_ptr                            Pointer to TCP socket that    */
3130 /*                                            disconnected.               */
3131 /*                                                                        */
3132 /*  OUTPUT                                                                */
3133 /*                                                                        */
3134 /*    None                                                                */
3135 /*                                                                        */
3136 /*  CALLS                                                                 */
3137 /*                                                                        */
3138 /*    None                                                                */
3139 /*                                                                        */
3140 /*  CALLED BY                                                             */
3141 /*                                                                        */
3142 /*    TCP socket disconnect callback                                      */
3143 /*                                                                        */
3144 /*  RELEASE HISTORY                                                       */
3145 /*                                                                        */
3146 /*    DATE              NAME                      DESCRIPTION             */
3147 /*                                                                        */
3148 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3149 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3150 /*                                            resulting in version 6.1    */
3151 /*                                                                        */
3152 /**************************************************************************/
_mqtt_client_disconnect_callback(NX_TCP_SOCKET * socket_ptr)3153 static VOID _mqtt_client_disconnect_callback(NX_TCP_SOCKET *socket_ptr)
3154 {
3155 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)(socket_ptr -> nx_tcp_socket_reserved_ptr);
3156 
3157     /* Set the MQTT_NETWORK_DISCONNECT  event.  This event indicates
3158        that the disconnect is initiated from the network. */
3159 #ifndef NXD_MQTT_CLOUD_ENABLE
3160     tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_NETWORK_DISCONNECT_EVENT, TX_OR);
3161 #else
3162     nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_NETWORK_DISCONNECT_EVENT);
3163 #endif /* NXD_MQTT_CLOUD_ENABLE */
3164 
3165     return;
3166 }
3167 
3168 
3169 /**************************************************************************/
3170 /*                                                                        */
3171 /*  FUNCTION                                               RELEASE        */
3172 /*                                                                        */
3173 /*    _nxd_mqtt_client_create                             PORTABLE C      */
3174 /*                                                           6.1          */
3175 /*  AUTHOR                                                                */
3176 /*                                                                        */
3177 /*    Yuxin Zhou, Microsoft Corporation                                   */
3178 /*                                                                        */
3179 /*  DESCRIPTION                                                           */
3180 /*                                                                        */
3181 /*    This function creates an instance for MQTT Client.  It initializes  */
3182 /*    MQTT Client control block, creates a thread, mutex and event queue  */
3183 /*    for MQTT Client, and creates the TCP socket for MQTT messaging.     */
3184 /*                                                                        */
3185 /*                                                                        */
3186 /*  INPUT                                                                 */
3187 /*                                                                        */
3188 /*    client_ptr                            Pointer to MQTT Client        */
3189 /*    client_id                             Client ID for this instance   */
3190 /*    client_id_length                      Length of Client ID, in bytes */
3191 /*    ip_ptr                                Pointer to IP instance        */
3192 /*    pool_ptr                              Pointer to packet pool        */
3193 /*    stack_ptr                             Client thread's stack pointer */
3194 /*    stack_size                            Client thread's stack size    */
3195 /*    mqtt_thread_priority                  Priority for MQTT thread      */
3196 /*    memory_ptr                            Deprecated and not used       */
3197 /*    memory_size                           Deprecated and not used       */
3198 /*                                                                        */
3199 /*  OUTPUT                                                                */
3200 /*                                                                        */
3201 /*    status                                Completion status             */
3202 /*                                                                        */
3203 /*  CALLS                                                                 */
3204 /*                                                                        */
3205 /*    _nxd_mqtt_client_create_internal      Create mqtt client            */
3206 /*                                                                        */
3207 /*  CALLED BY                                                             */
3208 /*                                                                        */
3209 /*    Application Code                                                    */
3210 /*                                                                        */
3211 /*  RELEASE HISTORY                                                       */
3212 /*                                                                        */
3213 /*    DATE              NAME                      DESCRIPTION             */
3214 /*                                                                        */
3215 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3216 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
3217 /*                                            corrected mqtt client state,*/
3218 /*                                            resulting in version 6.1    */
3219 /*                                                                        */
3220 /**************************************************************************/
_nxd_mqtt_client_create(NXD_MQTT_CLIENT * client_ptr,CHAR * client_name,CHAR * client_id,UINT client_id_length,NX_IP * ip_ptr,NX_PACKET_POOL * pool_ptr,VOID * stack_ptr,ULONG stack_size,UINT mqtt_thread_priority,VOID * memory_ptr,ULONG memory_size)3221 UINT _nxd_mqtt_client_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
3222                              CHAR *client_id, UINT client_id_length,
3223                              NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
3224                              VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority,
3225                              VOID *memory_ptr, ULONG memory_size)
3226 {
3227 
3228 UINT    status;
3229 
3230 
3231     NX_PARAMETER_NOT_USED(memory_ptr);
3232     NX_PARAMETER_NOT_USED(memory_size);
3233 
3234     /* Create MQTT client.  */
3235     status = _nxd_mqtt_client_create_internal(client_ptr, client_name, client_id, client_id_length, ip_ptr,
3236                                               pool_ptr, stack_ptr, stack_size, mqtt_thread_priority);
3237 
3238     /* Check status.  */
3239     if (status)
3240     {
3241         return(status);
3242     }
3243 
3244 #ifdef NXD_MQTT_CLOUD_ENABLE
3245 
3246     /* Create cloud helper.  */
3247     status = nx_cloud_create(&(client_ptr -> nxd_mqtt_client_cloud), "Cloud Helper", stack_ptr, stack_size, mqtt_thread_priority);
3248 
3249     /* Determine if an error occurred.  */
3250     if (status != NX_SUCCESS)
3251     {
3252 
3253         /* Delete internal resource created in _nxd_mqtt_client_create_internal().  */
3254 
3255         /* Delete socket.  */
3256         nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));
3257 
3258         return(NXD_MQTT_INTERNAL_ERROR);
3259     }
3260 
3261     /* Save the cloud pointer.  */
3262     client_ptr -> nxd_mqtt_client_cloud_ptr = &(client_ptr -> nxd_mqtt_client_cloud);
3263 
3264     /* Save the mutex pointer.  */
3265     client_ptr -> nxd_mqtt_client_mutex_ptr = &(client_ptr -> nxd_mqtt_client_cloud.nx_cloud_mutex);
3266 
3267     /* Register MQTT on cloud helper.  */
3268     status = nx_cloud_module_register(client_ptr -> nxd_mqtt_client_cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module), client_name, NX_CLOUD_MODULE_MQTT_EVENT,
3269                                       _nxd_mqtt_client_event_process, client_ptr);
3270 
3271     /* Determine if an error occurred.  */
3272     if (status != NX_SUCCESS)
3273     {
3274 
3275         /* Delete own created cloud.  */
3276         nx_cloud_delete(&(client_ptr -> nxd_mqtt_client_cloud));
3277 
3278         /* Delete internal resource created in _nxd_mqtt_client_create_internal().  */
3279 
3280         /* Delete socket.  */
3281         nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));
3282 
3283         return(NXD_MQTT_INTERNAL_ERROR);
3284     }
3285 
3286 #endif /* NXD_MQTT_CLOUD_ENABLE */
3287 
3288     /* Update state.  */
3289     client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;
3290 
3291     /* Return.  */
3292     return(NXD_MQTT_SUCCESS);
3293 }
3294 
3295 
3296 /**************************************************************************/
3297 /*                                                                        */
3298 /*  FUNCTION                                               RELEASE        */
3299 /*                                                                        */
3300 /*    _nxd_mqtt_client_create_internal                    PORTABLE C      */
3301 /*                                                           6.1.12       */
3302 /*  AUTHOR                                                                */
3303 /*                                                                        */
3304 /*    Yuxin Zhou, Microsoft Corporation                                   */
3305 /*                                                                        */
3306 /*  DESCRIPTION                                                           */
3307 /*                                                                        */
3308 /*    This function creates an instance for MQTT Client.  It initializes  */
3309 /*    MQTT Client control block, creates a thread, mutex and event queue  */
3310 /*    for MQTT Client, and creates the TCP socket for MQTT messaging.     */
3311 /*                                                                        */
3312 /*                                                                        */
3313 /*  INPUT                                                                 */
3314 /*                                                                        */
3315 /*    client_ptr                            Pointer to MQTT Client        */
3316 /*    client_id                             Client ID for this instance   */
3317 /*    client_id_length                      Length of Client ID, in bytes */
3318 /*    ip_ptr                                Pointer to IP instance        */
3319 /*    pool_ptr                              Pointer to packet pool        */
3320 /*    stack_ptr                             Client thread's stack pointer */
3321 /*    stack_size                            Client thread's stack size    */
3322 /*    mqtt_thread_priority                  Priority for MQTT thread      */
3323 /*                                                                        */
3324 /*  OUTPUT                                                                */
3325 /*                                                                        */
3326 /*    status                                Completion status             */
3327 /*                                                                        */
3328 /*  CALLS                                                                 */
3329 /*                                                                        */
3330 /*    tx_thread_create                      Create a thread               */
3331 /*    tx_mutex_create                       Create mutex                  */
3332 /*    tx_mutex_get                                                        */
3333 /*    tx_mutex_put                                                        */
3334 /*    tx_event_flag_create                  Create event flag             */
3335 /*                                                                        */
3336 /*  CALLED BY                                                             */
3337 /*                                                                        */
3338 /*    Application Code                                                    */
3339 /*                                                                        */
3340 /*  RELEASE HISTORY                                                       */
3341 /*                                                                        */
3342 /*    DATE              NAME                      DESCRIPTION             */
3343 /*                                                                        */
3344 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3345 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
3346 /*                                            corrected mqtt client state,*/
3347 /*                                            resulting in version 6.1    */
3348 /*  07-29-2022     Spencer McDonough        Modified comment(s), and      */
3349 /*                                            improved internal logic,    */
3350 /*                                            resulting in version 6.1.12 */
3351 /*                                                                        */
3352 /**************************************************************************/
static UINT _nxd_mqtt_client_create_internal(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
                                             CHAR *client_id, UINT client_id_length,
                                             NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
                                             VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority)
{

    /* Begin with a fully zeroed control block so every field has a known value. */
    NXD_MQTT_SECURE_MEMSET((void *)client_ptr, 0, sizeof(NXD_MQTT_CLIENT));

#ifdef NXD_MQTT_CLOUD_ENABLE

    /* No thread is created by this function in this configuration, so the
       thread related parameters are not referenced. */
    NX_PARAMETER_NOT_USED(stack_ptr);
    NX_PARAMETER_NOT_USED(stack_size);
    NX_PARAMETER_NOT_USED(mqtt_thread_priority);
#else

    /* Create the mutex embedded in the control block. */
    if (tx_mutex_create(&client_ptr -> nxd_mqtt_protection, client_name, TX_NO_INHERIT) != TX_SUCCESS)
    {
        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Publish the address of the embedded mutex through the mutex pointer field. */
    client_ptr -> nxd_mqtt_client_mutex_ptr = &(client_ptr -> nxd_mqtt_protection);

    /* Create the client thread without starting it (TX_DONT_START); it is
       resumed at the very end of this function. */
    if (tx_thread_create(&(client_ptr -> nxd_mqtt_thread), client_name, _nxd_mqtt_thread_entry,
                         (ULONG)client_ptr, stack_ptr, stack_size, mqtt_thread_priority, mqtt_thread_priority,
                         NXD_MQTT_CLIENT_THREAD_TIME_SLICE, TX_DONT_START) != TX_SUCCESS)
    {

        /* Undo the mutex creation before reporting the failure. */
        tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);
        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Create the event flags group. */
    if (tx_event_flags_create(&(client_ptr -> nxd_mqtt_events), client_name) != TX_SUCCESS)
    {

        /* Undo everything created so far: first the mutex, then the thread. */
        tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);
        tx_thread_delete(&(client_ptr -> nxd_mqtt_thread));
        return(NXD_MQTT_INTERNAL_ERROR);
    }
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Remember the caller supplied identity and resources.  Only the pointers
       are stored; nothing is copied. */
    client_ptr -> nxd_mqtt_client_name = client_name;
    client_ptr -> nxd_mqtt_client_id = client_id;
    client_ptr -> nxd_mqtt_client_id_length = client_id_length;
    client_ptr -> nxd_mqtt_client_ip_ptr = ip_ptr;
    client_ptr -> nxd_mqtt_client_packet_pool_ptr = pool_ptr;

    /* Create the TCP socket on the supplied IP instance.  The return value is
       not examined, exactly as before. */
    nx_tcp_socket_create(ip_ptr, &(client_ptr -> nxd_mqtt_client_socket), client_name,
                         NX_IP_NORMAL, NX_DONT_FRAGMENT, 0x80, NXD_MQTT_CLIENT_SOCKET_WINDOW_SIZE,
                         NX_NULL, _mqtt_client_disconnect_callback);

    /* Store a back pointer to the client in the socket's reserved pointer. */
    client_ptr -> nxd_mqtt_client_socket.nx_tcp_socket_reserved_ptr = (VOID *)client_ptr;

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Everything is in place, let the MQTT thread run. */
    tx_thread_resume(&(client_ptr -> nxd_mqtt_thread));
#endif /* NXD_MQTT_CLOUD_ENABLE */

    return(NXD_MQTT_SUCCESS);
}
3436 
3437 
3438 /**************************************************************************/
3439 /*                                                                        */
3440 /*  FUNCTION                                               RELEASE        */
3441 /*                                                                        */
3442 /*    _nxd_mqtt_client_login_set                          PORTABLE C      */
3443 /*                                                           6.1          */
3444 /*  AUTHOR                                                                */
3445 /*                                                                        */
3446 /*    Yuxin Zhou, Microsoft Corporation                                   */
3447 /*                                                                        */
3448 /*  DESCRIPTION                                                           */
3449 /*                                                                        */
3450 /*    This function sets optional MQTT username and password.  Note       */
3451 /*    if the broker requires username and password, this information      */
3452 /*    must be set prior to calling nxd_mqtt_client_connect or             */
3453 /*    nxd_mqtt_client_secure_connect.                                     */
3454 /*                                                                        */
3455 /*                                                                        */
3456 /*  INPUT                                                                 */
3457 /*                                                                        */
3458 /*    client_ptr                            Pointer to MQTT Client        */
3459 /*    username                              User name, or NULL if user    */
3460 /*                                            name is not required        */
3461 /*    username_length                       Length of the user name, or   */
3462 /*                                            0 if user name is NULL      */
3463 /*    password                              Password string, or NULL if   */
3464 /*                                            password is not required    */
3465 /*    password_length                       Length of the password, or    */
3466 /*                                            0 if password is NULL       */
3467 /*                                                                        */
3468 /*  OUTPUT                                                                */
3469 /*                                                                        */
3470 /*    status                                Completion status             */
3471 /*                                                                        */
3472 /*  CALLS                                                                 */
3473 /*                                                                        */
3474 /*    tx_mutex_get                                                        */
3475 /*                                                                        */
3476 /*  CALLED BY                                                             */
3477 /*                                                                        */
3478 /*    Application Code                                                    */
3479 /*                                                                        */
3480 /*  RELEASE HISTORY                                                       */
3481 /*                                                                        */
3482 /*    DATE              NAME                      DESCRIPTION             */
3483 /*                                                                        */
3484 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3485 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3486 /*                                            resulting in version 6.1    */
3487 /*                                                                        */
3488 /**************************************************************************/
UINT _nxd_mqtt_client_login_set(NXD_MQTT_CLIENT *client_ptr,
                                CHAR *username, UINT username_length, CHAR *password, UINT password_length)
{

    /* Take the client mutex, waiting indefinitely, so the four login fields
       are updated together. */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Only the pointers are recorded, the strings are not copied.  Both
       lengths are narrowed to USHORT when stored. */
    client_ptr -> nxd_mqtt_client_password = password;
    client_ptr -> nxd_mqtt_client_password_length = (USHORT)password_length;
    client_ptr -> nxd_mqtt_client_username = username;
    client_ptr -> nxd_mqtt_client_username_length = (USHORT)username_length;

    /* Done with the control block. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    return(NX_SUCCESS);
}
3510 
3511 /**************************************************************************/
3512 /*                                                                        */
3513 /*  FUNCTION                                               RELEASE        */
3514 /*                                                                        */
3515 /*    _nxd_mqtt_client_will_message_set                   PORTABLE C      */
3516 /*                                                           6.1          */
3517 /*  AUTHOR                                                                */
3518 /*                                                                        */
3519 /*    Yuxin Zhou, Microsoft Corporation                                   */
3520 /*                                                                        */
3521 /*  DESCRIPTION                                                           */
3522 /*                                                                        */
3523 /*    This function sets optional MQTT will topic and will message.       */
3524 /*    Note that if will message is needed, application must set will      */
3525 /*    message prior to calling nxd_mqtt_client_connect or                 */
3526 /*    nxd_mqtt_client_secure_connect.                                     */
3527 /*                                                                        */
3528 /*                                                                        */
3529 /*  INPUT                                                                 */
3530 /*                                                                        */
3531 /*    client_ptr                            Pointer to MQTT Client        */
3532 /*    will_topic                            Will topic string.            */
3533 /*    will_topic_length                     Length of the will topic.     */
3534 /*    will_message                          Will message string.          */
3535 /*    will_message_length                   Length of the will message.   */
3536 /*    will_retain_flag                      Whether or not the will       */
3537 /*                                            message is to be retained   */
3538 /*                                            when it is published.       */
3539 /*                                            Valid values are NX_TRUE    */
/*                                            or NX_FALSE                 */
3541 /*    will_QoS                              QoS level to be used when     */
3542 /*                                            publishing will message.    */
3543 /*                                            Valid values are 0, 1, 2    */
3544 /*                                                                        */
3545 /*  OUTPUT                                                                */
3546 /*                                                                        */
3547 /*    status                                Completion status             */
3548 /*                                                                        */
3549 /*  CALLS                                                                 */
3550 /*                                                                        */
3551 /*    tx_mutex_get                                                        */
3552 /*                                                                        */
3553 /*  CALLED BY                                                             */
3554 /*                                                                        */
3555 /*    Application Code                                                    */
3556 /*                                                                        */
3557 /*  RELEASE HISTORY                                                       */
3558 /*                                                                        */
3559 /*    DATE              NAME                      DESCRIPTION             */
3560 /*                                                                        */
3561 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3562 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3563 /*                                            resulting in version 6.1    */
3564 /*                                                                        */
3565 /**************************************************************************/
UINT _nxd_mqtt_client_will_message_set(NXD_MQTT_CLIENT *client_ptr,
                                       const UCHAR *will_topic, UINT will_topic_length, const UCHAR *will_message,
                                       UINT will_message_length, UINT will_retain_flag, UINT will_QoS)
{
UINT status;
UINT retain_bit;

    /* QoS 2 is refused before anything is modified or locked. */
    if (will_QoS == 2)
    {
        return(NXD_MQTT_QOS2_NOT_SUPPORTED);
    }

    /* Obtain the mutex. */
    status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

    if (status != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Record the will topic and message.  Only the pointers and lengths are
       stored; the data itself is not copied. */
    client_ptr -> nxd_mqtt_client_will_topic = will_topic;
    client_ptr -> nxd_mqtt_client_will_topic_length = will_topic_length;
    client_ptr -> nxd_mqtt_client_will_message = will_message;
    client_ptr -> nxd_mqtt_client_will_message_length = will_message_length;

    /* Build the combined QoS/retain byte from scratch on every call: 0x80
       marks "retain", and the QoS value is OR-ed into the low bits.  The
       previously stored value must not be reused, otherwise a retain bit or
       QoS level set by an earlier call would survive a later call that asks
       for no retain or a lower QoS. */
    retain_bit = (will_retain_flag == NX_TRUE) ? 0x80 : 0;
    client_ptr -> nxd_mqtt_client_will_qos_retain = (UCHAR)(retain_bit | will_QoS);

    /* Release the mutex. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    return(NX_SUCCESS);
}
3600 
3601 
3602 /**************************************************************************/
3603 /*                                                                        */
3604 /*  FUNCTION                                               RELEASE        */
3605 /*                                                                        */
3606 /*    _nxde_mqtt_client_will_message_set                  PORTABLE C      */
3607 /*                                                           6.1          */
3608 /*  AUTHOR                                                                */
3609 /*                                                                        */
3610 /*    Yuxin Zhou, Microsoft Corporation                                   */
3611 /*                                                                        */
3612 /*  DESCRIPTION                                                           */
3613 /*                                                                        */
3614 /*    This function checks for errors in the                              */
3615 /*    nxd_mqtt_client_will_message_set call.                              */
3616 /*                                                                        */
3617 /*                                                                        */
3618 /*  INPUT                                                                 */
3619 /*                                                                        */
3620 /*    client_ptr                            Pointer to MQTT Client        */
3621 /*    will_topic                            Will topic string.            */
3622 /*    will_topic_length                     Length of the will topic.     */
3623 /*    will_message                          Will message string.          */
3624 /*    will_message_length                   Length of the will message.   */
3625 /*    will_retain_flag                      Whether or not the will       */
3626 /*                                            message is to be retained   */
3627 /*                                            when it is published.       */
3628 /*                                            Valid values are NX_TRUE    */
/*                                            or NX_FALSE                 */
3630 /*    will_QoS                              QoS level to be used when     */
3631 /*                                            publishing will message.    */
3632 /*                                            Valid values are 0, 1, 2    */
3633 /*                                                                        */
3634 /*  OUTPUT                                                                */
3635 /*                                                                        */
3636 /*    status                                Completion status             */
3637 /*                                                                        */
3638 /*  CALLS                                                                 */
3639 /*                                                                        */
3640 /*    _nxd_mqtt_client_will_message_set                                   */
3641 /*                                                                        */
3642 /*  CALLED BY                                                             */
3643 /*                                                                        */
3644 /*    Application Code                                                    */
3645 /*                                                                        */
3646 /*  RELEASE HISTORY                                                       */
3647 /*                                                                        */
3648 /*    DATE              NAME                      DESCRIPTION             */
3649 /*                                                                        */
3650 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3651 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3652 /*                                            resulting in version 6.1    */
3653 /*                                                                        */
3654 /**************************************************************************/
UINT _nxde_mqtt_client_will_message_set(NXD_MQTT_CLIENT *client_ptr,
                                        const UCHAR *will_topic, UINT will_topic_length, const UCHAR *will_message,
                                        UINT will_message_length, UINT will_retain_flag, UINT will_QoS)
{

    /* Every failed check yields the same error code, so they are evaluated
       as a single condition:
         - the client pointer must not be NULL,
         - the will topic must be a non-NULL string of non-zero length,
         - the retain flag must be exactly NX_TRUE or NX_FALSE,
         - the QoS level must not exceed 2.  */
    if ((client_ptr == NX_NULL) ||
        (will_topic == NX_NULL) ||
        (will_topic_length == 0) ||
        ((will_retain_flag != NX_TRUE) && (will_retain_flag != NX_FALSE)) ||
        (will_QoS > 2))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Parameters look sane, hand over to the actual service. */
    return(_nxd_mqtt_client_will_message_set(client_ptr, will_topic, will_topic_length, will_message,
                                             will_message_length, will_retain_flag, will_QoS));
}
3684 
3685 /**************************************************************************/
3686 /*                                                                        */
3687 /*  FUNCTION                                               RELEASE        */
3688 /*                                                                        */
3689 /*    _nxde_mqtt_client_login_set                         PORTABLE C      */
3690 /*                                                           6.1          */
3691 /*  AUTHOR                                                                */
3692 /*                                                                        */
3693 /*    Yuxin Zhou, Microsoft Corporation                                   */
3694 /*                                                                        */
3695 /*  DESCRIPTION                                                           */
3696 /*                                                                        */
/*    This function checks for errors in the                              */
/*    nxd_mqtt_client_login_set call.                                     */
3698 /*                                                                        */
3699 /*                                                                        */
3700 /*  INPUT                                                                 */
3701 /*                                                                        */
3702 /*    client_ptr                            Pointer to MQTT Client        */
3703 /*    username                              User name, or NULL if user    */
3704 /*                                            name is not required        */
3705 /*    username_length                       Length of the user name, or   */
3706 /*                                            0 if user name is NULL      */
3707 /*    password                              Password string, or NULL if   */
3708 /*                                            password is not required    */
3709 /*    password_length                       Length of the password, or    */
3710 /*                                            0 if password is NULL       */
3711 /*                                                                        */
3712 /*  OUTPUT                                                                */
3713 /*                                                                        */
3714 /*    status                                Completion status             */
3715 /*                                                                        */
3716 /*  CALLS                                                                 */
3717 /*                                                                        */
3718 /*    _nxd_mqtt_client_login_set                                          */
3719 /*                                                                        */
3720 /*  CALLED BY                                                             */
3721 /*                                                                        */
3722 /*    Application Code                                                    */
3723 /*                                                                        */
3724 /*  RELEASE HISTORY                                                       */
3725 /*                                                                        */
3726 /*    DATE              NAME                      DESCRIPTION             */
3727 /*                                                                        */
3728 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3729 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3730 /*                                            resulting in version 6.1    */
3731 /*                                                                        */
3732 /**************************************************************************/
UINT _nxde_mqtt_client_login_set(NXD_MQTT_CLIENT *client_ptr,
                                 CHAR *username, UINT username_length, CHAR *password, UINT password_length)
{

    /* A client control block is mandatory. */
    if (client_ptr == NX_NULL)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Username and username length don't match,
       or password and password length don't match. */
    if (((username == NX_NULL) && (username_length != 0)) ||
        ((password == NX_NULL) && (password_length != 0)))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* _nxd_mqtt_client_login_set narrows both lengths to USHORT when it
       stores them.  A larger value would be silently truncated and the
       stored length would no longer describe the supplied string, so it is
       rejected here instead. */
    if ((username_length > 0xFFFF) || (password_length > 0xFFFF))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Parameters are consistent, call the actual service. */
    return(_nxd_mqtt_client_login_set(client_ptr, username, username_length, password, password_length));
}
3751 
3752 
3753 /**************************************************************************/
3754 /*                                                                        */
3755 /*  FUNCTION                                               RELEASE        */
3756 /*                                                                        */
3757 /*    _nxd_mqtt_client_retransmit_message                 PORTABLE C      */
3758 /*                                                           6.2.0        */
3759 /*  AUTHOR                                                                */
3760 /*                                                                        */
3761 /*    Yuxin Zhou, Microsoft Corporation                                   */
3762 /*                                                                        */
3763 /*  DESCRIPTION                                                           */
3764 /*                                                                        */
/*    This function retransmits QoS1 messages upon reconnection, if the   */
3766 /*    connection is not set CLEAN_SESSION.                                */
3767 /*                                                                        */
3768 /*                                                                        */
3769 /*  INPUT                                                                 */
3770 /*                                                                        */
3771 /*    client_ptr                            Pointer to MQTT Client        */
3772 /*    wait_option                           Timeout value                 */
3773 /*                                                                        */
3774 /*  OUTPUT                                                                */
3775 /*                                                                        */
3776 /*    status                                Completion status             */
3777 /*                                                                        */
3778 /*  CALLS                                                                 */
3779 /*                                                                        */
3780 /*    tx_mutex_get                                                        */
3781 /*    tx_mutex_put                                                        */
3782 /*    _nxd_mqtt_packet_send                                               */
3783 /*    nx_packet_release                                                   */
3784 /*    tx_time_get                                                         */
3785 /*    nx_packet_copy                                                      */
3786 /*                                                                        */
3787 /*  CALLED BY                                                             */
3788 /*                                                                        */
3789 /*    Application Code                                                    */
3790 /*                                                                        */
3791 /*  RELEASE HISTORY                                                       */
3792 /*                                                                        */
3793 /*    DATE              NAME                      DESCRIPTION             */
3794 /*                                                                        */
3795 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3796 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3797 /*                                            resulting in version 6.1    */
3798 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
3799 /*                                            the logic of sending packet,*/
3800 /*                                            resulting in version 6.2.0  */
3801 /*                                                                        */
3802 /**************************************************************************/
_nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)3803 static UINT _nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
3804 {
3805 NX_PACKET          *transmit_packet_ptr;
3806 NX_PACKET          *packet_ptr;
3807 UINT                status = NXD_MQTT_SUCCESS;
3808 UINT                mutex_status;
3809 UCHAR               fixed_header;
3810 
3811     transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
3812 
3813     while (transmit_packet_ptr)
3814     {
3815         fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
3816 
3817         if ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4))
3818         {
3819 
3820             /* Retransmit publish packet only. */
3821             /* Obtain a NetX Packet. */
3822             status = nx_packet_copy(transmit_packet_ptr, &packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
3823 
3824             if (status != NXD_MQTT_SUCCESS)
3825             {
3826                 return(NXD_MQTT_PACKET_POOL_FAILURE);
3827             }
3828 
3829             /* Release the mutex. */
3830             tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3831 
3832             /* Send packet to server.  */
3833             status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);
3834 
3835             if (status)
3836             {
3837                 /* Release the packet. */
3838                 nx_packet_release(packet_ptr);
3839 
3840                 status = NXD_MQTT_COMMUNICATION_FAILURE;
3841             }
3842             /* Obtain the mutex. */
3843             mutex_status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, wait_option);
3844 
3845             if (mutex_status != TX_SUCCESS)
3846             {
3847                 return(NXD_MQTT_MUTEX_FAILURE);
3848             }
3849             if (status)
3850             {
3851                 return(status);
3852             }
3853         }
3854         transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
3855     }
3856 
3857     /* Update the timeout value. */
3858     client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
3859 
3860     return(status);
3861 }
3862 
3863 /**************************************************************************/
3864 /*                                                                        */
3865 /*  FUNCTION                                               RELEASE        */
3866 /*                                                                        */
3867 /*    _nxd_mqtt_client_connect                            PORTABLE C      */
3868 /*                                                           6.2.0        */
3869 /*  AUTHOR                                                                */
3870 /*                                                                        */
3871 /*    Yuxin Zhou, Microsoft Corporation                                   */
3872 /*                                                                        */
3873 /*  DESCRIPTION                                                           */
3874 /*                                                                        */
3875 /*    This function makes an initial connection to the MQTT server.       */
3876 /*                                                                        */
3877 /*                                                                        */
3878 /*  INPUT                                                                 */
3879 /*                                                                        */
3880 /*    client_ptr                            Pointer to MQTT Client        */
3881 /*    server_ip                             Server IP address structure   */
3882 /*    server_port                           Server port number, in host   */
3883 /*                                            byte order                  */
/*    keepalive                             Keepalive interval, in seconds*/
3885 /*    clean_session                         Clean session flag            */
3886 /*    wait_option                           Timeout value                 */
3887 /*                                                                        */
3888 /*                                                                        */
3889 /*  OUTPUT                                                                */
3890 /*                                                                        */
3891 /*    status                                Completion status             */
3892 /*                                                                        */
3893 /*  CALLS                                                                 */
3894 /*                                                                        */
3895 /*    tx_mutex_get                                                        */
3896 /*    nx_tcp_socket_create                  Create TCP socket             */
3897 /*    nx_tcp_socket_receive_notify                                        */
3898 /*    nx_tcp_client_socket_bind                                           */
3899 /*    nxd_tcp_client_socket_connect                                       */
3900 /*    nx_tcp_client_socket_unbind                                         */
3901 /*    tx_mutex_put                                                        */
3902 /*    nx_secure_tls_session_start                                         */
3903 /*    nx_secure_tls_session_send                                          */
3904 /*    nx_secure_tls_session_receive                                       */
3905 /*    _nxd_mqtt_packet_allocate                                           */
3906 /*    _nxd_mqtt_client_set_fixed_header                                   */
3907 /*    _nxd_mqtt_client_append_message                                     */
3908 /*    nx_tcp_socket_send                                                  */
3909 /*    nx_packet_release                                                   */
3910 /*    nx_tcp_socket_receive                                               */
3911 /*    tx_event_flag_set                                                   */
3912 /*    _nxd_mqtt_release_transmit_packet                                   */
3913 /*    tx_timer_create                                                     */
3914 /*    nx_secure_tls_session_receive                                       */
3915 /*    _nxd_mqtt_client_connection_end                                     */
3916 /*                                                                        */
3917 /*  CALLED BY                                                             */
3918 /*                                                                        */
3919 /*    Application Code                                                    */
3920 /*                                                                        */
3921 /*  RELEASE HISTORY                                                       */
3922 /*                                                                        */
3923 /*    DATE              NAME                      DESCRIPTION             */
3924 /*                                                                        */
3925 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3926 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
3927 /*                                            fixed return value when it  */
3928 /*                                            is set in CONNACK, corrected*/
3929 /*                                            mqtt client state,          */
3930 /*                                            resulting in version 6.1    */
3931 /*  08-02-2021     Yuxin Zhou               Modified comment(s), and      */
3932 /*                                            corrected the logic for     */
3933 /*                                            non-blocking mode,          */
3934 /*                                            resulting in version 6.1.8  */
3935 /*  07-29-2022     Spencer McDonough        Modified comment(s), and      */
3936 /*                                            improved internal logic,    */
3937 /*                                            resulting in version 6.1.12 */
3938 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
3939 /*                                            mqtt over websocket,        */
3940 /*                                            resulting in version 6.2.0  */
3941 /*                                                                        */
3942 /**************************************************************************/
UINT _nxd_mqtt_client_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                              UINT keepalive, UINT clean_session, ULONG wait_option)
{
NX_PACKET           *packet_ptr;
UINT                 status;
TX_THREAD           *thread_ptr;
UINT                 new_priority;
UINT                 old_priority;

    /* Summary:
         - The client must be in the IDLE state; otherwise NXD_MQTT_ALREADY_CONNECTED,
           NXD_MQTT_CONNECTING or NXD_MQTT_INVALID_STATE is returned.
         - keepalive is multiplied by NX_IP_PERIODIC_RATE to obtain timer ticks; a
           non-zero value creates the periodic MQTT timer, zero creates none.
         - wait_option == 0 selects non-blocking mode: notify callbacks are installed
           and NX_IN_PROGRESS is returned once the TCP connect has been started.
         - Otherwise the function blocks through the optional TLS and websocket
           setup, sends CONNECT, waits for the reply and returns the result of
           _nxd_mqtt_process_connack.  Failures on the way return
           NXD_MQTT_MUTEX_FAILURE, NXD_MQTT_CONNECT_FAILURE or
           NXD_MQTT_COMMUNICATION_FAILURE.  */

    /* Obtain the mutex. */
    status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

    if (status != TX_SUCCESS)
    {
#ifdef NX_SECURE_ENABLE
        if (client_ptr -> nxd_mqtt_client_use_tls)
        {
            nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
        }
#endif /* NX_SECURE_ENABLE */

        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Do nothing if the client is already connected. */
    if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(NXD_MQTT_ALREADY_CONNECTED);
    }

    /* Check if client is connecting. */
    if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTING)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(NXD_MQTT_CONNECTING);
    }

    /* Client state must be in IDLE.  */
    if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_IDLE)
    {
#ifdef NX_SECURE_ENABLE
        if (client_ptr -> nxd_mqtt_client_use_tls)
        {
            nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
        }
#endif /* NX_SECURE_ENABLE */
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(NXD_MQTT_INVALID_STATE);
    }

#if defined(NX_SECURE_ENABLE) && defined(NXD_MQTT_REQUIRE_TLS)
    if (!client_ptr -> nxd_mqtt_client_use_tls)
    {

        /* NXD_MQTT_REQUIRE_TLS is defined but the application does not use TLS.
           This is a security violation.  Return with failure code. */
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(NXD_MQTT_CONNECT_FAILURE);
    }
#endif /* NX_SECURE_ENABLE && NXD_MQTT_REQUIRE_TLS*/

    /* Record the keepalive value, converted to TX timer ticks. */
    client_ptr -> nxd_mqtt_keepalive = keepalive * NX_IP_PERIODIC_RATE;
    if (keepalive)
    {
        client_ptr -> nxd_mqtt_timer_value = NXD_MQTT_KEEPALIVE_TIMER_RATE;
        client_ptr -> nxd_mqtt_ping_timeout = NXD_MQTT_PING_TIMEOUT_DELAY;

        /* Create timer.  It exists only when a non-zero keepalive was requested. */
        tx_timer_create(&(client_ptr -> nxd_mqtt_timer), "MQTT Timer", _nxd_mqtt_periodic_timer_entry, (ULONG)client_ptr,
                        client_ptr -> nxd_mqtt_timer_value, client_ptr -> nxd_mqtt_timer_value, TX_AUTO_ACTIVATE);
    }
    else
    {
        client_ptr -> nxd_mqtt_timer_value = 0;
        client_ptr -> nxd_mqtt_ping_timeout = 0;
    }

    /* Record the clean session flag.  */
    client_ptr -> nxd_mqtt_clean_session = clean_session;

    /* Set TCP connection establish notify for non-blocking mode.  */
    if (wait_option == 0)
    {
        nx_tcp_socket_establish_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_tcp_establish_notify);

        /* Set the receive callback. */
        nx_tcp_socket_receive_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_receive_callback);

#ifdef NXD_MQTT_OVER_WEBSOCKET
        if (client_ptr -> nxd_mqtt_client_use_websocket)
        {

            /* Set the websocket connection callback.  */
            nx_websocket_client_connection_status_callback_set(&client_ptr -> nxd_mqtt_client_websocket, client_ptr, _nxd_mqtt_client_websocket_connection_status_callback);
        }
#endif /* NXD_MQTT_OVER_WEBSOCKET */
    }
    else
    {

        /* Blocking mode: clear the receive callback so that the reply is read
           directly by this function further below.  It is installed again once
           the CONNACK has been processed successfully.  */
        client_ptr -> nxd_mqtt_client_socket.nx_tcp_receive_callback = NX_NULL;

#ifdef NXD_MQTT_OVER_WEBSOCKET
        if (client_ptr -> nxd_mqtt_client_use_websocket)
        {

            /* Clean the websocket connection callback.  */
            nx_websocket_client_connection_status_callback_set(&client_ptr -> nxd_mqtt_client_websocket, NX_NULL, NX_NULL);
        }
#endif /* NXD_MQTT_OVER_WEBSOCKET */
    }

    /* Release mutex */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    /* First attempt to bind the client socket.  The result is not checked here;
       a socket that could not be bound is reported by the connect call below
       (NOTE(review): presumably so that an already bound socket is tolerated).  */
    nx_tcp_client_socket_bind(&(client_ptr -> nxd_mqtt_client_socket), NX_ANY_PORT, wait_option);

    /* Obtain the mutex. */
    tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

    /* Make state as NXD_MQTT_CLIENT_STATE_CONNECTING. */
    client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_CONNECTING;

    /* Release mutex. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    /* Connect to the MQTT server */
    status = nxd_tcp_client_socket_connect(&(client_ptr -> nxd_mqtt_client_socket), server_ip, server_port, wait_option);
    if ((status != NX_SUCCESS) && (status != NX_IN_PROGRESS))
    {

        /* Obtain the mutex. */
        tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

        /* Make state as NXD_MQTT_CLIENT_STATE_IDLE. */
        client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;

        /* Release mutex. */
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

#ifdef NX_SECURE_ENABLE
        if (client_ptr -> nxd_mqtt_client_use_tls)
        {
            nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
        }
#endif /* NX_SECURE_ENABLE */
        nx_tcp_client_socket_unbind(&(client_ptr -> nxd_mqtt_client_socket));

        /* Delete the keepalive timer only if it was created above.  With a zero
           keepalive no timer exists, and deleting a timer that was never created
           is not a valid ThreadX operation.  */
        if (keepalive)
        {
            tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
        }
        return(NXD_MQTT_CONNECT_FAILURE);
    }

    /* Just return for non-blocking mode.  */
    if (wait_option == 0)
    {
        return(NX_IN_PROGRESS);
    }

    /* Increase priority to the same of internal thread to avoid out of order packet process. */
#ifndef NXD_MQTT_CLOUD_ENABLE
    thread_ptr = &(client_ptr -> nxd_mqtt_thread);
#else
    thread_ptr = &(client_ptr -> nxd_mqtt_client_cloud_ptr -> nx_cloud_thread);
#endif /* NXD_MQTT_CLOUD_ENABLE */
    tx_thread_info_get(thread_ptr, NX_NULL, NX_NULL, NX_NULL,
                       &new_priority, NX_NULL, NX_NULL, NX_NULL, NX_NULL);
    tx_thread_priority_change(tx_thread_identify(), new_priority, &old_priority);

    /* If TLS is enabled, start TLS */
#ifdef NX_SECURE_ENABLE
    if (client_ptr -> nxd_mqtt_client_use_tls)
    {

        status = nx_secure_tls_session_start(&(client_ptr -> nxd_mqtt_tls_session), &(client_ptr -> nxd_mqtt_client_socket), wait_option);

        if (status != NX_SUCCESS)
        {

            /* Revert thread priority. */
            tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);

            /* End connection. */
            _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);

            return(NXD_MQTT_CONNECT_FAILURE);
        }
    }
#endif /* NX_SECURE_ENABLE */

#ifdef NXD_MQTT_OVER_WEBSOCKET
    if (client_ptr -> nxd_mqtt_client_use_websocket)
    {
#ifdef NX_SECURE_ENABLE
        if (client_ptr -> nxd_mqtt_client_use_tls)
        {
            status = nx_websocket_client_secure_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_tls_session),
                                                        client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
                                                        client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
                                                        (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
                                                        wait_option);
        }
        else
#endif /* NX_SECURE_ENABLE */
        {
            status = nx_websocket_client_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_client_socket),
                                                 client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
                                                 client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
                                                 (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
                                                 wait_option);
        }

        if (status != NX_SUCCESS)
        {

            /* Revert thread priority. */
            tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);

            /* End connection. */
            _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);

            return(NXD_MQTT_CONNECT_FAILURE);
        }
    }

#endif /* NXD_MQTT_OVER_WEBSOCKET */

    /* Start to send connect packet.  */
    status = _nxd_mqtt_client_connect_packet_send(client_ptr, wait_option);

    if (status != NX_SUCCESS)
    {

        /* Revert thread priority. */
        tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);

        /* End connection. */
        _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
        return(NXD_MQTT_CONNECT_FAILURE);
    }

    /* Wait for the reply to the CONNECT packet. */
    status = _nxd_mqtt_packet_receive(client_ptr, &packet_ptr, wait_option);

    /* Revert thread priority. */
    tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);

    /* Check status.  */
    if (status)
    {

        /* End connection. */
        _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
        return(NXD_MQTT_COMMUNICATION_FAILURE);
    }

    /* Process CONNACK message.  */
    status = _nxd_mqtt_process_connack(client_ptr, packet_ptr, wait_option);

    /* Release the packet.  */
    nx_packet_release(packet_ptr);

    /* Check status.  */
    if (status == NX_SUCCESS)
    {

        /* Set the receive callback. */
        nx_tcp_socket_receive_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_receive_callback);
    }

    return(status);
}
4218 
4219 /**************************************************************************/
4220 /*                                                                        */
4221 /*  FUNCTION                                               RELEASE        */
4222 /*                                                                        */
4223 /*    _nxd_mqtt_client_connect_packet_send                PORTABLE C      */
4224 /*                                                           6.2.0        */
4225 /*  AUTHOR                                                                */
4226 /*                                                                        */
4227 /*    Yuxin Zhou, Microsoft Corporation                                   */
4228 /*                                                                        */
4229 /*  DESCRIPTION                                                           */
4230 /*                                                                        */
4231 /*    This function sends CONNECT packet to MQTT server.                  */
4232 /*                                                                        */
4233 /*                                                                        */
4234 /*  INPUT                                                                 */
4235 /*                                                                        */
/*    client_ptr                            Pointer to MQTT Client; the   */
/*                                            keepalive and clean session */
/*                                            values are read from it     */
4239 /*    wait_option                           Timeout value                 */
4240 /*                                                                        */
4241 /*                                                                        */
4242 /*  OUTPUT                                                                */
4243 /*                                                                        */
4244 /*    status                                Completion status             */
4245 /*                                                                        */
4246 /*  CALLS                                                                 */
4247 /*                                                                        */
4248 /*    nx_packet_release                                                   */
4249 /*    _nxd_mqtt_packet_allocate                                           */
4250 /*    _nxd_mqtt_release_transmit_packet                                   */
4251 /*    _nxd_mqtt_client_connection_end                                     */
4252 /*    _nxd_mqtt_client_set_fixed_header                                   */
4253 /*    _nxd_mqtt_client_append_message                                     */
4254 /*    _nxd_mqtt_packet_send                                               */
4255 /*                                                                        */
4256 /*  CALLED BY                                                             */
4257 /*                                                                        */
4258 /*    _nxd_mqtt_client_connect                                            */
4259 /*    _nxd_mqtt_tcp_establish_process                                     */
4260 /*    _nxd_mqtt_tls_establish_process                                     */
4261 /*                                                                        */
4262 /*  RELEASE HISTORY                                                       */
4263 /*                                                                        */
4264 /*    DATE              NAME                      DESCRIPTION             */
4265 /*                                                                        */
4266 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4267 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4268 /*                                            resulting in version 6.1    */
4269 /*  07-29-2022     Spencer McDonough        Modified comment(s),          */
4270 /*                                            improved internal logic,    */
4271 /*                                            resulting in version 6.1.12 */
4272 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
4273 /*                                            the logic of sending packet,*/
4274 /*                                            resulting in version 6.2.0  */
4275 /*                                                                        */
4276 /**************************************************************************/
_nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)4277 UINT _nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
4278 {
4279 NX_PACKET           *packet_ptr;
4280 UINT                 status;
4281 UINT                 length = 0;
4282 UCHAR                connection_flags = 0;
4283 UINT                 ret = NXD_MQTT_SUCCESS;
4284 UCHAR                temp_data[4];
4285 UINT                 keepalive = (client_ptr -> nxd_mqtt_keepalive/NX_IP_PERIODIC_RATE);
4286 
4287 
4288     /* Construct connect flags by taking the connect flag user supplies, or'ing the username and
4289        password bits, if they are supplied. */
4290     if (client_ptr -> nxd_mqtt_client_username)
4291     {
4292         connection_flags |= MQTT_CONNECT_FLAGS_USERNAME;
4293 
4294         /* Add the password flag only if username is supplied. */
4295         if (client_ptr -> nxd_mqtt_client_password)
4296         {
4297             connection_flags |= MQTT_CONNECT_FLAGS_PASSWORD;
4298         }
4299     }
4300 
4301     if (client_ptr -> nxd_mqtt_client_will_topic)
4302     {
4303         connection_flags = connection_flags | MQTT_CONNECT_FLAGS_WILL_FLAG;
4304 
4305 
4306         if (client_ptr -> nxd_mqtt_client_will_qos_retain & 0x80)
4307         {
4308             connection_flags = connection_flags | MQTT_CONNECT_FLAGS_WILL_RETAIN;
4309         }
4310 
4311         connection_flags = (UCHAR)(connection_flags | ((client_ptr -> nxd_mqtt_client_will_qos_retain & 0x3) << 3));
4312     }
4313 
4314     if (client_ptr -> nxd_mqtt_clean_session == NX_TRUE)
4315     {
4316         connection_flags = connection_flags | MQTT_CONNECT_FLAGS_CLEAN_SESSION;
4317 
4318         /* Clear any transmit blocks from the previous session. */
4319         while (client_ptr -> message_transmit_queue_head)
4320         {
4321             _nxd_mqtt_release_transmit_packet(client_ptr, client_ptr -> message_transmit_queue_head, NX_NULL);
4322         }
4323     }
4324 
4325     /* Set the length of the packet. */
4326     length = 10;
4327 
4328     /* Add the size of the client Identifier. */
4329     length += (client_ptr -> nxd_mqtt_client_id_length + 2);
4330 
4331     /* Add the will topic, if present. */
4332     if (connection_flags & MQTT_CONNECT_FLAGS_WILL_FLAG)
4333     {
4334         length += (client_ptr -> nxd_mqtt_client_will_topic_length + 2);
4335         length += (client_ptr -> nxd_mqtt_client_will_message_length + 2);
4336     }
4337     if (connection_flags & MQTT_CONNECT_FLAGS_USERNAME)
4338     {
4339         length += (UINT)(client_ptr -> nxd_mqtt_client_username_length + 2);
4340     }
4341     if (connection_flags & MQTT_CONNECT_FLAGS_PASSWORD)
4342     {
4343         length += (UINT)(client_ptr -> nxd_mqtt_client_password_length + 2);
4344     }
4345 
4346     /* Check for invalid length. */
4347     if (length > (127 * 127 * 127 * 127))
4348     {
4349         return(NXD_MQTT_INTERNAL_ERROR);
4350     }
4351 
4352     status = _nxd_mqtt_packet_allocate(client_ptr, &packet_ptr, wait_option);
4353 
4354     if (status)
4355     {
4356         return(status);
4357     }
4358 
4359     /* Construct MQTT CONNECT message. */
4360     temp_data[0] = ((MQTT_CONTROL_PACKET_TYPE_CONNECT << 4) & 0xF0);
4361 
4362     /* Fill in fixed header. */
4363     ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, temp_data[0], length, wait_option);
4364 
4365     if (ret)
4366     {
4367         /* Release the packet. */
4368         nx_packet_release(packet_ptr);
4369         return(NXD_MQTT_PACKET_POOL_FAILURE);
4370     }
4371 
4372     /* Fill in protocol name. */
4373     ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, "MQTT", 4, wait_option);
4374 
4375     if (ret)
4376     {
4377         /* Release the packet. */
4378         nx_packet_release(packet_ptr);
4379         return(NXD_MQTT_PACKET_POOL_FAILURE);
4380     }
4381 
4382     /* Fill in protocol level, */
4383     temp_data[0] = MQTT_PROTOCOL_LEVEL;
4384 
4385     /* Fill in byte 8: connect flags */
4386     temp_data[1] = connection_flags;
4387 
4388     /* Fill in byte 9 and 10: keep alive */
4389     temp_data[2] = (keepalive >> 8) & 0xFF;
4390     temp_data[3] = (keepalive & 0xFF);
4391 
4392     ret = nx_packet_data_append(packet_ptr, temp_data, 4, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
4393 
4394     if (ret)
4395     {
4396 
4397         /* Release the packet. */
4398         nx_packet_release(packet_ptr);
4399 
4400         return(NXD_MQTT_PACKET_POOL_FAILURE);
4401     }
4402 
4403     /* Fill in payload area, in the order of: client identifier, will topic, will message,
4404        user name, and password. */
4405     ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_id,
4406                                           client_ptr -> nxd_mqtt_client_id_length, wait_option);
4407 
4408     /* Next fill will topic and will message if the will flag is set. */
4409     if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_WILL_FLAG))
4410     {
4411         ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, (CHAR *)client_ptr -> nxd_mqtt_client_will_topic,
4412                                               client_ptr -> nxd_mqtt_client_will_topic_length, wait_option);
4413 
4414         if (!ret)
4415         {
4416             ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, (CHAR *)client_ptr -> nxd_mqtt_client_will_message,
4417                                                   client_ptr -> nxd_mqtt_client_will_message_length, wait_option);
4418         }
4419     }
4420 
4421     /* Fill username if username flag is set */
4422     if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_USERNAME))
4423     {
4424         ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_username,
4425                                               client_ptr -> nxd_mqtt_client_username_length, wait_option);
4426     }
4427 
4428     /* Fill password if password flag is set */
4429     if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_PASSWORD))
4430     {
4431         ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_password,
4432                                               client_ptr -> nxd_mqtt_client_password_length, wait_option);
4433     }
4434 
4435     if (ret)
4436     {
4437 
4438         /* Release the packet. */
4439         nx_packet_release(packet_ptr);
4440 
4441         return(NXD_MQTT_PACKET_POOL_FAILURE);
4442     }
4443 
4444     /* Ready to send the connect message to the server. */
4445     status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);
4446 
4447     if (status)
4448     {
4449 
4450         /* Release the packet. */
4451         nx_packet_release(packet_ptr);
4452     }
4453 
4454     /* Update the timeout value. */
4455     client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
4456 
4457     return(status);
4458 }
4459 
4460 /**************************************************************************/
4461 /*                                                                        */
4462 /*  FUNCTION                                               RELEASE        */
4463 /*                                                                        */
4464 /*    _nxd_mqtt_client_secure_connect                     PORTABLE C      */
4465 /*                                                           6.1          */
4466 /*  AUTHOR                                                                */
4467 /*                                                                        */
4468 /*    Yuxin Zhou, Microsoft Corporation                                   */
4469 /*                                                                        */
4470 /*  DESCRIPTION                                                           */
4471 /*                                                                        */
4472 /*    This function makes an initial secure (TLS) connection to           */
4473 /*    the MQTT server.                                                    */
4474 /*                                                                        */
4475 /*                                                                        */
4476 /*  INPUT                                                                 */
4477 /*                                                                        */
4478 /*    client_ptr                            Pointer to MQTT Client        */
4479 /*    server_ip                             Server IP address structure   */
4480 /*    server_port                           Server port number, in host   */
4481 /*                                            byte order                  */
4482 /*    tls_setup                             User-supplied callback        */
4483 /*                                            function to set up TLS      */
4484 /*                                            parameters.                 */
/*    keepalive                             Keepalive value forwarded to  */
/*                                            the MQTT connect call       */
/*    clean_session                         Clean-session flag forwarded  */
/*                                            to the MQTT connect call    */
/*    wait_option                           Suspension option             */
4495 /*                                                                        */
4496 /*                                                                        */
4497 /*  OUTPUT                                                                */
4498 /*                                                                        */
4499 /*    status                                Completion status             */
4500 /*                                                                        */
4501 /*  CALLS                                                                 */
4502 /*                                                                        */
4503 /*    _nxd_mqtt_client_connect              Actual MQTT Client connect    */
4504 /*                                            call                        */
4505 /*                                                                        */
4506 /*  CALLED BY                                                             */
4507 /*                                                                        */
4508 /*    Application Code                                                    */
4509 /*                                                                        */
4510 /*  RELEASE HISTORY                                                       */
4511 /*                                                                        */
4512 /*    DATE              NAME                      DESCRIPTION             */
4513 /*                                                                        */
4514 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4515 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4516 /*                                            resulting in version 6.1    */
4517 /*                                                                        */
4518 /**************************************************************************/
4519 #ifdef NX_SECURE_ENABLE
UINT _nxd_mqtt_client_secure_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                                     UINT (*tls_setup)(NXD_MQTT_CLIENT *client_ptr, NX_SECURE_TLS_SESSION *,
                                                       NX_SECURE_X509_CERT *, NX_SECURE_X509_CERT *),
                                     UINT keepalive, UINT clean_session, ULONG wait_option)
{
UINT tls_status;

    /* Hand the client's TLS session and its two certificate slots to the
       application-supplied callback so it can configure them. */
    tls_status = tls_setup(client_ptr,
                           &(client_ptr -> nxd_mqtt_tls_session),
                           &(client_ptr -> nxd_mqtt_tls_certificate),
                           &(client_ptr -> nxd_mqtt_tls_trusted_certificate));

    /* A non-zero callback result is passed back to the caller unchanged and
       no connection attempt is made. */
    if (tls_status != 0)
    {
        return(tls_status);
    }

    /* Flag this client as using TLS before starting the connection. */
    client_ptr -> nxd_mqtt_client_use_tls = 1;

    /* The regular connect routine does the rest; its status is our status. */
    return(_nxd_mqtt_client_connect(client_ptr, server_ip, server_port, keepalive, clean_session, wait_option));
}
4544 
4545 #endif /* NX_SECURE_ENABLE */
4546 
4547 
4548 /**************************************************************************/
4549 /*                                                                        */
4550 /*  FUNCTION                                               RELEASE        */
4551 /*                                                                        */
4552 /*    _nxd_mqtt_client_delete                             PORTABLE C      */
4553 /*                                                           6.2.0        */
4554 /*  AUTHOR                                                                */
4555 /*                                                                        */
4556 /*    Yuxin Zhou, Microsoft Corporation                                   */
4557 /*                                                                        */
4558 /*  DESCRIPTION                                                           */
4559 /*                                                                        */
4560 /*    This function deletes a previously created MQTT client instance.    */
4561 /*    If the NXD_MQTT_SOCKET_TIMEOUT is set to NX_WAIT_FOREVER, this may  */
4562 /*    suspend infinitely. This is because the MQTT Client must            */
4563 /*    disconnect with the server, and if the network link is disabled or  */
/*    the server is not responding, this will block this function from    */
4565 /*    completing.                                                         */
4566 /*                                                                        */
4567 /*  INPUT                                                                 */
4568 /*                                                                        */
4569 /*    client_ptr                            Pointer to MQTT Client        */
4570 /*                                                                        */
4571 /*  OUTPUT                                                                */
4572 /*                                                                        */
4573 /*    status                                Completion status             */
4574 /*                                                                        */
4575 /*  CALLS                                                                 */
4576 /*                                                                        */
4577 /*    tx_event_flags_set                                                  */
4578 /*                                                                        */
4579 /*  CALLED BY                                                             */
4580 /*                                                                        */
4581 /*    Application Code                                                    */
4582 /*                                                                        */
4583 /*  RELEASE HISTORY                                                       */
4584 /*                                                                        */
4585 /*    DATE              NAME                      DESCRIPTION             */
4586 /*                                                                        */
4587 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4588 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4589 /*                                            resulting in version 6.1    */
4590 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
4591 /*                                            mqtt over websocket,        */
4592 /*                                            resulting in version 6.2.0  */
4593 /*                                                                        */
4594 /**************************************************************************/
_nxd_mqtt_client_delete(NXD_MQTT_CLIENT * client_ptr)4595 UINT _nxd_mqtt_client_delete(NXD_MQTT_CLIENT *client_ptr)
4596 {
4597 
4598 
4599     /* Set the event flag for DELETE. Next time when the MQTT client thread
4600        wakes up, it will perform the deletion process. */
4601 #ifndef NXD_MQTT_CLOUD_ENABLE
4602     tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_DELETE_EVENT, TX_OR);
4603 #else
4604     nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_DELETE_EVENT);
4605 #endif /* NXD_MQTT_CLOUD_ENABLE */
4606 
4607     /* Check if the MQTT client thread has completed. */
4608     while((client_ptr -> nxd_mqtt_client_socket).nx_tcp_socket_id != 0)
4609     {
4610         tx_thread_sleep(NX_IP_PERIODIC_RATE);
4611     }
4612 
4613 #ifndef NXD_MQTT_CLOUD_ENABLE
4614     /* Now we can delete the Client instance. */
4615     tx_thread_delete(&(client_ptr -> nxd_mqtt_thread));
4616 #else
4617     /* Deregister mqtt module from cloud helper.  */
4618     nx_cloud_module_deregister(client_ptr -> nxd_mqtt_client_cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module));
4619 
4620     /* Delete own created cloud.  */
4621     if (client_ptr -> nxd_mqtt_client_cloud.nx_cloud_id == NX_CLOUD_ID)
4622         nx_cloud_delete(&(client_ptr -> nxd_mqtt_client_cloud));
4623 #endif /* NXD_MQTT_CLOUD_ENABLE */
4624 
4625 #ifdef NXD_MQTT_OVER_WEBSOCKET
4626     if (client_ptr -> nxd_mqtt_client_use_websocket)
4627     {
4628         nx_websocket_client_delete(&client_ptr -> nxd_mqtt_client_websocket);
4629         client_ptr -> nxd_mqtt_client_use_websocket = NX_FALSE;
4630     }
4631 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4632 
4633     return(NXD_MQTT_SUCCESS);
4634 }
4635 
4636 /**************************************************************************/
4637 /*                                                                        */
4638 /*  FUNCTION                                               RELEASE        */
4639 /*                                                                        */
4640 /*    _nxd_mqtt_client_publish_packet_send                PORTABLE C      */
4641 /*                                                           6.2.0        */
4642 /*  AUTHOR                                                                */
4643 /*                                                                        */
4644 /*    Yuxin Zhou, Microsoft Corporation                                   */
4645 /*                                                                        */
4646 /*  DESCRIPTION                                                           */
4647 /*                                                                        */
4648 /*    This function sends a publish packet to the connected broker.       */
4649 /*                                                                        */
4650 /*                                                                        */
4651 /*  INPUT                                                                 */
4652 /*                                                                        */
4653 /*    client_ptr                            Pointer to MQTT Client        */
4654 /*    packet_ptr                            Pointer to publish packet     */
4655 /*    packet_id                             Current packet ID             */
4656 /*    QoS                                   Quality of service            */
4657 /*    wait_option                           Suspension option             */
4658 /*                                                                        */
4659 /*  OUTPUT                                                                */
4660 /*                                                                        */
4661 /*    status                                Completion status             */
4662 /*                                                                        */
4663 /*  CALLS                                                                 */
4664 /*                                                                        */
4665 /*    tx_mutex_get                                                        */
4666 /*    tx_mutex_put                                                        */
4667 /*    nx_packet_release                                                   */
4668 /*    _nxd_mqtt_copy_transmit_packet                                      */
4669 /*    _nxd_mqtt_packet_send                                               */
4670 /*                                                                        */
4671 /*  CALLED BY                                                             */
4672 /*                                                                        */
4673 /*    _nxd_mqtt_client_publish                                            */
4674 /*                                                                        */
4675 /*  RELEASE HISTORY                                                       */
4676 /*                                                                        */
4677 /*    DATE              NAME                      DESCRIPTION             */
4678 /*                                                                        */
4679 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4680 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4681 /*                                            resulting in version 6.1    */
4682 /*  08-02-2021     Yuxin Zhou               Modified comment(s),          */
4683 /*                                            supported maximum transmit  */
4684 /*                                            queue depth,                */
4685 /*                                            resulting in version 6.1.8  */
4686 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
4687 /*                                            the logic of sending packet,*/
4688 /*                                            resulting in version 6.2.0  */
4689 /*                                                                        */
4690 /**************************************************************************/
UINT _nxd_mqtt_client_publish_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr,
                                          USHORT packet_id, UINT QoS, ULONG wait_option)
{

NX_PACKET *queued_copy_ptr = NX_NULL;

    /* A non-zero QoS message may have to be retransmitted, so take a copy of
       it before anything else. QoS 0 messages are sent without a copy. */
    if (QoS != 0)
    {
        if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &queued_copy_ptr,
                                           packet_id, NX_TRUE, wait_option) != 0)
        {
            return(NXD_MQTT_PACKET_POOL_FAILURE);
        }
    }

    /* The client mutex guards the transmit queue and the timeout value. */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        if (QoS != 0)
        {

            /* The copy was never queued: give it back.  */
            nx_packet_release(queued_copy_ptr);

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
            /* Decrease the transmit queue depth.  */
            client_ptr -> message_transmit_queue_depth--;
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */
        }

        return(NXD_MQTT_MUTEX_FAILURE);
    }

    if (QoS != 0)
    {

        /* Append the copy to the tail of the transmit queue; an empty queue
           gets it as the head as well. */
        if (client_ptr -> message_transmit_queue_head != NX_NULL)
        {
            client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = queued_copy_ptr;
        }
        else
        {
            client_ptr -> message_transmit_queue_head = queued_copy_ptr;
        }
        client_ptr -> message_transmit_queue_tail = queued_copy_ptr;
    }

    /* Restart the timeout: current time plus the keepalive value. */
    client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;

    /* Done with the protected state; the send itself runs without the mutex. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    /* Send the publish packet to the server. Any send error is reported as a
       communication failure; packet_ptr is not released here on that path
       (_nxd_mqtt_client_publish releases it when this routine fails). */
    if (_nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option) != 0)
    {
        return(NXD_MQTT_COMMUNICATION_FAILURE);
    }

    return(NXD_MQTT_SUCCESS);
}
4763 
4764 /**************************************************************************/
4765 /*                                                                        */
4766 /*  FUNCTION                                               RELEASE        */
4767 /*                                                                        */
4768 /*    _nxd_mqtt_client_publish                            PORTABLE C      */
4769 /*                                                           6.1.12       */
4770 /*  AUTHOR                                                                */
4771 /*                                                                        */
4772 /*    Yuxin Zhou, Microsoft Corporation                                   */
4773 /*                                                                        */
4774 /*  DESCRIPTION                                                           */
4775 /*                                                                        */
4776 /*    This function publishes a message to the connected broker.          */
4777 /*                                                                        */
4778 /*                                                                        */
4779 /*  INPUT                                                                 */
4780 /*                                                                        */
4781 /*    client_ptr                            Pointer to MQTT Client        */
4782 /*    topic_name                            Name of the topic             */
4783 /*    topic_name_length                     Length of the topic name      */
4784 /*    message                               Message string                */
4785 /*    message_length                        Length of the message,        */
4786 /*                                            in bytes                    */
4787 /*    retain                                The retain flag, whether      */
4788 /*                                            or not the broker should    */
4789 /*                                            store this message          */
4790 /*    QoS                                   Expected QoS level            */
4791 /*    wait_option                           Suspension option             */
4792 /*                                                                        */
4793 /*  OUTPUT                                                                */
4794 /*                                                                        */
4795 /*    status                                Completion status             */
4796 /*                                                                        */
4797 /*  CALLS                                                                 */
4798 /*                                                                        */
4799 /*    tx_mutex_get                                                        */
4800 /*    _nxd_mqtt_packet_allocate                                           */
4801 /*    _nxd_mqtt_client_set_fixed_header                                   */
4802 /*    _nxd_mqtt_client_append_message                                     */
4803 /*    tx_mutex_put                                                        */
4804 /*    nx_packet_release                                                   */
4805 /*    _nxd_mqtt_client_publish_packet_send                                */
4806 /*                                                                        */
4807 /*  CALLED BY                                                             */
4808 /*                                                                        */
4809 /*    Application Code                                                    */
4810 /*                                                                        */
4811 /*  RELEASE HISTORY                                                       */
4812 /*                                                                        */
4813 /*    DATE              NAME                      DESCRIPTION             */
4814 /*                                                                        */
4815 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4816 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4817 /*                                            resulting in version 6.1    */
4818 /*  07-29-2022     Spencer McDonough        Modified comment(s),          */
4819 /*                                            improved internal logic,    */
4820 /*                                            resulting in version 6.1.12 */
4821 /*                                                                        */
4822 /**************************************************************************/
UINT _nxd_mqtt_client_publish(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length,
                              CHAR *message, UINT message_length, UINT retain, UINT QoS, ULONG wait_option)
{

NX_PACKET *packet_ptr;
UINT       status;
UINT       length = 0;
UCHAR      flags;
USHORT     packet_id = 0;
UINT       ret = NXD_MQTT_SUCCESS;

    /* Builds a PUBLISH packet (fixed header, topic, optional packet
       identifier, optional payload) and hands it to
       _nxd_mqtt_client_publish_packet_send. Once the packet has been
       allocated, every failure path below releases it before returning. */

    /* QoS level 2 is rejected before any resource is allocated. */
    if (QoS == 2)
    {
        return(NXD_MQTT_QOS2_NOT_SUPPORTED);
    }


    /* Publishing is only possible while the client is connected. */
    if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        return(NXD_MQTT_NOT_CONNECTED);
    }

    status = _nxd_mqtt_packet_allocate(client_ptr, &packet_ptr, wait_option);

    if (status != NXD_MQTT_SUCCESS)
    {
        return(NXD_MQTT_PACKET_POOL_FAILURE);
    }

    /* Control byte: packet type in the upper four bits, QoS shifted left by
       one, retain flag OR'ed in when requested. */
    flags = (UCHAR)((MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4) | (QoS << 1));

    if (retain)
    {
        flags = flags | MQTT_PUBLISH_RETAIN;
    }


    /* Compute the remaining length. */

    /* Topic Name. */
    /* Compute Topic Name length: the topic bytes plus two more bytes. */
    length = topic_name_length + 2;

    /* Count packet ID for QoS 1 or QoS 2 message. */
    if ((QoS == 1) || (QoS == 2))
    {
        length += 2;
    }

    /* Count message. */
    if ((message != NX_NULL) && (message_length != 0))
    {
        length += message_length;
    }

    /* Write out the control header and remaining length field. */
    ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, flags, length, wait_option);

    if (ret)
    {

        /* Release the packet. */
        nx_packet_release(packet_ptr);

        return(NXD_MQTT_INTERNAL_ERROR);
    }


    /* Write out topic */
    ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, topic_name, topic_name_length, wait_option);

    if (ret)
    {

        /* Release the packet. */
        nx_packet_release(packet_ptr);

        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Append Packet Identifier for QoS level 1 or 2  MQTT 3.3.2.2 */
    if ((QoS == 1) || (QoS == 2))
    {
    UCHAR identifier[2];

        /* Obtain the mutex. */
        status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

        if (status != TX_SUCCESS)
        {

            /* Release the packet; without this the allocated packet would
               be leaked on this error path. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_MUTEX_FAILURE);
        }

        /* Take the current identifier, high byte first. */
        packet_id = (USHORT)client_ptr -> nxd_mqtt_client_packet_identifier;
        identifier[0] = (UCHAR)(client_ptr -> nxd_mqtt_client_packet_identifier >> 8);
        identifier[1] = (client_ptr -> nxd_mqtt_client_packet_identifier & 0xFF);

        /* Update packet id, wrapping at 16 bits. */
        client_ptr -> nxd_mqtt_client_packet_identifier = (client_ptr -> nxd_mqtt_client_packet_identifier + 1) & 0xFFFF;

        /* Prevent packet identifier from being zero. MQTT-2.3.1-1 */
        if (client_ptr -> nxd_mqtt_client_packet_identifier == 0)
        {
            client_ptr -> nxd_mqtt_client_packet_identifier = 1;
        }

        /* Release the mutex. */
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

        ret = nx_packet_data_append(packet_ptr, identifier, 2,
                                    client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);

        if (ret)
        {

            /* Release the packet. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_INTERNAL_ERROR);
        }
    }

    /* Append message. */
    if ((message != NX_NULL) && (message_length != 0))
    {

        /* Use nx_packet_data_append to move user-supplied message data into the packet.
           nx_packet_data_append uses chained packet if the additional storage space is
           needed. */
        ret = nx_packet_data_append(packet_ptr, message, message_length,
                                    client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
        if (ret)
        {
            /* Unable to obtain a new packet to store the message. */

            /* Release the packet. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_INTERNAL_ERROR);
        }
    }

    /* Send publish packet. */
    ret = _nxd_mqtt_client_publish_packet_send(client_ptr, packet_ptr, packet_id, QoS, wait_option);

    if (ret)
    {

        /* The send routine failed: release the packet here. */
        nx_packet_release(packet_ptr);
    }
    return(ret);
}
4975 
4976 /**************************************************************************/
4977 /*                                                                        */
4978 /*  FUNCTION                                               RELEASE        */
4979 /*                                                                        */
4980 /*    _nxd_mqtt_client_subscribe                          PORTABLE C      */
4981 /*                                                           6.1.2        */
4982 /*  AUTHOR                                                                */
4983 /*                                                                        */
4984 /*    Yuxin Zhou, Microsoft Corporation                                   */
4985 /*                                                                        */
4986 /*  DESCRIPTION                                                           */
4987 /*                                                                        */
4988 /*    This function sends a subscribe message to the broker.              */
4989 /*                                                                        */
4990 /*                                                                        */
4991 /*  INPUT                                                                 */
4992 /*                                                                        */
4993 /*    client_ptr                            Pointer to MQTT Client        */
4994 /*    topic_name                            Pointer to the topic string   */
4995 /*                                            to subscribe to             */
4996 /*    topic_name_length                     Length of the topic string    */
4997 /*                                            in bytes                    */
4998 /*    QoS                                   Expected QoS level            */
4999 /*                                                                        */
5000 /*  OUTPUT                                                                */
5001 /*                                                                        */
5002 /*    status                                Completion status             */
5003 /*                                                                        */
5004 /*  CALLS                                                                 */
5005 /*                                                                        */
5006 /*    _nxd_mqtt_client_sub_unsub            The actual routine that       */
5007 /*                                            performs the sub/unsub      */
5008 /*                                            action.                     */
5009 /*                                                                        */
5010 /*  CALLED BY                                                             */
5011 /*                                                                        */
5012 /*    Application Code                                                    */
5013 /*                                                                        */
5014 /*  RELEASE HISTORY                                                       */
5015 /*                                                                        */
5016 /*    DATE              NAME                      DESCRIPTION             */
5017 /*                                                                        */
5018 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5019 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5020 /*                                            resulting in version 6.1    */
5021 /*  11-09-2020     Yuxin Zhou               Modified comment(s), and      */
5022 /*                                            added packet id parameter,  */
5023 /*                                            resulting in version 6.1.2  */
5024 /*                                                                        */
5025 /**************************************************************************/
UINT _nxd_mqtt_client_subscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length, UINT QoS)
{

    /* Any QoS other than 2 is forwarded to the shared sub/unsub routine. The
       operation byte is the SUBSCRIBE packet type shifted into the upper four
       bits, OR'ed with 0x02. */
    if (QoS != 2)
    {
        return(_nxd_mqtt_client_sub_unsub(client_ptr, (MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4) | 0x02,
                                          topic_name, topic_name_length, NX_NULL, QoS));
    }

    /* QoS level 2 subscriptions are rejected. */
    return(NXD_MQTT_QOS2_NOT_SUPPORTED);
}
5037 
5038 
5039 
5040 
5041 /**************************************************************************/
5042 /*                                                                        */
5043 /*  FUNCTION                                               RELEASE        */
5044 /*                                                                        */
5045 /*    _nxd_mqtt_client_unsubscribe                        PORTABLE C      */
5046 /*                                                           6.1.2        */
5047 /*  AUTHOR                                                                */
5048 /*                                                                        */
5049 /*    Yuxin Zhou, Microsoft Corporation                                   */
5050 /*                                                                        */
5051 /*  DESCRIPTION                                                           */
5052 /*                                                                        */
5053 /*    This function unsubscribes a topic from the broker.                 */
5054 /*                                                                        */
5055 /*                                                                        */
5056 /*  INPUT                                                                 */
5057 /*                                                                        */
5058 /*    client_ptr                            Pointer to MQTT Client        */
5059 /*    topic_name                            Pointer to the topic string   */
5060 /*                                            to unsubscribe from         */
5061 /*    topic_name_length                     Length of the topic string    */
5062 /*                                            in bytes                    */
5063 /*                                                                        */
5064 /*  OUTPUT                                                                */
5065 /*                                                                        */
5066 /*    status                                Completion status             */
5067 /*                                                                        */
5068 /*  CALLS                                                                 */
5069 /*                                                                        */
5070 /*    _nxd_mqtt_client_sub_unsub                                          */
5071 /*                                                                        */
5072 /*  CALLED BY                                                             */
5073 /*                                                                        */
5074 /*    Application Code                                                    */
5075 /*                                                                        */
5076 /*  RELEASE HISTORY                                                       */
5077 /*                                                                        */
5078 /*    DATE              NAME                      DESCRIPTION             */
5079 /*                                                                        */
5080 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5081 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5082 /*                                            resulting in version 6.1    */
5083 /*  11-09-2020     Yuxin Zhou               Modified comment(s), and      */
5084 /*                                            added packet id parameter,  */
5085 /*                                            resulting in version 6.1.2  */
5086 /*                                                                        */
5087 /**************************************************************************/
UINT _nxd_mqtt_client_unsubscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length)
{

UINT status;

    /* Build the operation byte from the UNSUBSCRIBE packet type shifted left
       by four bits, OR'ed with 0x02, and hand the request to the shared
       subscribe/unsubscribe routine.  NX_NULL is supplied for its fifth
       argument and 0 for the QoS argument.  */
    status = _nxd_mqtt_client_sub_unsub(client_ptr, (MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE << 4) | 0x02,
                                        topic_name, topic_name_length, NX_NULL, 0);

    /* Return the completion status.  */
    return(status);
}
5093 
5094 /**************************************************************************/
5095 /*                                                                        */
5096 /*  FUNCTION                                               RELEASE        */
5097 /*                                                                        */
5098 /*    _nxd_mqtt_send_simple_message                       PORTABLE C      */
5099 /*                                                           6.2.0        */
5100 /*  AUTHOR                                                                */
5101 /*                                                                        */
5102 /*    Yuxin Zhou, Microsoft Corporation                                   */
5103 /*                                                                        */
5104 /*  DESCRIPTION                                                           */
5105 /*                                                                        */
5106 /*    This internal function handles the transmission of PINGREQ or       */
5107 /*    DISCONNECT message.                                                 */
5108 /*                                                                        */
5109 /*                                                                        */
5110 /*  INPUT                                                                 */
5111 /*                                                                        */
5112 /*    client_ptr                            Pointer to MQTT Client        */
5113 /*    header_value                          Value to be programmed into   */
5114 /*                                            MQTT header.                */
5115 /*                                                                        */
5116 /*  OUTPUT                                                                */
5117 /*                                                                        */
5118 /*    status                                Completion status             */
5119 /*                                                                        */
5120 /*  CALLS                                                                 */
5121 /*                                                                        */
5122 /*    tx_mutex_get                                                        */
5123 /*    _nxd_mqtt_packet_allocate                                           */
5124 /*    tx_mutex_put                                                        */
5125 /*    _nxd_mqtt_packet_send                                               */
5126 /*    nx_packet_release                                                   */
5127 /*                                                                        */
5128 /*  CALLED BY                                                             */
5129 /*                                                                        */
5130 /*    _nxd_mqtt_client_ping                                               */
5131 /*    _nxd_mqtt_client_disconnect                                         */
5132 /*                                                                        */
5133 /*  RELEASE HISTORY                                                       */
5134 /*                                                                        */
5135 /*    DATE              NAME                      DESCRIPTION             */
5136 /*                                                                        */
5137 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5138 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5139 /*                                            resulting in version 6.1    */
5140 /*  07-29-2022     Spencer McDonough        Modified comment(s),          */
5141 /*                                            improved internal logic,    */
5142 /*                                            resulting in version 6.1.12 */
5143 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
5144 /*                                            the logic of sending packet,*/
5145 /*                                            resulting in version 6.2.0  */
5146 /*                                                                        */
5147 /**************************************************************************/
_nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT * client_ptr,UCHAR header_value)5148 static UINT _nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT *client_ptr, UCHAR header_value)
5149 {
5150 
5151 NX_PACKET *packet_ptr;
5152 UINT       status;
5153 UINT       status_mutex;
5154 UCHAR     *byte;
5155 
5156     status = _nxd_mqtt_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
5157     if (status)
5158     {
5159         return(NXD_MQTT_INTERNAL_ERROR);
5160     }
5161 
5162     if (2u > ((ULONG)(packet_ptr -> nx_packet_data_end) - (ULONG)(packet_ptr -> nx_packet_append_ptr)))
5163     {
5164         nx_packet_release(packet_ptr);
5165 
5166         /* Packet buffer is too small to hold the message. */
5167         return(NX_SIZE_ERROR);
5168     }
5169 
5170     byte = packet_ptr -> nx_packet_prepend_ptr;
5171 
5172     *byte = (UCHAR)(header_value << 4);
5173     *(byte + 1) = 0;
5174 
5175     packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + 2;
5176     packet_ptr -> nx_packet_length = 2;
5177 
5178 
5179     /* Release MQTT protection before making NetX/TLS calls. */
5180     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5181 
5182     /* Send packet to server.  */
5183     status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER);
5184 
5185     status_mutex = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
5186     if (status)
5187     {
5188 
5189         /* Release the packet. */
5190         nx_packet_release(packet_ptr);
5191 
5192         status = NXD_MQTT_COMMUNICATION_FAILURE;
5193     }
5194     if (status_mutex)
5195     {
5196         return(NXD_MQTT_MUTEX_FAILURE);
5197     }
5198 
5199     if (header_value == MQTT_CONTROL_PACKET_TYPE_PINGREQ)
5200     {
5201         /* Do not update the ping sent time if the outstanding ping has not been responded yet */
5202         if (client_ptr -> nxd_mqtt_ping_not_responded != 1)
5203         {
5204             /* Record the time we send out the PINGREG */
5205             client_ptr -> nxd_mqtt_ping_sent_time = tx_time_get();
5206             client_ptr -> nxd_mqtt_ping_not_responded = 1;
5207         }
5208     }
5209 
5210     /* Update the timeout value. */
5211     client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
5212 
5213     return(status);
5214 }
5215 
5216 
5217 /**************************************************************************/
5218 /*                                                                        */
5219 /*  FUNCTION                                               RELEASE        */
5220 /*                                                                        */
5221 /*    _nxd_mqtt_client_disconnect                         PORTABLE C      */
5222 /*                                                           6.1          */
5223 /*  AUTHOR                                                                */
5224 /*                                                                        */
5225 /*    Yuxin Zhou, Microsoft Corporation                                   */
5226 /*                                                                        */
5227 /*  DESCRIPTION                                                           */
5228 /*                                                                        */
5229 /*    This function disconnects the MQTT client from a server.            */
5230 /*                                                                        */
5231 /*                                                                        */
5232 /*  INPUT                                                                 */
5233 /*                                                                        */
5234 /*    client_ptr                            Pointer to MQTT Client        */
5235 /*                                                                        */
5236 /*  OUTPUT                                                                */
5237 /*                                                                        */
5238 /*    status                                Completion status             */
5239 /*                                                                        */
5240 /*  CALLS                                                                 */
5241 /*                                                                        */
5242 /*    _nxd_mqtt_send_simple_message                                       */
5243 /*    _nxd_mqtt_process_disconnect                                        */
5244 /*                                                                        */
5245 /*  CALLED BY                                                             */
5246 /*                                                                        */
5247 /*    Application Code                                                    */
5248 /*                                                                        */
5249 /*  RELEASE HISTORY                                                       */
5250 /*                                                                        */
5251 /*    DATE              NAME                      DESCRIPTION             */
5252 /*                                                                        */
5253 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5254 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5255 /*                                            resulting in version 6.1    */
5256 /*                                                                        */
5257 /**************************************************************************/
_nxd_mqtt_client_disconnect(NXD_MQTT_CLIENT * client_ptr)5258 UINT _nxd_mqtt_client_disconnect(NXD_MQTT_CLIENT *client_ptr)
5259 {
5260 UINT status;
5261 
5262     /* Obtain the mutex. */
5263     status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
5264     if (status != TX_SUCCESS)
5265     {
5266         /* Disable timer if timer has been started. */
5267         if (client_ptr -> nxd_mqtt_keepalive)
5268         {
5269             tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
5270         }
5271 
5272         return(NXD_MQTT_MUTEX_FAILURE);
5273     }
5274 
5275     /* Let the server know we are ending the MQTT session. */
5276     _nxd_mqtt_send_simple_message(client_ptr, MQTT_CONTROL_PACKET_TYPE_DISCONNECT);
5277 
5278     /* Call the disconnect routine to disconnect the socket,
5279        release transmit packets, release received packets,
5280        and delete the client timer. */
5281     _nxd_mqtt_process_disconnect(client_ptr);
5282 
5283     /* Release the mutex. */
5284     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5285 
5286     return(status);
5287 }
5288 
5289 /**************************************************************************/
5290 /*                                                                        */
5291 /*  FUNCTION                                               RELEASE        */
5292 /*                                                                        */
5293 /*    _nxd_mqtt_client_receive_notify_set                 PORTABLE C      */
5294 /*                                                           6.1          */
5295 /*  AUTHOR                                                                */
5296 /*                                                                        */
5297 /*    Yuxin Zhou, Microsoft Corporation                                   */
5298 /*                                                                        */
5299 /*  DESCRIPTION                                                           */
5300 /*                                                                        */
5301 /*    This function installs the MQTT client publish notify callback      */
5302 /*    function.                                                           */
5303 /*                                                                        */
5304 /*                                                                        */
5305 /*  INPUT                                                                 */
5306 /*                                                                        */
5307 /*    client_ptr                            Pointer to MQTT Client        */
5308 /*    mqtt_client_receive_notify            User-supplied callback        */
5309 /*                                            function, which is invoked  */
5310 /*                                            upon receiving a publish    */
5311 /*                                            message.                    */
5312 /*                                                                        */
5313 /*  OUTPUT                                                                */
5314 /*                                                                        */
5315 /*    status                                Completion status             */
5316 /*                                                                        */
5317 /*  CALLS                                                                 */
5318 /*                                                                        */
5319 /*    tx_mutex_get                                                        */
5320 /*    tx_mutex_put                                                        */
5321 /*                                                                        */
5322 /*  CALLED BY                                                             */
5323 /*                                                                        */
5324 /*    Application Code                                                    */
5325 /*                                                                        */
5326 /*  RELEASE HISTORY                                                       */
5327 /*                                                                        */
5328 /*    DATE              NAME                      DESCRIPTION             */
5329 /*                                                                        */
5330 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5331 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5332 /*                                            resulting in version 6.1    */
5333 /*                                                                        */
5334 /**************************************************************************/
_nxd_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* receive_notify)(NXD_MQTT_CLIENT * client_ptr,UINT message_count))5335 UINT _nxd_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT *client_ptr,
5336                                          VOID (*receive_notify)(NXD_MQTT_CLIENT *client_ptr, UINT message_count))
5337 {
5338 
5339     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
5340 
5341     client_ptr -> nxd_mqtt_client_receive_notify = receive_notify;
5342 
5343     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5344 
5345     return(NXD_MQTT_SUCCESS);
5346 }
5347 
5348 /**************************************************************************/
5349 /*                                                                        */
5350 /*  FUNCTION                                               RELEASE        */
5351 /*                                                                        */
5352 /*    _nxd_mqtt_client_message_get                        PORTABLE C      */
5353 /*                                                           6.1          */
5354 /*  AUTHOR                                                                */
5355 /*                                                                        */
5356 /*    Yuxin Zhou, Microsoft Corporation                                   */
5357 /*                                                                        */
5358 /*  DESCRIPTION                                                           */
5359 /*                                                                        */
5360 /*    This function retrieves a published MQTT message.                   */
5361 /*                                                                        */
5362 /*                                                                        */
5363 /*  INPUT                                                                 */
5364 /*                                                                        */
5365 /*    client_ptr                            Pointer to MQTT Client        */
5366 /*    topic_buffer                          Pointer to the topic buffer   */
5367 /*                                            where topic is copied to    */
5368 /*    topic_buffer_size                     Size of the topic buffer.     */
5369 /*    actual_topic_length                   Number of bytes copied into   */
5370 /*                                            topic_buffer                */
5371 /*    message_buffer                        Pointer to the buffer where   */
5372 /*                                            message is copied to        */
5373 /*    message_buffer_size                   Size of the message_buffer    */
5374 /*    actual_message_length                 Number of bytes copied into   */
5375 /*                                            the message buffer.         */
5376 /*                                                                        */
5377 /*  OUTPUT                                                                */
5378 /*                                                                        */
5379 /*    status                                Completion status             */
5380 /*                                                                        */
5381 /*  CALLS                                                                 */
5382 /*                                                                        */
5383 /*    _nxd_mqtt_process_publish_packet      Locate topic and message in   */
5384 /*                                            the queued packet           */
5385 /*    nx_packet_data_extract_offset         Copy data out of the packet   */
5386 /*    nx_packet_release                     Release the packet            */
5387 /*    tx_mutex_get                                                        */
5388 /*    tx_mutex_put                                                        */
5389 /*                                                                        */
5390 /*  CALLED BY                                                             */
5391 /*                                                                        */
5392 /*    Application Code                                                    */
5393 /*                                                                        */
5394 /*  RELEASE HISTORY                                                       */
5395 /*                                                                        */
5396 /*    DATE              NAME                      DESCRIPTION             */
5397 /*                                                                        */
5398 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5399 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
5400 /*                                            fixed uninitialized value,  */
5401 /*                                            resulting in version 6.1    */
5402 /*                                                                        */
5403 /**************************************************************************/
UINT _nxd_mqtt_client_message_get(NXD_MQTT_CLIENT *client_ptr, UCHAR *topic_buffer, UINT topic_buffer_size, UINT *actual_topic_length,
                                  UCHAR *message_buffer, UINT message_buffer_size, UINT *actual_message_length)
{

UINT                status;
NX_PACKET          *packet_ptr;
ULONG               topic_offset;
USHORT              topic_length;
ULONG               message_offset;
ULONG               message_length;
ULONG               topic_bytes_copied;
ULONG               message_bytes_copied;

    /* Summary of behavior:
         - Packets are taken from the head of the receive queue one at a time.
         - A packet that fails to parse is removed from the queue, released,
           and the next packet is examined.
         - If the first packet that parses does not fit in the caller's
           buffers, NXD_MQTT_INSUFFICIENT_BUFFER_SPACE is returned and that
           packet stays on the queue.
         - Otherwise the topic and message are copied out, the packet is
           released, and NXD_MQTT_SUCCESS is returned.
         - NXD_MQTT_NO_MESSAGE is returned once the queue is empty.  */

    /* Obtain the client mutex.  The result of this call is not examined.  */
    tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

    while (client_ptr -> message_receive_queue_depth)
    {

        /* Look at the packet at the head of the receive queue and locate the
           topic and message inside it.  */
        packet_ptr = client_ptr -> message_receive_queue_head;
        status = _nxd_mqtt_process_publish_packet(packet_ptr, &topic_offset, &topic_length, &message_offset, &message_length);
        if (status == NXD_MQTT_SUCCESS)
        {

            /* Check the caller's buffers before removing the packet, so that the
               message is still queued if they are too small.  */
            if ((topic_buffer_size < topic_length) ||
                (message_buffer_size < message_length))
            {
                tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
                return(NXD_MQTT_INSUFFICIENT_BUFFER_SPACE);
            }
        }

        /* Remove the packet from the receive queue.  */
        client_ptr -> message_receive_queue_head = packet_ptr -> nx_packet_queue_next;
        if (client_ptr -> message_receive_queue_tail == packet_ptr)
        {
            client_ptr -> message_receive_queue_tail = NX_NULL;
        }
        client_ptr -> message_receive_queue_depth--;

        if (status == NXD_MQTT_SUCCESS)
        {

            /* Start the counts at zero so that a defined value is reported even
               if an extraction call does not write its count.  */
            topic_bytes_copied = 0;
            message_bytes_copied = 0;

            /* The extraction service reports its count through a ULONG pointer.
               UINT and ULONG are distinct types and need not have the same
               size, so the caller's UINT outputs must not be passed with a
               pointer cast; local ULONG counters are used instead.  */
            nx_packet_data_extract_offset(packet_ptr, topic_offset, topic_buffer,
                                          topic_length, &topic_bytes_copied);
            nx_packet_data_extract_offset(packet_ptr, message_offset, message_buffer,
                                          message_length, &message_bytes_copied);
            nx_packet_release(packet_ptr);

            /* Report the number of bytes copied.  Both counts are bounded by the
               UINT buffer sizes checked above.  */
            *actual_topic_length = (UINT)topic_bytes_copied;
            *actual_message_length = (UINT)message_bytes_copied;

            tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
            return(NXD_MQTT_SUCCESS);
        }

        /* The packet could not be parsed; discard it and try the next one.  */
        nx_packet_release(packet_ptr);
    }

    /* The receive queue is empty.  */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
    return(NXD_MQTT_NO_MESSAGE);
}
5457 
5458 /**************************************************************************/
5459 /*                                                                        */
5460 /*  FUNCTION                                               RELEASE        */
5461 /*                                                                        */
5462 /*    _nxde_mqtt_client_create                            PORTABLE C      */
5463 /*                                                           6.1          */
5464 /*  AUTHOR                                                                */
5465 /*                                                                        */
5466 /*    Yuxin Zhou, Microsoft Corporation                                   */
5467 /*                                                                        */
5468 /*  DESCRIPTION                                                           */
5469 /*                                                                        */
5470 /*    This function checks for errors in nxd_mqtt_client_create call.     */
5471 /*                                                                        */
5472 /*                                                                        */
5473 /*  INPUT                                                                 */
5474 /*                                                                        */
5475 /*    client_ptr                            Pointer to MQTT Client        */
5476 /*    client_name                           Name string used by the       */
5477 /*                                            client                      */
5478 /*    client_id                             Client ID used by the client  */
5479 /*    client_id_length                      Length of Client ID, in bytes */
5480 /*    ip_ptr                                Pointer to IP instance        */
5481 /*    pool_ptr                              Pointer to packet pool        */
5482 /*    stack_ptr                             Client thread's stack pointer */
5483 /*    stack_size                            Client thread's stack size    */
5484 /*    mqtt_thread_priority                  Priority for MQTT thread      */
5485 /*    memory_ptr                            Deprecated and not used       */
5486 /*    memory_size                           Deprecated and not used       */
5487 /*                                                                        */
5488 /*  OUTPUT                                                                */
5489 /*                                                                        */
5490 /*    status                                Completion status             */
5491 /*                                                                        */
5492 /*  CALLS                                                                 */
5493 /*                                                                        */
5494 /*    _nxd_mqtt_client_create               Actual client create call     */
5495 /*                                                                        */
5496 /*  CALLED BY                                                             */
5497 /*                                                                        */
5498 /*    Application Code                                                    */
5499 /*                                                                        */
5500 /*  RELEASE HISTORY                                                       */
5501 /*                                                                        */
5502 /*    DATE              NAME                      DESCRIPTION             */
5503 /*                                                                        */
5504 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5505 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5506 /*                                            resulting in version 6.1    */
5507 /*                                                                        */
5508 /**************************************************************************/
UINT _nxde_mqtt_client_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name, CHAR *client_id, UINT client_id_length,
                              NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
                              VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority,
                              VOID *memory_ptr, ULONG memory_size)
{

    /* Each check below rejects the call with NX_PTR_ERROR.  The checks are
       made in this order: client, IP instance, stack, packet pool.  */

    /* The client control block must be supplied.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The IP instance must be supplied and must carry the NX_IP_ID value.  */
    if ((ip_ptr == NX_NULL) || (ip_ptr -> nx_ip_id != NX_IP_ID))
    {
        return(NX_PTR_ERROR);
    }

    /* A stack area of non-zero size must be supplied.  */
    if ((stack_ptr == NX_NULL) || (stack_size == 0))
    {
        return(NX_PTR_ERROR);
    }

    /* The packet pool must be supplied.  */
    if (pool_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* All checks passed; call the actual client create service with the
       arguments unchanged and return its status.  */
    return(_nxd_mqtt_client_create(client_ptr, client_name, client_id, client_id_length, ip_ptr,
                                   pool_ptr, stack_ptr, stack_size, mqtt_thread_priority,
                                   memory_ptr, memory_size));
}
5527 
5528 
5529 /**************************************************************************/
5530 /*                                                                        */
5531 /*  FUNCTION                                               RELEASE        */
5532 /*                                                                        */
5533 /*    _nxde_mqtt_client_connect                           PORTABLE C      */
5534 /*                                                           6.1          */
5535 /*  AUTHOR                                                                */
5536 /*                                                                        */
5537 /*    Yuxin Zhou, Microsoft Corporation                                   */
5538 /*                                                                        */
5539 /*  DESCRIPTION                                                           */
5540 /*                                                                        */
5541 /*    This function checks for errors in MQTT client connect call.        */
5542 /*                                                                        */
5543 /*                                                                        */
5544 /*  INPUT                                                                 */
5545 /*                                                                        */
5546 /*    client_ptr                            Pointer to MQTT Client        */
5547 /*    server_ip                             Server IP address structure   */
5548 /*    server_port                           Server port number, in host   */
5549 /*                                            byte order                  */
5550 /*    keepalive                             The MQTT keepalive timer      */
5551 /*    clean_session                         Clean session flag            */
5552 /*    wait_option                           Suspension option             */
5553 /*                                                                        */
5554 /*                                                                        */
5555 /*  OUTPUT                                                                */
5556 /*                                                                        */
5557 /*    status                                Completion status             */
5558 /*                                                                        */
5559 /*  CALLS                                                                 */
5560 /*                                                                        */
5561 /*    _nxd_mqtt_client_connect              Actual MQTT Client connect    */
5562 /*                                            call                        */
5563 /*                                                                        */
5564 /*  CALLED BY                                                             */
5565 /*                                                                        */
5566 /*    Application Code                                                    */
5567 /*                                                                        */
5568 /*  RELEASE HISTORY                                                       */
5569 /*                                                                        */
5570 /*    DATE              NAME                      DESCRIPTION             */
5571 /*                                                                        */
5572 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5573 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
5574 /*                                            corrected mqtt client state,*/
5575 /*                                            resulting in version 6.1    */
5576 /*                                                                        */
5577 /**************************************************************************/
UINT _nxde_mqtt_client_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                               UINT keepalive, UINT clean_session, ULONG wait_option)
{

    /* Both the client control block and the server address are mandatory.  */
    if ((client_ptr == NX_NULL) || (server_ip == NX_NULL))
    {
        return(NX_PTR_ERROR);
    }

    /* The address must be tagged as IP version 4 or version 6.  */
    if (!((server_ip -> nxd_ip_version == 4) || (server_ip -> nxd_ip_version == 6)))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Port zero is rejected.  */
    if (server_port == 0)
    {
        return(NX_INVALID_PORT);
    }

    /* Arguments are acceptable; return whatever the actual connect service
       reports.  */
    return(_nxd_mqtt_client_connect(client_ptr, server_ip, server_port,
                                    keepalive, clean_session, wait_option));
}
5609 
5610 /**************************************************************************/
5611 /*                                                                        */
5612 /*  FUNCTION                                               RELEASE        */
5613 /*                                                                        */
5614 /*    _nxde_mqtt_client_secure_connect                    PORTABLE C      */
5615 /*                                                           6.1          */
5616 /*  AUTHOR                                                                */
5617 /*                                                                        */
5618 /*    Yuxin Zhou, Microsoft Corporation                                   */
5619 /*                                                                        */
5620 /*  DESCRIPTION                                                           */
5621 /*                                                                        */
5622 /*    This function checks for errors in MQTT client TLS secure connect.  */
5623 /*                                                                        */
5624 /*                                                                        */
5625 /*  INPUT                                                                 */
5626 /*                                                                        */
5627 /*    client_ptr                            Pointer to MQTT Client        */
5628 /*    server_ip                             Server IP address structure   */
5629 /*    server_port                           Server port number, in host   */
5630 /*                                            byte order                  */
5631 /*    tls_setup                             User-supplied callback        */
5632 /*                                            function to set up TLS      */
5633 /*                                            parameters.                 */
5634 /*    keepalive                             The MQTT keepalive timer      */
5635 /*    clean_session                         Clean session flag            */
5636 /*    wait_option                           Suspension option             */
5637 /*                                                                        */
5638 /*  OUTPUT                                                                */
5639 /*                                                                        */
5640 /*    status                                Completion status             */
5641 /*                                                                        */
5642 /*  CALLS                                                                 */
5643 /*                                                                        */
5644 /*    _nxd_mqtt_client_secure_connect       Actual MQTT Client secure     */
5645 /*                                            connect call                */
5646 /*                                                                        */
5647 /*  CALLED BY                                                             */
5648 /*                                                                        */
5649 /*    Application Code                                                    */
5650 /*                                                                        */
5651 /*  RELEASE HISTORY                                                       */
5652 /*                                                                        */
5653 /*    DATE              NAME                      DESCRIPTION             */
5654 /*                                                                        */
5655 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5656 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
5657 /*                                            corrected mqtt client state,*/
5658 /*                                            resulting in version 6.1    */
5659 /*                                                                        */
5660 /**************************************************************************/
5661 #ifdef NX_SECURE_ENABLE
5662 
UINT _nxde_mqtt_client_secure_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                                      UINT (*tls_setup)(NXD_MQTT_CLIENT *client_ptr, NX_SECURE_TLS_SESSION *,
                                                        NX_SECURE_X509_CERT *, NX_SECURE_X509_CERT *),
                                      UINT keepalive, UINT clean_session, ULONG wait_option)
{

UINT status;

    /* The client control block is mandatory.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The server address structure is mandatory.  */
    if (server_ip == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Port zero is rejected.  */
    if (server_port == 0)
    {
        return(NX_INVALID_PORT);
    }

    /* Test for IP version flag, matching the check performed by
       _nxde_mqtt_client_connect.  It is placed after the pre-existing checks
       so that every input this wrapper already rejected keeps its original
       error code.  */
    if ((server_ip -> nxd_ip_version != 4) && (server_ip -> nxd_ip_version != 6))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Arguments are acceptable; forward them, including the tls_setup
       callback, unchanged to the actual secure connect service.  */
    status = _nxd_mqtt_client_secure_connect(client_ptr, server_ip, server_port, tls_setup,
                                             keepalive, clean_session, wait_option);

    return(status);
}
5691 #endif /* NX_SECURE_ENABLE */
5692 
5693 
5694 /**************************************************************************/
5695 /*                                                                        */
5696 /*  FUNCTION                                               RELEASE        */
5697 /*                                                                        */
5698 /*    _nxde_mqtt_client_delete                            PORTABLE C      */
5699 /*                                                           6.1          */
5700 /*  AUTHOR                                                                */
5701 /*                                                                        */
5702 /*    Yuxin Zhou, Microsoft Corporation                                   */
5703 /*                                                                        */
5704 /*  DESCRIPTION                                                           */
5705 /*                                                                        */
5706 /*    This function checks for errors in MQTT client delete call.         */
5707 /*                                                                        */
5708 /*                                                                        */
5709 /*  INPUT                                                                 */
5710 /*                                                                        */
5711 /*    client_ptr                            Pointer to MQTT Client        */
5712 /*                                                                        */
5713 /*  OUTPUT                                                                */
5714 /*                                                                        */
5715 /*    status                                Completion status             */
5716 /*                                                                        */
5717 /*  CALLS                                                                 */
5718 /*                                                                        */
5719 /*    _nxd_mqtt_client_delete               Actual MQTT Client delete     */
5720 /*                                            call.                       */
5721 /*                                                                        */
5722 /*  CALLED BY                                                             */
5723 /*                                                                        */
5724 /*    Application Code                                                    */
5725 /*                                                                        */
5726 /*  RELEASE HISTORY                                                       */
5727 /*                                                                        */
5728 /*    DATE              NAME                      DESCRIPTION             */
5729 /*                                                                        */
5730 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5731 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5732 /*                                            resulting in version 6.1    */
5733 /*                                                                        */
5734 /**************************************************************************/
_nxde_mqtt_client_delete(NXD_MQTT_CLIENT * client_ptr)5735 UINT _nxde_mqtt_client_delete(NXD_MQTT_CLIENT *client_ptr)
5736 {
5737 
5738 UINT status;
5739 
5740     if (client_ptr == NX_NULL)
5741     {
5742         return(NX_PTR_ERROR);
5743     }
5744 
5745     status = _nxd_mqtt_client_delete(client_ptr);
5746 
5747     return(status);
5748 }
5749 
5750 
5751 
5752 /**************************************************************************/
5753 /*                                                                        */
5754 /*  FUNCTION                                               RELEASE        */
5755 /*                                                                        */
5756 /*    _nxde_mqtt_client_publish                           PORTABLE C      */
5757 /*                                                           6.1          */
5758 /*  AUTHOR                                                                */
5759 /*                                                                        */
5760 /*    Yuxin Zhou, Microsoft Corporation                                   */
5761 /*                                                                        */
5762 /*  DESCRIPTION                                                           */
5763 /*                                                                        */
5764 /*    This function performs error checking to the publish service.       */
5765 /*                                                                        */
5766 /*                                                                        */
5767 /*  INPUT                                                                 */
5768 /*                                                                        */
5769 /*    client_ptr                            Pointer to MQTT Client        */
5770 /*    topic_name                            Name of the topic             */
5771 /*    topic_name_length                     Length of the topic name      */
5772 /*    message                               Message string                */
5773 /*    message_length                        Length of the message,        */
5774 /*                                            in bytes                    */
5775 /*    retain                                The retain flag, whether      */
5776 /*                                            or not the broker should    */
5777 /*                                            store this message          */
5778 /*    QoS                                   Expected QoS level            */
5779 /*    wait_option                           Suspension option             */
5780 /*                                                                        */
5781 /*  OUTPUT                                                                */
5782 /*                                                                        */
5783 /*    status                                Completion status             */
5784 /*                                                                        */
5785 /*  CALLS                                                                 */
5786 /*                                                                        */
5787 /*                                                                        */
5788 /*  CALLED BY                                                             */
5789 /*                                                                        */
5790 /*    Application Code                                                    */
5791 /*                                                                        */
5792 /*  RELEASE HISTORY                                                       */
5793 /*                                                                        */
5794 /*    DATE              NAME                      DESCRIPTION             */
5795 /*                                                                        */
5796 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5797 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5798 /*                                            resulting in version 6.1    */
5799 /*                                                                        */
5800 /**************************************************************************/
UINT _nxde_mqtt_client_publish(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length,
                               CHAR *message, UINT message_length, UINT retain, UINT QoS, ULONG wait_option)
{

    /* Validate client_ptr.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Validate topic_name: a non-NULL pointer with a non-zero length is
       required.  */
    if ((topic_name == NX_NULL) || (topic_name_length == 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Validate message length: a message pointer accompanied by a zero
       length is rejected.  A NULL message pointer is passed through.  */
    if ((message != NX_NULL) && (message_length == 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Validate QoS value.  MQTT defines only QoS levels 0, 1 and 2; the
       value 3 is reserved and must not appear in a PUBLISH packet, so it is
       rejected here, in agreement with the subscribe error checker.  */
    if (QoS > 2)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Arguments are acceptable; return the status of the actual publish
       service.  */
    return(_nxd_mqtt_client_publish(client_ptr, topic_name, topic_name_length,
                                    message, message_length, retain, QoS, wait_option));
}
5830 
5831 
5832 /**************************************************************************/
5833 /*                                                                        */
5834 /*  FUNCTION                                               RELEASE        */
5835 /*                                                                        */
5836 /*    _nxde_mqtt_client_subscribe                         PORTABLE C      */
5837 /*                                                           6.1          */
5838 /*  AUTHOR                                                                */
5839 /*                                                                        */
5840 /*    Yuxin Zhou, Microsoft Corporation                                   */
5841 /*                                                                        */
5842 /*  DESCRIPTION                                                           */
5843 /*                                                                        */
5844 /*    This function performs error checking to the subscribe service.     */
5845 /*                                                                        */
5846 /*                                                                        */
5847 /*  INPUT                                                                 */
5848 /*                                                                        */
5849 /*    client_ptr                            Pointer to MQTT Client        */
5850 /*    topic_name                            Pointer to the topic string   */
5851 /*                                            to subscribe to             */
5852 /*    topic_name_length                     Length of the topic string    */
5853 /*                                            in bytes                    */
5854 /*    QoS                                   Expected QoS level            */
5855 /*                                                                        */
5856 /*  OUTPUT                                                                */
5857 /*                                                                        */
5858 /*    status                                Completion status:            */
5859 /*                                            NX_PTR_ERROR,               */
5860 /*                                            NXD_MQTT_INVALID_PARAMETER, */
5861 /*                                            or the status returned by   */
5862 /*                                            the actual subscribe call   */
5863 /*                                                                        */
5864 /*  CALLS                                                                 */
5865 /*                                                                        */
5866 /*    _nxd_mqtt_client_subscribe                                          */
5867 /*                                                                        */
5868 /*  CALLED BY                                                             */
5869 /*                                                                        */
5870 /*    Application Code                                                    */
5871 /*                                                                        */
5872 /*  RELEASE HISTORY                                                       */
5873 /*                                                                        */
5874 /*    DATE              NAME                      DESCRIPTION             */
5875 /*                                                                        */
5876 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5877 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5878 /*                                            resulting in version 6.1    */
5879 /*                                                                        */
5880 /**************************************************************************/
UINT _nxde_mqtt_client_subscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length, UINT QoS)
{

UINT status;

    /* A NULL client control block is a pointer error.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The topic must be a non-NULL pointer with a non-zero length.  */
    if (topic_name == NX_NULL)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    if (topic_name_length == 0)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Only QoS levels 0, 1 and 2 are accepted.  */
    if (QoS > 2)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Arguments are acceptable; invoke the actual subscribe service.  */
    status = _nxd_mqtt_client_subscribe(client_ptr, topic_name, topic_name_length, QoS);

    return(status);
}
5905 
5906 
5907 
5908 
5909 /**************************************************************************/
5910 /*                                                                        */
5911 /*  FUNCTION                                               RELEASE        */
5912 /*                                                                        */
5913 /*    _nxde_mqtt_client_unsubscribe                       PORTABLE C      */
5914 /*                                                           6.1          */
5915 /*  AUTHOR                                                                */
5916 /*                                                                        */
5917 /*    Yuxin Zhou, Microsoft Corporation                                   */
5918 /*                                                                        */
5919 /*  DESCRIPTION                                                           */
5920 /*                                                                        */
5921 /*    This function performs error checking to the unsubscribe service.   */
5922 /*                                                                        */
5923 /*                                                                        */
5924 /*  INPUT                                                                 */
5925 /*                                                                        */
5926 /*    client_ptr                            Pointer to MQTT Client        */
5927 /*    topic_name                            Pointer to the topic string   */
5928 /*                                            to unsubscribe              */
5929 /*    topic_name_length                     Length of the topic string    */
5930 /*                                            in bytes                    */
5931 /*                                                                        */
5932 /*  OUTPUT                                                                */
5933 /*                                                                        */
5934 /*    status                                Completion status             */
5935 /*                                                                        */
5936 /*  CALLS                                                                 */
5937 /*                                                                        */
5938 /*                                                                        */
5939 /*  CALLED BY                                                             */
5940 /*                                                                        */
5941 /*    Application Code                                                    */
5942 /*                                                                        */
5943 /*  RELEASE HISTORY                                                       */
5944 /*                                                                        */
5945 /*    DATE              NAME                      DESCRIPTION             */
5946 /*                                                                        */
5947 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5948 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5949 /*                                            resulting in version 6.1    */
5950 /*                                                                        */
5951 /**************************************************************************/
UINT _nxde_mqtt_client_unsubscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length)
{

UINT status;

    /* A NULL client control block is a pointer error.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The topic must be a non-NULL pointer with a non-zero length.  */
    if (topic_name == NX_NULL)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    if (topic_name_length == 0)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Arguments are acceptable; invoke the actual unsubscribe service.  */
    status = _nxd_mqtt_client_unsubscribe(client_ptr, topic_name, topic_name_length);

    return(status);
}
5968 
5969 
5970 /**************************************************************************/
5971 /*                                                                        */
5972 /*  FUNCTION                                               RELEASE        */
5973 /*                                                                        */
5974 /*    _nxde_mqtt_client_disconnect                        PORTABLE C      */
5975 /*                                                           6.1          */
5976 /*  AUTHOR                                                                */
5977 /*                                                                        */
5978 /*    Yuxin Zhou, Microsoft Corporation                                   */
5979 /*                                                                        */
5980 /*  DESCRIPTION                                                           */
5981 /*                                                                        */
5982 /*    This function checks for errors in MQTT client disconnect call.     */
5983 /*                                                                        */
5984 /*                                                                        */
5985 /*  INPUT                                                                 */
5986 /*                                                                        */
5987 /*    client_ptr                            Pointer to MQTT Client        */
5988 /*                                                                        */
5989 /*  OUTPUT                                                                */
5990 /*                                                                        */
5991 /*    status                                Completion status             */
5992 /*                                                                        */
5993 /*  CALLS                                                                 */
5994 /*                                                                        */
5995 /*    _nxd_mqtt_client_disconnect           Actual MQTT Client disconnect */
5996 /*                                            call                        */
5997 /*                                                                        */
5998 /*  CALLED BY                                                             */
5999 /*                                                                        */
6000 /*    Application Code                                                    */
6001 /*                                                                        */
6002 /*  RELEASE HISTORY                                                       */
6003 /*                                                                        */
6004 /*    DATE              NAME                      DESCRIPTION             */
6005 /*                                                                        */
6006 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6007 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6008 /*                                            resulting in version 6.1    */
6009 /*                                                                        */
6010 /**************************************************************************/
_nxde_mqtt_client_disconnect(NXD_MQTT_CLIENT * client_ptr)6011 UINT _nxde_mqtt_client_disconnect(NXD_MQTT_CLIENT *client_ptr)
6012 {
6013     /* Validate client_ptr */
6014     if (client_ptr == NX_NULL)
6015     {
6016         return(NX_PTR_ERROR);
6017     }
6018 
6019     return(_nxd_mqtt_client_disconnect(client_ptr));
6020 }
6021 
6022 
6023 /**************************************************************************/
6024 /*                                                                        */
6025 /*  FUNCTION                                               RELEASE        */
6026 /*                                                                        */
6027 /*    _nxde_mqtt_client_message_get                       PORTABLE C      */
6028 /*                                                           6.1          */
6029 /*  AUTHOR                                                                */
6030 /*                                                                        */
6031 /*    Yuxin Zhou, Microsoft Corporation                                   */
6032 /*                                                                        */
6033 /*  DESCRIPTION                                                           */
6034 /*                                                                        */
6035 /*    This function checks for errors in MQTT client message get call.    */
6036 /*                                                                        */
6037 /*                                                                        */
6038 /*  INPUT                                                                 */
6039 /*                                                                        */
6040 /*    client_ptr                            Pointer to MQTT Client        */
6041 /*    topic_buffer                          Pointer to the topic buffer   */
6042 /*                                            where topic is copied to    */
6043 /*    topic_buffer_size                     Size of the topic buffer.     */
6044 /*    actual_topic_length                   Number of bytes copied into   */
6045 /*                                            topic_buffer                */
6046 /*    message_buffer                        Pointer to the buffer where   */
6047 /*                                            message is copied to        */
6048 /*    message_buffer_size                   Size of the message_buffer    */
6049 /*    actual_message_length                 Number of bytes copied into   */
6050 /*                                            the message buffer.         */
6051 /*                                                                        */
6052 /*  OUTPUT                                                                */
6053 /*                                                                        */
6054 /*    status                                Completion status             */
6055 /*                                                                        */
6056 /*  CALLS                                                                 */
6057 /*                                                                        */
6058 /*    _nxd_mqtt_client_message_get                                        */
6059 /*                                                                        */
6060 /*                                                                        */
6061 /*  CALLED BY                                                             */
6062 /*                                                                        */
6063 /*    Application Code                                                    */
6064 /*                                                                        */
6065 /*  RELEASE HISTORY                                                       */
6066 /*                                                                        */
6067 /*    DATE              NAME                      DESCRIPTION             */
6068 /*                                                                        */
6069 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6070 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6071 /*                                            resulting in version 6.1    */
6072 /*                                                                        */
6073 /**************************************************************************/
UINT _nxde_mqtt_client_message_get(NXD_MQTT_CLIENT *client_ptr, UCHAR *topic_buffer, UINT topic_buffer_size, UINT *actual_topic_length,
                                   UCHAR *message_buffer, UINT message_buffer_size, UINT *actual_message_length)
{

    /* The client control block is mandatory.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Both destination buffers are mandatory: a NULL topic_buffer or a NULL
       message_buffer is rejected as an invalid parameter.  */
    if (topic_buffer == NX_NULL)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    if (message_buffer == NX_NULL)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* The buffer sizes and the two actual-length pointers are not examined
       by this wrapper; they are handed to the actual service unchanged.
       NOTE(review): whether the actual service tolerates NULL length
       pointers is not visible here - confirm before relying on it.  */
    return(_nxd_mqtt_client_message_get(client_ptr, topic_buffer, topic_buffer_size, actual_topic_length,
                                        message_buffer, message_buffer_size, actual_message_length));
}
6096 
6097 
6098 /**************************************************************************/
6099 /*                                                                        */
6100 /*  FUNCTION                                               RELEASE        */
6101 /*                                                                        */
6102 /*    _nxde_mqtt_client_receive_notify_set                PORTABLE C      */
6103 /*                                                           6.1          */
6104 /*  AUTHOR                                                                */
6105 /*                                                                        */
6106 /*    Yuxin Zhou, Microsoft Corporation                                   */
6107 /*                                                                        */
6108 /*  DESCRIPTION                                                           */
6109 /*                                                                        */
/*    This function checks for errors in MQTT client receive notify set   */
/*    call.                                                               */
6112 /*                                                                        */
6113 /*  INPUT                                                                 */
6114 /*                                                                        */
6115 /*    client_ptr                            Pointer to MQTT Client        */
6116 /*    receive_notify                        User-supplied callback        */
6117 /*                                            function, which is invoked  */
6118 /*                                            upon receiving a publish    */
6119 /*                                            message.                    */
6120 /*                                                                        */
6121 /*  OUTPUT                                                                */
6122 /*                                                                        */
6123 /*    status                                Completion status             */
6124 /*                                                                        */
6125 /*  CALLS                                                                 */
6126 /*                                                                        */
6127 /*    _nxd_mqtt_client_receive_notify_set                                 */
6128 /*                                                                        */
6129 /*                                                                        */
6130 /*  CALLED BY                                                             */
6131 /*                                                                        */
6132 /*    Application Code                                                    */
6133 /*                                                                        */
6134 /*  RELEASE HISTORY                                                       */
6135 /*                                                                        */
6136 /*    DATE              NAME                      DESCRIPTION             */
6137 /*                                                                        */
6138 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6139 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6140 /*                                            resulting in version 6.1    */
6141 /*                                                                        */
6142 /**************************************************************************/
_nxde_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* receive_notify)(NXD_MQTT_CLIENT * client_ptr,UINT message_count))6143 UINT _nxde_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT *client_ptr,
6144                                           VOID (*receive_notify)(NXD_MQTT_CLIENT *client_ptr, UINT message_count))
6145 {
6146     /* Validate client_ptr */
6147     if (client_ptr == NX_NULL)
6148     {
6149         return(NX_PTR_ERROR);
6150     }
6151 
6152     if (receive_notify == NX_NULL)
6153     {
6154         return(NX_PTR_ERROR);
6155     }
6156 
6157     return(_nxd_mqtt_client_receive_notify_set(client_ptr, receive_notify));
6158 }
6159 
6160 
6161 
6162 /**************************************************************************/
6163 /*                                                                        */
6164 /*  FUNCTION                                               RELEASE        */
6165 /*                                                                        */
6166 /*    _nxd_mqtt_client_disconnect_notify_set              PORTABLE C      */
6167 /*                                                           6.1          */
6168 /*  AUTHOR                                                                */
6169 /*                                                                        */
6170 /*    Yuxin Zhou, Microsoft Corporation                                   */
6171 /*                                                                        */
6172 /*  DESCRIPTION                                                           */
6173 /*                                                                        */
6174 /*    This function sets the notify function for the disconnect event.    */
6175 /*                                                                        */
6176 /*  INPUT                                                                 */
6177 /*                                                                        */
6178 /*    client_ptr                            Pointer to MQTT Client        */
6179 /*    disconnect_notify                     The notify function to be     */
6180 /*                                            used when the client is     */
6181 /*                                            disconnected from the       */
6182 /*                                            server.                     */
6183 /*                                                                        */
6184 /*  OUTPUT                                                                */
6185 /*                                                                        */
6186 /*    status                                Completion status             */
6187 /*                                                                        */
6188 /*  CALLS                                                                 */
6189 /*                                                                        */
6190 /*    None                                                                */
6191 /*                                                                        */
6192 /*  CALLED BY                                                             */
6193 /*                                                                        */
6194 /*    Application Code                                                    */
6195 /*                                                                        */
6196 /*  RELEASE HISTORY                                                       */
6197 /*                                                                        */
6198 /*    DATE              NAME                      DESCRIPTION             */
6199 /*                                                                        */
6200 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6201 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6202 /*                                            resulting in version 6.1    */
6203 /*                                                                        */
6204 /**************************************************************************/
_nxd_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* disconnect_notify)(NXD_MQTT_CLIENT *))6205 UINT _nxd_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT *client_ptr, VOID (*disconnect_notify)(NXD_MQTT_CLIENT *))
6206 {
6207 
6208     client_ptr -> nxd_mqtt_disconnect_notify = disconnect_notify;
6209 
6210     return(NXD_MQTT_SUCCESS);
6211 }
6212 
6213 
6214 /**************************************************************************/
6215 /*                                                                        */
6216 /*  FUNCTION                                               RELEASE        */
6217 /*                                                                        */
6218 /*    _nxde_mqtt_client_disconnect_notify_set             PORTABLE C      */
6219 /*                                                           6.1          */
6220 /*  AUTHOR                                                                */
6221 /*                                                                        */
6222 /*    Yuxin Zhou, Microsoft Corporation                                   */
6223 /*                                                                        */
6224 /*  DESCRIPTION                                                           */
6225 /*                                                                        */
6226 /*    This function checks for errors in setting MQTT client disconnect   */
6227 /*    callback function.                                                  */
6228 /*                                                                        */
6229 /*  INPUT                                                                 */
6230 /*                                                                        */
6231 /*    client_ptr                            Pointer to MQTT Client        */
6232 /*    disconnect_callback                   The callback function to be   */
6233 /*                                            used when an on-going       */
6234 /*                                            connection is disconnected. */
6235 /*                                                                        */
6236 /*  OUTPUT                                                                */
6237 /*                                                                        */
6238 /*    status                                Completion status             */
6239 /*                                                                        */
6240 /*  CALLS                                                                 */
6241 /*                                                                        */
6242 /*    _nxd_mqtt_client_disconnect_notify_set                              */
6243 /*                                                                        */
6244 /*  CALLED BY                                                             */
6245 /*                                                                        */
6246 /*    Application Code                                                    */
6247 /*                                                                        */
6248 /*  RELEASE HISTORY                                                       */
6249 /*                                                                        */
6250 /*    DATE              NAME                      DESCRIPTION             */
6251 /*                                                                        */
6252 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6253 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6254 /*                                            resulting in version 6.1    */
6255 /*                                                                        */
6256 /**************************************************************************/
_nxde_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* disconnect_notify)(NXD_MQTT_CLIENT *))6257 UINT _nxde_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT *client_ptr, VOID (*disconnect_notify)(NXD_MQTT_CLIENT *))
6258 {
6259 
6260     /* Validate client_ptr */
6261     if (client_ptr == NX_NULL)
6262     {
6263         return(NX_PTR_ERROR);
6264     }
6265     return(_nxd_mqtt_client_disconnect_notify_set(client_ptr, disconnect_notify));
6266 }
6267 
6268 
6269 #ifdef NXD_MQTT_CLOUD_ENABLE
6270 /**************************************************************************/
6271 /*                                                                        */
6272 /*  FUNCTION                                               RELEASE        */
6273 /*                                                                        */
6274 /*    _nxd_mqtt_client_cloud_create                       PORTABLE C      */
6275 /*                                                           6.1          */
6276 /*  AUTHOR                                                                */
6277 /*                                                                        */
6278 /*    Yuxin Zhou, Microsoft Corporation                                   */
6279 /*                                                                        */
6280 /*  DESCRIPTION                                                           */
6281 /*                                                                        */
6282 /*    This function creates mqtt client running on cloud helper thread.   */
6283 /*                                                                        */
6284 /*                                                                        */
6285 /*  INPUT                                                                 */
6286 /*                                                                        */
6287 /*    client_ptr                            Pointer to MQTT Client        */
/*    client_name                           Name string used by the       */
6289 /*                                            client                      */
6290 /*    client_id                             Client ID used by the client  */
6291 /*    client_id_length                      Length of Client ID, in bytes */
6292 /*    ip_ptr                                Pointer to IP instance        */
6293 /*    pool_ptr                              Pointer to packet pool        */
6294 /*    cloud_ptr                             Pointer to Cloud instance     */
6295 /*                                                                        */
6296 /*  OUTPUT                                                                */
6297 /*                                                                        */
6298 /*    status                                Completion status             */
6299 /*                                                                        */
6300 /*  CALLS                                                                 */
6301 /*                                                                        */
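/*    _nxd_mqtt_client_create_internal      Internal client create call   */
/*    nx_cloud_module_register              Register module on cloud      */
/*                                            helper                      */
/*    nx_tcp_socket_delete                  Delete socket on failure      */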
6303 /*                                                                        */
6304 /*  CALLED BY                                                             */
6305 /*                                                                        */
6306 /*    Application Code                                                    */
6307 /*                                                                        */
6308 /*  RELEASE HISTORY                                                       */
6309 /*                                                                        */
6310 /*    DATE              NAME                      DESCRIPTION             */
6311 /*                                                                        */
6312 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6313 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
6314 /*                                            corrected mqtt client state,*/
6315 /*                                            resulting in version 6.1    */
6316 /*                                                                        */
6317 /**************************************************************************/
UINT _nxd_mqtt_client_cloud_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name, CHAR *client_id, UINT client_id_length,
                                   NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr, NX_CLOUD *cloud_ptr)
{

UINT    status;


    /* Reject a missing client, or an IP instance that is missing or does
       not carry the expected ID.  */
    if ((client_ptr == NX_NULL) || (ip_ptr == NX_NULL) || (ip_ptr -> nx_ip_id != NX_IP_ID))
    {
        return(NX_PTR_ERROR);
    }

    /* Reject a missing packet pool.  */
    if (pool_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Reject a cloud instance that is missing or does not carry the
       expected ID.  */
    if ((cloud_ptr == NX_NULL) || (cloud_ptr -> nx_cloud_id != NX_CLOUD_ID))
    {
        return(NX_PTR_ERROR);
    }

    /* Build the MQTT client through the internal create routine; the last
       three arguments are passed as NX_NULL, 0 and 0.  */
    status = _nxd_mqtt_client_create_internal(client_ptr, client_name, client_id, client_id_length, ip_ptr,
                                              pool_ptr, NX_NULL, 0, 0);

    /* Any non-zero status from the internal create is returned as-is.  */
    if (status)
    {
        return(status);
    }

    /* Attach the client to the cloud helper and use the cloud helper's
       mutex as the client mutex.  */
    client_ptr -> nxd_mqtt_client_cloud_ptr = cloud_ptr;
    client_ptr -> nxd_mqtt_client_mutex_ptr = &(cloud_ptr -> nx_cloud_mutex);

    /* Register the MQTT module with the cloud helper so that
       _nxd_mqtt_client_event_process is invoked with this client as its
       context.  */
    status = nx_cloud_module_register(cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module), client_name, NX_CLOUD_MODULE_MQTT_EVENT,
                                      _nxd_mqtt_client_event_process, client_ptr);

    /* Registration succeeded: the client is now idle and ready for use.  */
    if (status == NX_SUCCESS)
    {
        client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;

        return(NXD_MQTT_SUCCESS);
    }

    /* Registration failed: release the socket set up by
       _nxd_mqtt_client_create_internal() and report an internal error
       instead of the registration status.  */
    nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));

    return(NXD_MQTT_INTERNAL_ERROR);
}
6369 #endif /* NXD_MQTT_CLOUD_ENABLE */
6370 
6371 #ifdef NXD_MQTT_OVER_WEBSOCKET
6372 /**************************************************************************/
6373 /*                                                                        */
6374 /*  FUNCTION                                               RELEASE        */
6375 /*                                                                        */
6376 /*    _nxd_mqtt_client_websocket_connection_status_callback               */
6377 /*                                                        PORTABLE C      */
6378 /*                                                           6.2.0        */
6379 /*  AUTHOR                                                                */
6380 /*                                                                        */
6381 /*    Yuxin Zhou, Microsoft Corporation                                   */
6382 /*                                                                        */
6383 /*  DESCRIPTION                                                           */
6384 /*                                                                        */
6385 /*    This function is the websocket connection status callback.          */
6386 /*                                                                        */
6387 /*  INPUT                                                                 */
6388 /*                                                                        */
6389 /*    websocket_client_ptr                  Pointer to websocket client   */
6390 /*    context                               Pointer to MQTT client        */
6391 /*    status                                Websocket connection status   */
6392 /*                                                                        */
6393 /*  OUTPUT                                                                */
6394 /*                                                                        */
6395 /*    None                                                                */
6396 /*                                                                        */
6397 /*  CALLS                                                                 */
6398 /*                                                                        */
6399 /*    _nxd_mqtt_client_connect_packet_send                                */
6400 /*    _nxd_mqtt_client_connection_end                                     */
6401 /*                                                                        */
6402 /*  CALLED BY                                                             */
6403 /*                                                                        */
6404 /*    _nxd_mqtt_client_event_process                                      */
6405 /*                                                                        */
6406 /*  RELEASE HISTORY                                                       */
6407 /*                                                                        */
6408 /*    DATE              NAME                      DESCRIPTION             */
6409 /*                                                                        */
6410 /*  10-31-2022     Yuxin Zhou               Initial Version 6.2.0         */
6411 /*                                                                        */
6412 /**************************************************************************/
VOID _nxd_mqtt_client_websocket_connection_status_callback(NX_WEBSOCKET_CLIENT *websocket_client_ptr, VOID *context, UINT status)
{
NXD_MQTT_CLIENT *client_ptr;


    NX_PARAMETER_NOT_USED(websocket_client_ptr);

    /* The callback context is the MQTT client that owns the websocket.  */
    client_ptr = (NXD_MQTT_CLIENT *)context;

    /* The websocket connected: send the MQTT connect packet without
       waiting, and track the outcome of that send from here on.  */
    if (status == NX_SUCCESS)
    {
        status = _nxd_mqtt_client_connect_packet_send(client_ptr, NX_NO_WAIT);
    }

    /* Nothing more to do unless the websocket connection or the connect
       packet send reported an error.  */
    if (!status)
    {
        return;
    }

    /* Tear down the connection.  */
    _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);

    /* Report the failure through the connect notify callback, if one is
       installed.  */
    if (client_ptr -> nxd_mqtt_connect_notify)
    {
        client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
    }
}
6441 
6442 /**************************************************************************/
6443 /*                                                                        */
6444 /*  FUNCTION                                               RELEASE        */
6445 /*                                                                        */
6446 /*    _nxd_mqtt_client_websocket_set                      PORTABLE C      */
6447 /*                                                           6.2.0        */
6448 /*  AUTHOR                                                                */
6449 /*                                                                        */
6450 /*    Yuxin Zhou, Microsoft Corporation                                   */
6451 /*                                                                        */
6452 /*  DESCRIPTION                                                           */
6453 /*                                                                        */
6454 /*    This function sets the websocket.                                   */
6455 /*                                                                        */
6456 /*  INPUT                                                                 */
6457 /*                                                                        */
6458 /*    client_ptr                            Pointer to MQTT Client        */
6459 /*    host                                  Host used by the client       */
6460 /*    host_length                           Length of host, in bytes      */
6461 /*    uri_path                              URI path used by the client   */
6462 /*    uri_path_length                       Length of uri path, in bytes  */
6463 /*                                                                        */
6464 /*  OUTPUT                                                                */
6465 /*                                                                        */
6466 /*    status                                Completion status             */
6467 /*                                                                        */
6468 /*  CALLS                                                                 */
6469 /*                                                                        */
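/*    tx_mutex_get                          Obtain the client mutex       */
/*    nx_websocket_client_create            Create the WebSocket client   */
/*    tx_mutex_put                          Release the client mutex      */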
6471 /*                                                                        */
6472 /*  CALLED BY                                                             */
6473 /*                                                                        */
6474 /*    Application Code                                                    */
6475 /*                                                                        */
6476 /*  RELEASE HISTORY                                                       */
6477 /*                                                                        */
6478 /*    DATE              NAME                      DESCRIPTION             */
6479 /*                                                                        */
6480 /*  10-31-2022     Yuxin Zhou               Initial Version 6.2.0         */
6481 /*                                                                        */
6482 /**************************************************************************/
UINT _nxd_mqtt_client_websocket_set(NXD_MQTT_CLIENT *client_ptr, UCHAR *host, UINT host_length, UCHAR *uri_path, UINT uri_path_length)
{
UINT result;

    /* Take the client mutex before touching the control block; report a
       mutex failure if it cannot be obtained.  */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Remember the host and URI path.  Only the pointers and lengths are
       stored here; the data itself is not copied.  */
    client_ptr -> nxd_mqtt_client_websocket_host = host;
    client_ptr -> nxd_mqtt_client_websocket_host_length = host_length;
    client_ptr -> nxd_mqtt_client_websocket_uri_path = uri_path;
    client_ptr -> nxd_mqtt_client_websocket_uri_path_length = uri_path_length;

    /* Create the websocket client, with an empty name, on the MQTT client's
       IP instance and packet pool.  */
    result = nx_websocket_client_create(&client_ptr -> nxd_mqtt_client_websocket, (UCHAR *)"",
                                        client_ptr -> nxd_mqtt_client_ip_ptr,
                                        client_ptr -> nxd_mqtt_client_packet_pool_ptr);

    if (!result)
    {

        /* Creation succeeded: mark the client as using websocket.  */
        client_ptr -> nxd_mqtt_client_use_websocket = NX_TRUE;
        result = NXD_MQTT_SUCCESS;
    }

    /* Release the mutex on both paths.  A creation failure is returned to
       the caller unchanged.  */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
    return(result);
}
6518 
6519 
6520 /**************************************************************************/
6521 /*                                                                        */
6522 /*  FUNCTION                                               RELEASE        */
6523 /*                                                                        */
6524 /*    _nxde_mqtt_client_websocket_set                     PORTABLE C      */
6525 /*                                                           6.2.0        */
6526 /*  AUTHOR                                                                */
6527 /*                                                                        */
6528 /*    Yuxin Zhou, Microsoft Corporation                                   */
6529 /*                                                                        */
6530 /*  DESCRIPTION                                                           */
6531 /*                                                                        */
6532 /*    This function checks for errors in setting MQTT client websocket.   */
6533 /*                                                                        */
6534 /*  INPUT                                                                 */
6535 /*                                                                        */
6536 /*    client_ptr                            Pointer to MQTT Client        */
6537 /*    host                                  Host used by the client       */
6538 /*    host_length                           Length of host, in bytes      */
6539 /*    uri_path                              URI path used by the client   */
6540 /*    uri_path_length                       Length of uri path, in bytes  */
6541 /*                                                                        */
6542 /*  OUTPUT                                                                */
6543 /*                                                                        */
6544 /*    status                                Completion status             */
6545 /*                                                                        */
6546 /*  CALLS                                                                 */
6547 /*                                                                        */
6548 /*    _nxd_mqtt_client_websocket_set                                      */
6549 /*                                                                        */
6550 /*  CALLED BY                                                             */
6551 /*                                                                        */
6552 /*    Application Code                                                    */
6553 /*                                                                        */
6554 /*  RELEASE HISTORY                                                       */
6555 /*                                                                        */
6556 /*    DATE              NAME                      DESCRIPTION             */
6557 /*                                                                        */
6558 /*  10-31-2022     Yuxin Zhou               Initial Version 6.2.0         */
6559 /*                                                                        */
6560 /**************************************************************************/
UINT _nxde_mqtt_client_websocket_set(NXD_MQTT_CLIENT *client_ptr, UCHAR *host, UINT host_length, UCHAR *uri_path, UINT uri_path_length)
{

    /* A client control block must be supplied.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The host must be a non-null pointer with a non-zero length.  */
    if ((host == NX_NULL) || (host_length == 0))
    {
        return(NX_PTR_ERROR);
    }

    /* The URI path must be a non-null pointer with a non-zero length.  */
    if ((uri_path == NX_NULL) || (uri_path_length == 0))
    {
        return(NX_PTR_ERROR);
    }

    /* All checks passed; call the actual websocket set function.  */
    return(_nxd_mqtt_client_websocket_set(client_ptr, host, host_length, uri_path, uri_path_length));
}
6573 #endif /* NXD_MQTT_OVER_WEBSOCKET */
6574