1 /**************************************************************************/
2 /*                                                                        */
3 /*       Copyright (c) Microsoft Corporation. All rights reserved.        */
4 /*                                                                        */
5 /*       This software is licensed under the Microsoft Software License   */
6 /*       Terms for Microsoft Azure RTOS. Full text of the license can be  */
7 /*       found in the LICENSE file at https://aka.ms/AzureRTOS_EULA       */
8 /*       and in the root directory of this software.                      */
9 /*                                                                        */
10 /**************************************************************************/
11 
12 
13 /**************************************************************************/
14 /**************************************************************************/
15 /**                                                                       */
16 /** NetX Component                                                        */
17 /**                                                                       */
18 /**   MQTT (MQTT)                                                         */
19 /**                                                                       */
20 /**************************************************************************/
21 /**************************************************************************/
22 
23 #define NXD_MQTT_CLIENT_SOURCE_CODE
24 
25 
26 /* Force error checking to be disabled in this module */
27 
28 #ifndef NX_DISABLE_ERROR_CHECKING
29 #define NX_DISABLE_ERROR_CHECKING
30 #endif
31 
32 
33 /* Include necessary system files.  */
34 
35 #include    "tx_api.h"
36 #include    "nx_api.h"
37 #include    "nx_ip.h"
38 #include    "nxd_mqtt_client.h"
39 
40 /* Bring in externals for caller checking code.  */
41 
42 #define MQTT_ALL_EVENTS               ((ULONG)0xFFFFFFFF)
43 #define MQTT_TIMEOUT_EVENT            ((ULONG)0x00000001)
44 #define MQTT_PACKET_RECEIVE_EVENT     ((ULONG)0x00000002)
45 #define MQTT_START_EVENT              ((ULONG)0x00000004)
46 #define MQTT_DELETE_EVENT             ((ULONG)0x00000008)
47 #define MQTT_PING_TIMEOUT_EVENT       ((ULONG)0x00000010)
48 #define MQTT_NETWORK_DISCONNECT_EVENT ((ULONG)0x00000020)
49 #define MQTT_TCP_ESTABLISH_EVENT      ((ULONG)0x00000040)
50 
51 static UINT _nxd_mqtt_client_create_internal(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
52                                              CHAR *client_id, UINT client_id_length,
53                                              NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
54                                              VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority);
55 static UINT _nxd_mqtt_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UINT wait_option);
56 static UINT _nxd_mqtt_packet_receive(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, UINT wait_option);
57 static UINT _nxd_mqtt_copy_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET **new_packet_ptr,
58                                            USHORT packet_id, UCHAR set_duplicate_flag, UINT wait_option);
59 static VOID _nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr);
60 static VOID _nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr);
61 static UINT _nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option);
62 static UINT _nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option);
63 
64 /**************************************************************************/
65 /*                                                                        */
66 /*  FUNCTION                                               RELEASE        */
67 /*                                                                        */
68 /*    _nxd_mqtt_client_set_fixed_header                   PORTABLE C      */
69 /*                                                           6.1          */
70 /*  AUTHOR                                                                */
71 /*                                                                        */
72 /*    Yuxin Zhou, Microsoft Corporation                                   */
73 /*                                                                        */
74 /*  DESCRIPTION                                                           */
75 /*                                                                        */
76 /*    This function writes the fixed header field in the outgoing         */
77 /*    MQTT packet.                                                        */
78 /*                                                                        */
79 /*    This function follows the logic outlined in 2.2 in MQTT             */
80 /*    specification.                                                      */
81 /*                                                                        */
82 /*  INPUT                                                                 */
83 /*                                                                        */
84 /*    client_ptr                            Pointer to MQTT Client        */
85 /*    packet_ptr                            Outgoing MQTT packet          */
86 /*    control_header                        Control byte                  */
87 /*    length                                Remaining length in bytes     */
88 /*    wait_option                           Wait option                   */
89 /*                                                                        */
90 /*  OUTPUT                                                                */
91 /*                                                                        */
92 /*    status                                                              */
93 /*                                                                        */
94 /*  CALLS                                                                 */
95 /*                                                                        */
96 /*    nx_packet_data_append                 Append packet data            */
97 /*                                                                        */
98 /*  CALLED BY                                                             */
99 /*                                                                        */
100 /*    _nxd_mqtt_client_sub_unsub                                          */
101 /*    _nxd_mqtt_client_connect                                            */
102 /*    _nxd_mqtt_client_publish                                            */
103 /*                                                                        */
104 /*  RELEASE HISTORY                                                       */
105 /*                                                                        */
106 /*    DATE              NAME                      DESCRIPTION             */
107 /*                                                                        */
108 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
109 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
110 /*                                            resulting in version 6.1    */
111 /*                                                                        */
112 /**************************************************************************/
UINT _nxd_mqtt_client_set_fixed_header(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UCHAR control_header, UINT length, UINT wait_option)
{
UCHAR fixed_header[5];
UCHAR encoded_byte;
UINT  header_size = 0;

    /* Builds the fixed header (control byte followed by the variable-length
       "remaining length" field) and appends it to packet_ptr.

       Returns the status of nx_packet_data_append, or NXD_MQTT_INTERNAL_ERROR
       when length cannot be represented in the four length bytes available.  */

    /* The buffer holds the control byte plus at most four remaining-length
       bytes, i.e. 28 bits of length.  A larger value would need a fifth
       length byte and would be written past the end of fixed_header, so
       reject it before encoding anything.  */
    if (length > 0x0FFFFFFFU)
    {
        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Byte 0 is the control byte supplied by the caller.  */
    fixed_header[header_size++] = control_header;

    /* Emit the length seven bits at a time, least significant group first.
       Bit 7 of a byte is set when more length bytes follow it.  */
    do
    {
        encoded_byte = (UCHAR)(length & 0x7F);
        length = length >> 7;

        if (length != 0)
        {
            encoded_byte = (UCHAR)(encoded_byte | 0x80);
        }

        fixed_header[header_size++] = encoded_byte;
    } while (length != 0);

    /* Append the control byte and the encoded length to the packet and
       hand the append status back to the caller.  */
    return(nx_packet_data_append(packet_ptr, fixed_header, header_size,
                                 client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option));
}
143 
144 
145 
146 /**************************************************************************/
147 /*                                                                        */
148 /*  FUNCTION                                               RELEASE        */
149 /*                                                                        */
150 /*    _nxd_mqtt_read_remaining_length                     PORTABLE C      */
151 /*                                                           6.1          */
152 /*  AUTHOR                                                                */
153 /*                                                                        */
154 /*    Yuxin Zhou, Microsoft Corporation                                   */
155 /*                                                                        */
156 /*  DESCRIPTION                                                           */
157 /*                                                                        */
158 /*    This function parses the remaining length field in the incoming     */
159 /*    MQTT packet.                                                        */
160 /*                                                                        */
161 /*    This function follows the logic outlined in 2.2.3 in MQTT           */
162 /*    specification                                                       */
163 /*                                                                        */
164 /*  INPUT                                                                 */
165 /*                                                                        */
166 /*    packet_ptr                            Incoming MQTT packet.         */
167 /*    remaining_length                      remaining length in bytes,    */
168 /*                                            this is the return value.   */
169 /*    offset                                Pointer to offset of the      */
170 /*                                            remaining data              */
171 /*                                                                        */
172 /*  OUTPUT                                                                */
173 /*                                                                        */
174 /*    status                                                              */
175 /*                                                                        */
176 /*  CALLS                                                                 */
177 /*                                                                        */
178 /*    None                                                                */
179 /*                                                                        */
180 /*  CALLED BY                                                             */
181 /*                                                                        */
182 /*    _nxd_mqtt_process_publish                                           */
183 /*    _nxd_mqtt_client_message_get                                        */
184 /*    _nxd_mqtt_process_sub_unsub_ack                                     */
185 /*                                                                        */
186 /*  RELEASE HISTORY                                                       */
187 /*                                                                        */
188 /*    DATE              NAME                      DESCRIPTION             */
189 /*                                                                        */
190 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
191 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
192 /*                                            resulting in version 6.1    */
193 /*                                                                        */
194 /**************************************************************************/
UINT _nxd_mqtt_read_remaining_length(NX_PACKET *packet_ptr, UINT *remaining_length, ULONG *offset_ptr)
{
UCHAR encoded[4] = {0};
UCHAR current;
ULONG available;
UINT  decoded = 0;
UINT  shift = 0;
UINT  index = 0;

    /* Decodes the variable-length "remaining length" field that follows the
       first byte of the packet.  On success the decoded length is stored in
       *remaining_length and the offset of the first byte after the field is
       stored in *offset_ptr; on any failure neither output is written.  */

    /* Fetch up to four length bytes starting at packet offset 1.  */
    if (nx_packet_data_extract_offset(packet_ptr, 1, encoded, sizeof(encoded), &available))
    {

        /* Nothing could be extracted: the packet is incomplete.  */
        return(NXD_MQTT_PARTIAL_PACKET);
    }

    for (;;)
    {

        /* Ran out of extracted bytes while a continuation was still pending.
           After four bytes that is a malformed field; before that, more
           data simply has not arrived yet.  */
        if (index >= available)
        {
            return((index == 4) ? NXD_MQTT_INTERNAL_ERROR : NXD_MQTT_PARTIAL_PACKET);
        }

        current = encoded[index];
        index++;

        /* Each byte contributes seven bits, least significant group first.  */
        decoded += ((UINT)(current & 0x7F)) << shift;
        shift += 7;

        /* A clear top bit marks the last byte of the field.  */
        if ((current & 0x80) == 0)
        {
            break;
        }
    }

    /* First byte + length field + payload must all be present in the packet.  */
    if ((1 + index + decoded) > packet_ptr -> nx_packet_length)
    {

        /* The declared length exceeds the data received so far.  */
        return(NXD_MQTT_PARTIAL_PACKET);
    }

    *remaining_length = decoded;
    *offset_ptr = (1 + index);

    return(NXD_MQTT_SUCCESS);
}
242 
243 
244 
245 /**************************************************************************/
246 /*                                                                        */
247 /*  FUNCTION                                               RELEASE        */
248 /*                                                                        */
249 /*    _nxd_mqtt_client_sub_unsub                          PORTABLE C      */
250 /*                                                           6.3.0        */
251 /*  AUTHOR                                                                */
252 /*                                                                        */
253 /*    Yuxin Zhou, Microsoft Corporation                                   */
254 /*                                                                        */
255 /*  DESCRIPTION                                                           */
256 /*                                                                        */
257 /*    This function sends a subscribe or unsubscribe message to the       */
258 /*    broker.                                                             */
259 /*                                                                        */
260 /*                                                                        */
261 /*  INPUT                                                                 */
262 /*                                                                        */
263 /*    client_ptr                            Pointer to MQTT Client        */
264 /*    op                                    Subscribe or Unsubscribe      */
265 /*    topic_name                            Pointer to the topic string   */
266 /*                                            to subscribe to             */
267 /*    topic_name_length                     Length of the topic string    */
268 /*                                            in bytes                    */
269 /*    packet_id_ptr                         Pointer to packet id that     */
270 /*                                            will be filled with         */
271 /*                                            assigned packet id for      */
272 /*                                            sub/unsub message           */
273 /*    QoS                                   Expected QoS level            */
274 /*                                                                        */
275 /*  OUTPUT                                                                */
276 /*                                                                        */
277 /*    status                                Completion status             */
278 /*                                                                        */
279 /*  CALLS                                                                 */
280 /*                                                                        */
281 /*    tx_mutex_get                                                        */
282 /*    _nxd_mqtt_client_packet_allocate                                    */
283 /*    _nxd_mqtt_client_set_fixed_header                                   */
284 /*    _nxd_mqtt_client_append_message                                     */
285 /*    tx_mutex_put                                                        */
286 /*    _nxd_mqtt_packet_send                                               */
287 /*    nx_packet_release                                                   */
288 /*    _nxd_mqtt_copy_transmit_packet                                      */
289 /*                                                                        */
290 /*  CALLED BY                                                             */
291 /*                                                                        */
292 /*    _nxd_mqtt_client_subscribe                                          */
293 /*    _nxd_mqtt_client_unsubscribe                                        */
294 /*                                                                        */
295 /*  RELEASE HISTORY                                                       */
296 /*                                                                        */
297 /*    DATE              NAME                      DESCRIPTION             */
298 /*                                                                        */
299 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
300 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
301 /*                                            resulting in version 6.1    */
302 /*  11-09-2020     Yuxin Zhou               Modified comment(s), and      */
303 /*                                            added packet id parameter,  */
304 /*                                            resulting in version 6.1.2  */
305 /*  07-29-2022     Spencer McDonough        Modified comment(s),          */
306 /*                                            improved internal logic,    */
307 /*                                            resulting in version 6.1.12 */
308 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
309 /*                                            the logic of sending packet,*/
310 /*                                            resulting in version 6.2.0  */
311 /*  10-31-2023     Haiqing Zhao             Modified comment(s), improved */
312 /*                                            internal logic,             */
313 /*                                            resulting in version 6.3.0  */
314 /*                                                                        */
315 /**************************************************************************/
UINT _nxd_mqtt_client_sub_unsub(NXD_MQTT_CLIENT *client_ptr, UINT op,
                                CHAR *topic_name, UINT topic_name_length,
                                USHORT *packet_id_ptr, UINT QoS)
{
NX_PACKET *request_ptr;
NX_PACKET *queued_copy_ptr;
UCHAR      scratch[2];
USHORT     packet_id;
UINT       remaining_length;
UINT       is_subscribe;
UINT       result;

    /* Builds a SUBSCRIBE or UNSUBSCRIBE request (selected by op), queues a
       copy of it on the client's transmit queue, and sends it.  The packet
       is assembled and queued while holding the client mutex; the mutex is
       released before the send.  */

    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* A request can only be issued while the client is connected.  */
    if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(NXD_MQTT_NOT_CONNECTED);
    }

    result = _nxd_mqtt_client_packet_allocate(client_ptr, &request_ptr, NX_WAIT_FOREVER);
    if (result)
    {

        /* No packet was obtained, so only the mutex needs to be released.  */
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(result);
    }

    /* Only a SUBSCRIBE request carries the trailing QoS byte.  */
    is_subscribe = (op == ((MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4) | 0x02));

    /* Remaining length: 2 bytes of packet ID, then the topic counted as
       2 + topic_name_length bytes (the extra 2 is presumably the length
       prefix written by _nxd_mqtt_client_append_message - confirm there),
       plus one QoS byte for SUBSCRIBE.  */
    remaining_length = 2 + (2 + topic_name_length);
    if (is_subscribe)
    {
        remaining_length++;
    }

    /* Control byte and remaining length.  */
    result = _nxd_mqtt_client_set_fixed_header(client_ptr, request_ptr, (UCHAR)op, remaining_length, NX_WAIT_FOREVER);
    if (result)
    {
        goto build_failed;
    }

    /* Packet identifier, most significant byte first.  The caller is told
       the identifier before the append is attempted.  */
    packet_id = (USHORT)(client_ptr -> nxd_mqtt_client_packet_identifier & 0xFFFF);
    scratch[0] = (UCHAR)(packet_id >> 8);
    scratch[1] = (UCHAR)(packet_id & 0xFF);

    if (packet_id_ptr)
    {
        *packet_id_ptr = packet_id;
    }

    result = nx_packet_data_append(request_ptr, scratch, 2, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_WAIT_FOREVER);
    if (result)
    {
        goto build_failed;
    }

    /* Topic name.  */
    result = _nxd_mqtt_client_append_message(client_ptr, request_ptr, topic_name, topic_name_length, NX_WAIT_FOREVER);
    if (result)
    {
        goto build_failed;
    }

    if (is_subscribe)
    {

        /* Requested QoS; only the low two bits are kept.  */
        scratch[0] = (UCHAR)(QoS & 0x3);

        result = nx_packet_data_append(request_ptr, scratch, 1, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_WAIT_FOREVER);
        if (result)
        {
            goto build_failed;
        }
    }

    /* Make a copy of the request to keep for retransmission.  */
    if (_nxd_mqtt_copy_transmit_packet(client_ptr, request_ptr, &queued_copy_ptr,
                                       packet_id, NX_FALSE, NX_WAIT_FOREVER))
    {
        goto build_failed;
    }

    /* Link the copy at the tail of the transmit queue.  */
    if (client_ptr -> message_transmit_queue_head != NX_NULL)
    {
        client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = queued_copy_ptr;
    }
    else
    {
        client_ptr -> message_transmit_queue_head = queued_copy_ptr;
    }
    client_ptr -> message_transmit_queue_tail = queued_copy_ptr;

    /* Advance the 16-bit packet identifier, skipping zero (MQTT-2.3.1-1).  */
    client_ptr -> nxd_mqtt_client_packet_identifier = (client_ptr -> nxd_mqtt_client_packet_identifier + 1) & 0xFFFF;
    if (client_ptr -> nxd_mqtt_client_packet_identifier == 0)
    {
        client_ptr -> nxd_mqtt_client_packet_identifier = 1;
    }

    /* Restart the timeout from the current time plus the keepalive value.  */
    client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;

    /* Client state is final; release the mutex before sending.  */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    /* Send the subscribe/unsubscribe request to the server.  */
    if (_nxd_mqtt_packet_send(client_ptr, request_ptr, NX_WAIT_FOREVER))
    {

        /* The send failed, so the request packet is released here.  The
           queued copy is left on the transmit queue.  */
        nx_packet_release(request_ptr);

        result = NXD_MQTT_COMMUNICATION_FAILURE;
    }

    return(result);

build_failed:

    /* Shared exit for every failure while the request is being assembled:
       release the mutex, then the partially built packet.  */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
    nx_packet_release(request_ptr);

    return(NXD_MQTT_PACKET_POOL_FAILURE);
}
485 
486 
487 /**************************************************************************/
488 /*                                                                        */
489 /*  FUNCTION                                               RELEASE        */
490 /*                                                                        */
491 /*    _nxd_mqtt_client_packet_allocate                    PORTABLE C      */
492 /*                                                           6.3.0        */
493 /*  AUTHOR                                                                */
494 /*                                                                        */
495 /*    Yuxin Zhou, Microsoft Corporation                                   */
496 /*                                                                        */
497 /*  DESCRIPTION                                                           */
498 /*                                                                        */
499 /*    This function allocates a packet for transmitting MQTT message.     */
500 /*    Special care has to be taken for accommodating IPv4/IPv6 header,    */
501 /*    and possibly TLS record if TLS is being used. On failure,           */
502 /*    NXD_MQTT_PACKET_POOL_FAILURE is returned and no mutex is released.  */
503 /*                                                                        */
504 /*                                                                        */
505 /*  INPUT                                                                 */
506 /*                                                                        */
507 /*    client_ptr                            Pointer to MQTT Client        */
508 /*    packet_ptr                            Allocated packet to be        */
509 /*                                            returned to the caller.     */
510 /*                                                                        */
511 /*  OUTPUT                                                                */
512 /*                                                                        */
513 /*    status                                Completion status             */
514 /*                                                                        */
515 /*  CALLS                                                                 */
516 /*                                                                        */
517 /*    nx_secure_tls_packet_allocate         Allocate packet for MQTT      */
518 /*                                            over TLS socket             */
519 /*    nx_packet_allocate                    Allocate a packet for MQTT    */
520 /*                                            over regular TCP socket     */
521 /*    tx_mutex_put                          Release a mutex               */
522 /*                                                                        */
523 /*  CALLED BY                                                             */
524 /*                                                                        */
525 /*    _nxd_mqtt_process_publish                                           */
526 /*    _nxd_mqtt_client_connect                                            */
527 /*    _nxd_mqtt_client_publish                                            */
528 /*    _nxd_mqtt_client_subscribe                                          */
529 /*    _nxd_mqtt_client_unsubscribe                                        */
530 /*    _nxd_mqtt_client_send_simple_message                                */
531 /*                                                                        */
532 /*  RELEASE HISTORY                                                       */
533 /*                                                                        */
534 /*    DATE              NAME                      DESCRIPTION             */
535 /*                                                                        */
536 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
537 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
538 /*                                            resulting in version 6.1    */
539 /*  07-29-2022     Spencer McDonough        Modified comment(s), and      */
540 /*                                            improved internal logic,    */
541 /*                                            resulting in version 6.1.12 */
542 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
543 /*                                            mqtt over websocket,        */
544 /*                                            resulting in version 6.2.0  */
545 /*  10-31-2023     Haiqing Zhao             Modified comment(s), improved */
546 /*                                            internal logic,             */
547 /*                                            resulting in version 6.3.0  */
548 /*                                                                        */
549 /**************************************************************************/
_nxd_mqtt_client_packet_allocate(NXD_MQTT_CLIENT * client_ptr,NX_PACKET ** packet_ptr,ULONG wait_option)550 UINT _nxd_mqtt_client_packet_allocate(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, ULONG wait_option)
551 {
552 UINT status = NXD_MQTT_SUCCESS;
553 
554 #ifdef NXD_MQTT_OVER_WEBSOCKET
555     if (client_ptr -> nxd_mqtt_client_use_websocket)
556     {
557 
558         /* Use WebSocket packet allocate since it is able to count for WebSocket-related header space */
559         status = nx_websocket_client_packet_allocate(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, wait_option);
560     }
561     else
562 #endif /* NXD_MQTT_OVER_WEBSOCKET */
563 #ifdef NX_SECURE_ENABLE
564     if (client_ptr -> nxd_mqtt_client_use_tls)
565     {
566         /* Use TLS packet allocate.  The TLS packet allocate is able to count for
567            TLS-related header space including crypto initial vector area. */
568         status = nx_secure_tls_packet_allocate(&client_ptr -> nxd_mqtt_tls_session, client_ptr -> nxd_mqtt_client_packet_pool_ptr,
569                                                packet_ptr, wait_option);
570     }
571     /* Allocate a packet  */
572     else
573     {
574 #endif /* NX_SECURE_ENABLE */
575         if (client_ptr -> nxd_mqtt_client_socket.nx_tcp_socket_connect_ip.nxd_ip_version == NX_IP_VERSION_V4)
576         {
577             status = nx_packet_allocate(client_ptr -> nxd_mqtt_client_packet_pool_ptr, packet_ptr, NX_IPv4_TCP_PACKET,
578                                         wait_option);
579         }
580         else
581         {
582             status = nx_packet_allocate(client_ptr -> nxd_mqtt_client_packet_pool_ptr, packet_ptr, NX_IPv6_TCP_PACKET,
583                                         wait_option);
584         }
585 #ifdef NX_SECURE_ENABLE
586     }
587 #endif /* NX_SECURE_ENABLE */
588 
589     if (status != NX_SUCCESS)
590     {
591         return(NXD_MQTT_PACKET_POOL_FAILURE);
592     }
593 
594     return(NXD_MQTT_SUCCESS);
595 }
596 
597 /**************************************************************************/
598 /*                                                                        */
599 /*  FUNCTION                                               RELEASE        */
600 /*                                                                        */
601 /*    _nxd_mqtt_packet_send                               PORTABLE C      */
602 /*                                                           6.2.0        */
603 /*  AUTHOR                                                                */
604 /*                                                                        */
605 /*    Yuxin Zhou, Microsoft Corporation                                   */
606 /*                                                                        */
607 /*  DESCRIPTION                                                           */
608 /*                                                                        */
609 /*    This function sends out a packet.                                   */
610 /*                                                                        */
611 /*                                                                        */
612 /*  INPUT                                                                 */
613 /*                                                                        */
614 /*    client_ptr                            Pointer to MQTT Client        */
615 /*    packet_ptr                            Pointer to packet             */
616 /*    wait_option                           Timeout value                 */
617 /*                                                                        */
618 /*  OUTPUT                                                                */
619 /*                                                                        */
620 /*    status                                Completion status             */
621 /*                                                                        */
622 /*  CALLS                                                                 */
623 /*                                                                        */
624 /*    nx_websocket_client_send                                            */
625 /*    nx_secure_tls_session_send                                          */
626 /*    nx_tcp_socket_send                                                  */
627 /*                                                                        */
628 /*  CALLED BY                                                             */
629 /*                                                                        */
630 /*                                                                        */
631 /*  RELEASE HISTORY                                                       */
632 /*                                                                        */
633 /*    DATE              NAME                      DESCRIPTION             */
634 /*                                                                        */
635 /*  10-31-2022     Bo Chen                  Initial Version 6.2.0         */
636 /*                                                                        */
637 /**************************************************************************/
static UINT _nxd_mqtt_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, UINT wait_option)
{

    /* Hand the packet to the first transport that applies and return that
       transport's status unchanged.  */

#ifdef NXD_MQTT_OVER_WEBSOCKET
    if (client_ptr -> nxd_mqtt_client_use_websocket)
    {

        /* MQTT over WebSocket: the packet goes out as a binary frame.  */
        return(nx_websocket_client_send(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr,
                                        NX_WEBSOCKET_OPCODE_BINARY_FRAME, NX_TRUE, wait_option));
    }
#endif /* NXD_MQTT_OVER_WEBSOCKET */

#ifdef NX_SECURE_ENABLE
    if (client_ptr -> nxd_mqtt_client_use_tls)
    {

        /* MQTT over TLS: send through the TLS session.  */
        return(nx_secure_tls_session_send(&(client_ptr -> nxd_mqtt_tls_session), packet_ptr, wait_option));
    }
#endif /* NX_SECURE_ENABLE */

    /* Neither WebSocket nor TLS is in use: send directly on the TCP socket.  */
    return(nx_tcp_socket_send(&(client_ptr -> nxd_mqtt_client_socket), packet_ptr, wait_option));
}
663 
664 /**************************************************************************/
665 /*                                                                        */
666 /*  FUNCTION                                               RELEASE        */
667 /*                                                                        */
668 /*    _nxd_mqtt_packet_receive                            PORTABLE C      */
669 /*                                                           6.2.0        */
670 /*  AUTHOR                                                                */
671 /*                                                                        */
672 /*    Yuxin Zhou, Microsoft Corporation                                   */
673 /*                                                                        */
674 /*  DESCRIPTION                                                           */
675 /*                                                                        */
676 /*    This function receives a packet.                                    */
677 /*                                                                        */
678 /*                                                                        */
679 /*  INPUT                                                                 */
680 /*                                                                        */
681 /*    client_ptr                            Pointer to MQTT Client        */
682 /*    packet_ptr                            Pointer to packet             */
683 /*    wait_option                           Timeout value                 */
684 /*                                                                        */
685 /*  OUTPUT                                                                */
686 /*                                                                        */
687 /*    status                                Completion status             */
688 /*                                                                        */
689 /*  CALLS                                                                 */
690 /*                                                                        */
691 /*    nx_websocket_client_receive                                         */
692 /*    nx_secure_tls_session_receive                                       */
693 /*    nx_tcp_socket_receive                                               */
694 /*                                                                        */
695 /*  CALLED BY                                                             */
696 /*                                                                        */
697 /*                                                                        */
698 /*  RELEASE HISTORY                                                       */
699 /*                                                                        */
700 /*    DATE              NAME                      DESCRIPTION             */
701 /*                                                                        */
702 /*  10-31-2022     Bo Chen                  Initial Version 6.2.0         */
703 /*                                                                        */
704 /**************************************************************************/
static UINT _nxd_mqtt_packet_receive(NXD_MQTT_CLIENT *client_ptr, NX_PACKET **packet_ptr, UINT wait_option)
{
UINT status = NXD_MQTT_SUCCESS;
#ifdef NXD_MQTT_OVER_WEBSOCKET
UINT op_code = 0;
#endif /* NXD_MQTT_OVER_WEBSOCKET*/

    /* Receive one packet from whichever transport the client is using.  The
       status of the underlying receive call is returned to the caller, except
       for the non-binary WebSocket frame case handled below.  */

#ifdef NXD_MQTT_OVER_WEBSOCKET
    if (client_ptr -> nxd_mqtt_client_use_websocket)
    {
        status = nx_websocket_client_receive(&(client_ptr -> nxd_mqtt_client_websocket), packet_ptr, &op_code, wait_option);
        if ((status == NX_SUCCESS) && (op_code != NX_WEBSOCKET_OPCODE_BINARY_FRAME))
        {

            /* A frame was received, but MQTT data is only accepted in binary
               frames.  The caller only sees an error status and therefore has
               no reason to release anything, so the packet must be returned to
               its pool here; otherwise it would be leaked.  Clear the caller's
               pointer so it does not dangle.  */
            nx_packet_release(*packet_ptr);
            *packet_ptr = NX_NULL;

            return(NX_INVALID_PACKET);
        }
    }
    else
#endif /* NXD_MQTT_OVER_WEBSOCKET */

#ifdef NX_SECURE_ENABLE
    if (client_ptr -> nxd_mqtt_client_use_tls)
    {

        /* MQTT over TLS: receive through the TLS session.  */
        status = nx_secure_tls_session_receive(&(client_ptr -> nxd_mqtt_tls_session), packet_ptr, wait_option);
    }
    else
#endif /* NX_SECURE_ENABLE */
    {

        /* Plain TCP: receive directly from the client socket.  */
        status = nx_tcp_socket_receive(&(client_ptr -> nxd_mqtt_client_socket), packet_ptr, wait_option);
    }

    return(status);
}
737 
738 /**************************************************************************/
739 /*                                                                        */
740 /*  FUNCTION                                               RELEASE        */
741 /*                                                                        */
742 /*    _nxd_mqtt_tcp_establish_notify                      PORTABLE C      */
743 /*                                                           6.1          */
744 /*  AUTHOR                                                                */
745 /*                                                                        */
746 /*    Yuxin Zhou, Microsoft Corporation                                   */
747 /*                                                                        */
748 /*  DESCRIPTION                                                           */
749 /*                                                                        */
750 /*    This internal function is installed as TCP connection establish     */
751 /*    callback function.                                                  */
752 /*                                                                        */
753 /*  INPUT                                                                 */
754 /*                                                                        */
755 /*    socket_ptr                            The socket that receives      */
756 /*                                           the message.                 */
757 /*                                                                        */
758 /*  OUTPUT                                                                */
759 /*                                                                        */
760 /*    None                                                                */
761 /*                                                                        */
762 /*  CALLS                                                                 */
763 /*                                                                        */
764 /*    tx_event_flags_set                                                  */
765 /*                                                                        */
766 /*  CALLED BY                                                             */
767 /*                                                                        */
768 /*    _nxd_mqtt_thread_entry                                              */
769 /*    _nxd_mqtt_client_event_process                                      */
770 /*                                                                        */
771 /*  RELEASE HISTORY                                                       */
772 /*                                                                        */
773 /*    DATE              NAME                      DESCRIPTION             */
774 /*                                                                        */
775 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
776 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
777 /*                                            resulting in version 6.1    */
778 /*                                                                        */
779 /**************************************************************************/
_nxd_mqtt_tcp_establish_notify(NX_TCP_SOCKET * socket_ptr)780 static VOID _nxd_mqtt_tcp_establish_notify(NX_TCP_SOCKET *socket_ptr)
781 {
782 NXD_MQTT_CLIENT *client_ptr;
783 
784     client_ptr = (NXD_MQTT_CLIENT *)socket_ptr -> nx_tcp_socket_reserved_ptr;
785 
786     if (&(client_ptr -> nxd_mqtt_client_socket) == socket_ptr)
787     {
788 
789         /* Set the event flag. */
790 #ifndef NXD_MQTT_CLOUD_ENABLE
791         tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_TCP_ESTABLISH_EVENT, TX_OR);
792 #else
793         nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_TCP_ESTABLISH_EVENT);
794 #endif /* NXD_MQTT_CLOUD_ENABLE */
795     }
796 }
797 
798 /**************************************************************************/
799 /*                                                                        */
800 /*  FUNCTION                                               RELEASE        */
801 /*                                                                        */
802 /*    _nxd_mqtt_receive_callback                          PORTABLE C      */
803 /*                                                           6.1          */
804 /*  AUTHOR                                                                */
805 /*                                                                        */
806 /*    Yuxin Zhou, Microsoft Corporation                                   */
807 /*                                                                        */
808 /*  DESCRIPTION                                                           */
809 /*                                                                        */
810 /*    This internal function is installed as TCP receive callback         */
811 /*    function.  On receiving a TCP message, the callback function        */
812 /*    sets an event flag to trigger MQTT client to process received       */
813 /*    message.                                                            */
814 /*                                                                        */
815 /*                                                                        */
816 /*  INPUT                                                                 */
817 /*                                                                        */
818 /*    socket_ptr                            The socket that receives      */
819 /*                                           the message.                 */
820 /*                                                                        */
821 /*  OUTPUT                                                                */
822 /*                                                                        */
823 /*    None                                                                */
824 /*                                                                        */
825 /*  CALLS                                                                 */
826 /*                                                                        */
827 /*    tx_event_flags_set                                                  */
828 /*                                                                        */
829 /*  CALLED BY                                                             */
830 /*                                                                        */
831 /*    _nxd_mqtt_thread_entry                                              */
832 /*    _nxd_mqtt_client_event_process                                      */
833 /*                                                                        */
834 /*  RELEASE HISTORY                                                       */
835 /*                                                                        */
836 /*    DATE              NAME                      DESCRIPTION             */
837 /*                                                                        */
838 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
839 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
840 /*                                            resulting in version 6.1    */
841 /*                                                                        */
842 /**************************************************************************/
_nxd_mqtt_receive_callback(NX_TCP_SOCKET * socket_ptr)843 static VOID _nxd_mqtt_receive_callback(NX_TCP_SOCKET *socket_ptr)
844 {
845 NXD_MQTT_CLIENT *client_ptr;
846 
847     client_ptr = (NXD_MQTT_CLIENT *)socket_ptr -> nx_tcp_socket_reserved_ptr;
848 
849     if (&(client_ptr -> nxd_mqtt_client_socket) == socket_ptr)
850     {
851         /* Set the event flag. */
852 #ifndef NXD_MQTT_CLOUD_ENABLE
853         tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_PACKET_RECEIVE_EVENT, TX_OR);
854 #else
855         nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PACKET_RECEIVE_EVENT);
856 #endif /* NXD_MQTT_CLOUD_ENABLE */
857     }
858 }
859 
860 /**************************************************************************/
861 /*                                                                        */
862 /*  FUNCTION                                               RELEASE        */
863 /*                                                                        */
864 /*    _nxd_mqtt_copy_transmit_packet                      PORTABLE C      */
865 /*                                                           6.1.8        */
866 /*  AUTHOR                                                                */
867 /*                                                                        */
868 /*    Yuxin Zhou, Microsoft Corporation                                   */
869 /*                                                                        */
870 /*  DESCRIPTION                                                           */
871 /*                                                                        */
872 /*    This internal function saves a transmit packet.                     */
873 /*    A transmit packet is allocated to store QoS 1 and 2 messages.       */
874 /*    Upon a message being properly acknowledged, the packet will         */
875 /*    be released.                                                        */
876 /*                                                                        */
877 /*                                                                        */
878 /*  INPUT                                                                 */
879 /*                                                                        */
880 /*    client_ptr                            Pointer to MQTT Client        */
881 /*    packet_ptr                            Pointer to the MQTT message   */
882 /*                                            packet to be saved          */
883 /*    new_packet_ptr                        Return a copied packet        */
884 /*    packet_id                             Current packet ID             */
885 /*    set_duplicate_flag                    Set duplicate flag for fixed  */
886 /*                                            header or not               */
887 /*    wait_option                           Timeout value                 */
888 /*                                                                        */
889 /*  OUTPUT                                                                */
890 /*                                                                        */
891 /*    status                                                              */
892 /*                                                                        */
893 /*  CALLS                                                                 */
894 /*                                                                        */
895 /*    nx_packet_copy                                                      */
896 /*                                                                        */
897 /*  CALLED BY                                                             */
898 /*                                                                        */
899 /*    _nxd_mqtt_client_sub_unsub                                          */
900 /*    _nxd_mqtt_process_publish                                           */
901 /*                                                                        */
902 /*  RELEASE HISTORY                                                       */
903 /*                                                                        */
904 /*    DATE              NAME                      DESCRIPTION             */
905 /*                                                                        */
906 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
907 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
908 /*                                            resulting in version 6.1    */
909 /*  08-02-2021     Yuxin Zhou               Modified comment(s),          */
910 /*                                            supported maximum transmit  */
911 /*                                            queue depth,                */
912 /*                                            resulting in version 6.1.8  */
913 /*                                                                        */
914 /**************************************************************************/
static UINT _nxd_mqtt_copy_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET **new_packet_ptr,
                                           USHORT packet_id, UCHAR set_duplicate_flag, UINT wait_option)
{
UINT       copy_status;
NX_PACKET *saved_packet_ptr;

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH

    /* The transmit queue depth limit has been reached; do not queue any more packets.  */
    if (client_ptr -> message_transmit_queue_depth >= NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH)
    {
        return(NX_TX_QUEUE_DEPTH);
    }
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */

    /* Duplicate the outgoing packet using the client's packet pool.  */
    copy_status = nx_packet_copy(packet_ptr, new_packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
    if (copy_status != 0)
    {

        /* No packet was available to hold the copy.  */
        return(NXD_MQTT_PACKET_POOL_FAILURE);
    }

    saved_packet_ptr = *new_packet_ptr;

    /* Record the packet ID at the very start of the copy's data area.  */
    *((USHORT *)(saved_packet_ptr -> nx_packet_data_start)) = packet_id;

    if (set_duplicate_flag)
    {

        /* Mark the copy as a duplicate in the first byte of the fixed header.  */
        *(saved_packet_ptr -> nx_packet_prepend_ptr) |= MQTT_PUBLISH_DUP_FLAG;
    }

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH

    /* Account for the newly saved packet in the transmit queue depth.  */
    client_ptr -> message_transmit_queue_depth++;
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */

    return(NXD_MQTT_SUCCESS);
}
955 
956 /**************************************************************************/
957 /*                                                                        */
958 /*  FUNCTION                                               RELEASE        */
959 /*                                                                        */
960 /*    _nxd_mqtt_release_transmit_packet                   PORTABLE C      */
961 /*                                                           6.1.8        */
962 /*  AUTHOR                                                                */
963 /*                                                                        */
964 /*    Yuxin Zhou, Microsoft Corporation                                   */
965 /*                                                                        */
966 /*  DESCRIPTION                                                           */
967 /*                                                                        */
968 /*    This internal function releases a transmit packet.                  */
969 /*    A transmit packet is allocated to store QoS 1 and 2 messages.       */
970 /*    Upon a message being properly acknowledged, the packet can          */
971 /*    be released.                                                        */
972 /*                                                                        */
973 /*                                                                        */
974 /*  INPUT                                                                 */
975 /*                                                                        */
976 /*    client_ptr                            Pointer to MQTT Client        */
977 /*    packet_ptr                            Pointer to the MQTT message   */
978 /*                                            packet to be removed        */
979 /*    previous_packet_ptr                   Pointer to the previous packet*/
980 /*                                            or NULL if none exists      */
981 /*                                                                        */
982 /*  OUTPUT                                                                */
983 /*                                                                        */
984 /*    None                                                                */
985 /*                                                                        */
986 /*  CALLS                                                                 */
987 /*                                                                        */
/*    nx_packet_release                                                   */
989 /*                                                                        */
990 /*  CALLED BY                                                             */
991 /*                                                                        */
992 /*    _nxd_mqtt_thread_entry                                              */
993 /*    _nxd_mqtt_client_event_process                                      */
994 /*                                                                        */
995 /*  RELEASE HISTORY                                                       */
996 /*                                                                        */
997 /*    DATE              NAME                      DESCRIPTION             */
998 /*                                                                        */
999 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1000 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1001 /*                                            resulting in version 6.1    */
1002 /*  08-02-2021     Yuxin Zhou               Modified comment(s),          */
1003 /*                                            supported maximum transmit  */
1004 /*                                            queue depth,                */
1005 /*                                            resulting in version 6.1.8  */
1006 /*                                                                        */
1007 /**************************************************************************/
_nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,NX_PACKET * previous_packet_ptr)1008 static VOID _nxd_mqtt_release_transmit_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr)
1009 {
1010 
1011     if (previous_packet_ptr)
1012     {
1013         previous_packet_ptr -> nx_packet_queue_next = packet_ptr -> nx_packet_queue_next;
1014     }
1015     else
1016     {
1017         client_ptr -> message_transmit_queue_head = packet_ptr -> nx_packet_queue_next;
1018     }
1019 
1020     if (packet_ptr == client_ptr -> message_transmit_queue_tail)
1021     {
1022         client_ptr -> message_transmit_queue_tail = previous_packet_ptr;
1023     }
1024     nx_packet_release(packet_ptr);
1025 
1026 #ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
1027     client_ptr -> message_transmit_queue_depth--;
1028 #endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */
1029 }
1030 
1031 /**************************************************************************/
1032 /*                                                                        */
1033 /*  FUNCTION                                               RELEASE        */
1034 /*                                                                        */
1035 /*    _nxd_mqtt_release_receive_packet                    PORTABLE C      */
1036 /*                                                           6.1          */
1037 /*  AUTHOR                                                                */
1038 /*                                                                        */
1039 /*    Yuxin Zhou, Microsoft Corporation                                   */
1040 /*                                                                        */
1041 /*  DESCRIPTION                                                           */
1042 /*                                                                        */
1043 /*    This internal function releases a receive packet.                   */
1044 /*    A receive packet is allocated to store QoS 1 and 2 messages.        */
1045 /*    Upon a message being properly acknowledged, the packet can          */
1046 /*    be released.                                                        */
1047 /*                                                                        */
1048 /*                                                                        */
1049 /*  INPUT                                                                 */
1050 /*                                                                        */
1051 /*    client_ptr                            Pointer to MQTT Client        */
1052 /*    packet_ptr                            Pointer to the MQTT message   */
1053 /*                                            packet to be removed        */
1054 /*    previous_packet_ptr                   Pointer to the previous packet*/
1055 /*                                            or NULL if none exists      */
1056 /*                                                                        */
1057 /*  OUTPUT                                                                */
1058 /*                                                                        */
1059 /*    None                                                                */
1060 /*                                                                        */
1061 /*  CALLS                                                                 */
1062 /*                                                                        */
1063 /*    None                                                                */
1064 /*                                                                        */
1065 /*  CALLED BY                                                             */
1066 /*                                                                        */
1067 /*    _nxd_mqtt_thread_entry                                              */
1068 /*    _nxd_mqtt_client_event_process                                      */
1069 /*                                                                        */
1070 /*  RELEASE HISTORY                                                       */
1071 /*                                                                        */
1072 /*    DATE              NAME                      DESCRIPTION             */
1073 /*                                                                        */
1074 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1075 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1076 /*                                            resulting in version 6.1    */
1077 /*                                                                        */
1078 /**************************************************************************/
_nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,NX_PACKET * previous_packet_ptr)1079 static VOID _nxd_mqtt_release_receive_packet(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, NX_PACKET *previous_packet_ptr)
1080 {
1081 
1082     if (previous_packet_ptr)
1083     {
1084         previous_packet_ptr -> nx_packet_queue_next = packet_ptr -> nx_packet_queue_next;
1085     }
1086     else
1087     {
1088         client_ptr -> message_receive_queue_head = packet_ptr -> nx_packet_queue_next;
1089     }
1090 
1091     if (packet_ptr == client_ptr -> message_receive_queue_tail)
1092     {
1093         client_ptr -> message_receive_queue_tail = previous_packet_ptr;
1094     }
1095 
1096     client_ptr -> message_receive_queue_depth--;
1097 }
1098 
1099 /**************************************************************************/
1100 /*                                                                        */
1101 /*  FUNCTION                                               RELEASE        */
1102 /*                                                                        */
1103 /*    _nxd_mqtt_process_connack                           PORTABLE C      */
1104 /*                                                           6.1          */
1105 /*  AUTHOR                                                                */
1106 /*                                                                        */
1107 /*    Yuxin Zhou, Microsoft Corporation                                   */
1108 /*                                                                        */
1109 /*  DESCRIPTION                                                           */
1110 /*                                                                        */
1111 /*    This internal function processes a CONNACK message from the broker. */
1112 /*                                                                        */
1113 /*                                                                        */
1114 /*  INPUT                                                                 */
1115 /*                                                                        */
1116 /*    client_ptr                            Pointer to MQTT Client        */
1117 /*    packet_ptr                            Pointer to the packet         */
1118 /*    wait_option                           Timeout value                 */
1119 /*                                                                        */
1120 /*  OUTPUT                                                                */
1121 /*                                                                        */
1122 /*    status                                                              */
1123 /*                                                                        */
1124 /*  CALLS                                                                 */
1125 /*                                                                        */
1126 /*    [nxd_mqtt_connect_notify]             User supplied connect         */
1127 /*                                            callback function           */
1128 /*    tx_mutex_get                                                        */
1129 /*    tx_mutex_put                                                        */
1130 /*    _nxd_mqtt_client_retransmit_message                                 */
1131 /*    _nxd_mqtt_client_connection_end                                     */
1132 /*                                                                        */
1133 /*                                                                        */
1134 /*  CALLED BY                                                             */
1135 /*                                                                        */
1136 /*    _nxd_mqtt_packet_receive_process                                    */
1137 /*                                                                        */
1138 /*  RELEASE HISTORY                                                       */
1139 /*                                                                        */
1140 /*    DATE              NAME                      DESCRIPTION             */
1141 /*                                                                        */
1142 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1143 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1144 /*                                            resulting in version 6.1    */
1145 /*                                                                        */
1146 /**************************************************************************/
_nxd_mqtt_process_connack(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,ULONG wait_option)1147 static UINT _nxd_mqtt_process_connack(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, ULONG wait_option)
1148 {
1149 
1150 UINT    ret = NXD_MQTT_COMMUNICATION_FAILURE;
1151 MQTT_PACKET_CONNACK *connack_packet_ptr = (MQTT_PACKET_CONNACK *)(packet_ptr -> nx_packet_prepend_ptr);
1152 
1153 
1154     /* Check the length.  */
1155     if ((packet_ptr -> nx_packet_length != sizeof(MQTT_PACKET_CONNACK)) ||
1156         (connack_packet_ptr -> mqtt_connack_packet_header >> 4 != MQTT_CONTROL_PACKET_TYPE_CONNACK))
1157     {
1158         /* Invalid packet length.  Free the packet and process error. */
1159         ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1160     }
1161     else
1162     {
1163 
1164         /* Check remaining length.  */
1165         if (connack_packet_ptr -> mqtt_connack_packet_remaining_length != 2)
1166         {
1167             ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1168         }
1169         /* Follow MQTT-3.2.2-1 rule.  */
1170         else if ((client_ptr -> nxd_mqtt_clean_session) && (connack_packet_ptr -> mqtt_connack_packet_ack_flags & MQTT_CONNACK_CONNECT_FLAGS_SP))
1171         {
1172 
1173             /* Client requested clean session, and server responded with Session Present.  This is a violation. */
1174             ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1175         }
1176         else if (connack_packet_ptr -> mqtt_connack_packet_return_code >  MQTT_CONNACK_CONNECT_RETURN_CODE_NOT_AUTHORIZED)
1177         {
1178             ret = NXD_MQTT_SERVER_MESSAGE_FAILURE;
1179         }
1180         else if (connack_packet_ptr -> mqtt_connack_packet_return_code > 0)
1181         {
1182 
1183             /* Pass the server return code to the application. */
1184             ret = (UINT)(NXD_MQTT_ERROR_CONNECT_RETURN_CODE + connack_packet_ptr -> mqtt_connack_packet_return_code);
1185         }
1186         else
1187         {
1188             ret = NXD_MQTT_SUCCESS;
1189 
1190             /* Obtain mutex before we modify client control block. */
1191             tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
1192 
1193             client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_CONNECTED;
1194 
1195             /* Initialize the packet identification field. */
1196             client_ptr -> nxd_mqtt_client_packet_identifier = NXD_MQTT_INITIAL_PACKET_ID_VALUE;
1197 
1198             /* Prevent packet identifier from being zero. MQTT-2.3.1-1 */
1199             if(client_ptr -> nxd_mqtt_client_packet_identifier == 0)
1200                 client_ptr -> nxd_mqtt_client_packet_identifier = 1;
1201 
1202             /* Release mutex */
1203             tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1204         }
1205     }
1206 
1207     /* Check callback function.  */
1208     if (client_ptr -> nxd_mqtt_connect_notify)
1209     {
1210         client_ptr -> nxd_mqtt_connect_notify(client_ptr, ret, client_ptr -> nxd_mqtt_connect_context);
1211     }
1212 
1213     if (ret == NXD_MQTT_SUCCESS)
1214     {
1215 
1216         /* If client doesn't start with Clean Session, and there are un-acked PUBLISH messages,
1217            we shall re-publish these messages. */
1218         if ((client_ptr -> nxd_mqtt_clean_session != NX_TRUE) && (client_ptr -> message_transmit_queue_head))
1219         {
1220 
1221             tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
1222 
1223             /* There are messages from the previous session that has not been acknowledged. */
1224             ret = _nxd_mqtt_client_retransmit_message(client_ptr, wait_option);
1225 
1226             /* Release mutex */
1227             tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1228         }
1229     }
1230     else
1231     {
1232 
1233         /* End connection. */
1234         _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
1235     }
1236 
1237     return(ret);
1238 }
1239 
1240 /**************************************************************************/
1241 /*                                                                        */
1242 /*  FUNCTION                                               RELEASE        */
1243 /*                                                                        */
1244 /*    _nxd_mqtt_process_publish_packet                    PORTABLE C      */
1245 /*                                                           6.1          */
1246 /*  AUTHOR                                                                */
1247 /*                                                                        */
1248 /*    Yuxin Zhou, Microsoft Corporation                                   */
1249 /*                                                                        */
1250 /*  DESCRIPTION                                                           */
1251 /*                                                                        */
1252 /*    This internal function processes a packet and parses topic and      */
1253 /*    message.                                                            */
1254 /*                                                                        */
1255 /*                                                                        */
1256 /*  INPUT                                                                 */
1257 /*                                                                        */
1258 /*    packet_ptr                            Pointer to the packet         */
1259 /*    topic_offset_ptr                      Return topic offset           */
1260 /*    topic_length_ptr                      Return topic length           */
1261 /*    message_offset_ptr                    Return message offset         */
1262 /*    message_length_ptr                    Return message length         */
1263 /*                                                                        */
1264 /*  OUTPUT                                                                */
1265 /*                                                                        */
1266 /*    Status                                                              */
1267 /*                                                                        */
1268 /*  CALLS                                                                 */
1269 /*                                                                        */
1270 /*    _nxd_mqtt_read_remaining_length                                     */
1271 /*    nx_packet_data_extract_offset                                       */
1272 /*                                                                        */
1273 /*                                                                        */
1274 /*  CALLED BY                                                             */
1275 /*                                                                        */
1276 /*    _nxd_mqtt_process_publish                                           */
1277 /*                                                                        */
1278 /*  RELEASE HISTORY                                                       */
1279 /*                                                                        */
1280 /*    DATE              NAME                      DESCRIPTION             */
1281 /*                                                                        */
1282 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1283 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1284 /*                                            resulting in version 6.1    */
1285 /*                                                                        */
1286 /**************************************************************************/
_nxd_mqtt_process_publish_packet(NX_PACKET * packet_ptr,ULONG * topic_offset_ptr,USHORT * topic_length_ptr,ULONG * message_offset_ptr,ULONG * message_length_ptr)1287 UINT _nxd_mqtt_process_publish_packet(NX_PACKET *packet_ptr, ULONG *topic_offset_ptr, USHORT *topic_length_ptr,
1288                                       ULONG *message_offset_ptr, ULONG *message_length_ptr)
1289 {
1290 UCHAR  QoS;
1291 UINT   remaining_length = 0;
1292 UINT   topic_length;
1293 ULONG  offset;
1294 UCHAR  bytes[2];
1295 ULONG  bytes_copied;
1296 
1297 
1298     QoS = (UCHAR)((*(packet_ptr -> nx_packet_prepend_ptr) & MQTT_PUBLISH_QOS_LEVEL_FIELD) >> 1);
1299 
1300     if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1301     {
1302         return(NXD_MQTT_INVALID_PACKET);
1303     }
1304 
1305     if (remaining_length < 2)
1306     {
1307         return(NXD_MQTT_INVALID_PACKET);
1308     }
1309 
1310     /* Get topic length fields. */
1311     if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1312         (bytes_copied != sizeof(bytes)))
1313     {
1314         return(NXD_MQTT_INVALID_PACKET);
1315     }
1316 
1317     topic_length = (UINT)(*(bytes) << 8) | (*(bytes + 1));
1318 
1319     if (topic_length > remaining_length - 2u)
1320     {
1321         return(NXD_MQTT_INVALID_PACKET);
1322     }
1323 
1324     *topic_offset_ptr = offset + 2;
1325     *topic_length_ptr = (USHORT)topic_length;
1326 
1327     remaining_length = remaining_length - topic_length - 2;
1328     if ((QoS == 1) || (QoS == 2))
1329     {
1330         offset += 2 + 2 + topic_length;
1331 
1332         if (remaining_length < 2)
1333         {
1334             return(NXD_MQTT_INVALID_PACKET);
1335         }
1336         remaining_length = remaining_length - 2;
1337     }
1338     else
1339     {
1340         offset += 2 + topic_length;
1341     }
1342 
1343     *message_offset_ptr = offset;
1344     *message_length_ptr = (ULONG)remaining_length;
1345 
1346     /* Return */
1347     return(NXD_MQTT_SUCCESS);
1348 }
1349 /**************************************************************************/
1350 /*                                                                        */
1351 /*  FUNCTION                                               RELEASE        */
1352 /*                                                                        */
1353 /*    _nxd_mqtt_process_publish                           PORTABLE C      */
1354 /*                                                           6.3.0        */
1355 /*  AUTHOR                                                                */
1356 /*                                                                        */
1357 /*    Yuxin Zhou, Microsoft Corporation                                   */
1358 /*                                                                        */
1359 /*  DESCRIPTION                                                           */
1360 /*                                                                        */
/*    This internal function processes a publish message from the broker. */
1362 /*                                                                        */
1363 /*                                                                        */
1364 /*  INPUT                                                                 */
1365 /*                                                                        */
1366 /*    client_ptr                            Pointer to MQTT Client        */
1367 /*    packet_ptr                            Pointer to the packet         */
1368 /*                                                                        */
1369 /*  OUTPUT                                                                */
1370 /*                                                                        */
1371 /*    NX_TRUE - packet is consumed                                        */
1372 /*    NX_FALSE - packet is not consumed                                   */
1373 /*                                                                        */
1374 /*  CALLS                                                                 */
1375 /*                                                                        */
1376 /*    [receive_notify]                      User supplied receive         */
1377 /*                                            callback function           */
1378 /*    _nxd_mqtt_client_packet_allocate                                    */
1379 /*    _nxd_mqtt_packet_send                                               */
1380 /*    nx_packet_release                                                   */
1381 /*    nx_secure_tls_session_send                                          */
1382 /*    _nxd_mqtt_process_publish_packet                                    */
1383 /*    _nxd_mqtt_copy_transmit_packet                                      */
1384 /*                                                                        */
1385 /*                                                                        */
1386 /*  CALLED BY                                                             */
1387 /*                                                                        */
1388 /*    _nxd_mqtt_packet_receive_process                                    */
1389 /*                                                                        */
1390 /*  RELEASE HISTORY                                                       */
1391 /*                                                                        */
1392 /*    DATE              NAME                      DESCRIPTION             */
1393 /*                                                                        */
1394 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1395 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1396 /*                                            resulting in version 6.1    */
1397 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
1398 /*                                            the logic of sending packet,*/
1399 /*                                            resulting in version 6.2.0  */
1400 /*  10-31-2023     Haiqing Zhao             Modified comment(s), improved */
1401 /*                                            internal logic,             */
1402 /*                                            resulting in version 6.3.0  */
1403 /*                                                                        */
1404 /**************************************************************************/
_nxd_mqtt_process_publish(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1405 static UINT _nxd_mqtt_process_publish(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1406 {
1407 MQTT_PACKET_PUBLISH_RESPONSE *pubresp_ptr;
1408 UINT                          status;
1409 USHORT                        packet_id = 0;
1410 UCHAR                         QoS;
1411 UINT                          enqueue_message = 0;
1412 NX_PACKET                    *transmit_packet_ptr;
1413 UINT                          remaining_length = 0;
1414 UINT                          packet_consumed = NX_FALSE;
1415 UCHAR                         fixed_header;
1416 USHORT                        transmit_packet_id;
1417 UINT                          topic_length;
1418 ULONG                         offset;
1419 UCHAR                         bytes[2];
1420 ULONG                         bytes_copied;
1421 
1422     QoS = (UCHAR)((*(packet_ptr -> nx_packet_prepend_ptr) & MQTT_PUBLISH_QOS_LEVEL_FIELD) >> 1);
1423 
1424     if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1425     {
1426         return(NX_FALSE);
1427     }
1428 
1429     if (remaining_length < 2)
1430     {
1431         return(NXD_MQTT_INVALID_PACKET);
1432     }
1433 
1434     /* Get topic length fields. */
1435     if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1436         (bytes_copied != sizeof(bytes)))
1437     {
1438         return(NXD_MQTT_INVALID_PACKET);
1439     }
1440 
1441     topic_length = (UINT)(*(bytes) << 8) | (*(bytes + 1));
1442 
1443     if (topic_length > remaining_length - 2u)
1444     {
1445         return(NXD_MQTT_INVALID_PACKET);
1446     }
1447 
1448     if (QoS == 0)
1449     {
1450         enqueue_message = 1;
1451     }
1452     else
1453     {
1454         /* QoS 1 or QoS 2 messages. */
1455         /* Get packet id fields. */
1456         if (nx_packet_data_extract_offset(packet_ptr, offset + 2 + topic_length, &bytes, sizeof(bytes), &bytes_copied))
1457         {
1458             return(NXD_MQTT_INVALID_PACKET);
1459         }
1460 
1461         packet_id = (USHORT)(((*bytes) << 8) | (*(bytes + 1)));
1462 
1463         /* Look for an existing transmit packets with the same packet id */
1464         transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1465 
1466         while (transmit_packet_ptr)
1467         {
1468             fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1469             transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1470             if ((transmit_packet_id == packet_id) &&
1471                 ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_PUBREC << 4)))
1472             {
1473 
1474                 /* Found a packet containing the packet_id */
1475                 break;
1476             }
1477             transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1478         }
1479 
1480         if (transmit_packet_ptr)
1481         {
1482             /* This published data is already in our system.  No need to deliver this message to the application. */
1483             enqueue_message = 0;
1484         }
1485         else
1486         {
1487             enqueue_message = 1;
1488         }
1489     }
1490 
1491     if (enqueue_message)
1492     {
1493         if (packet_ptr -> nx_packet_length > (offset + remaining_length))
1494         {
1495 
1496             /* This packet contains multiple messages. */
1497             if (nx_packet_copy(packet_ptr, &packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, NX_NO_WAIT))
1498             {
1499 
1500                 /* No packet is available. */
1501                 return(NX_FALSE);
1502             }
1503         }
1504         else
1505         {
1506             packet_consumed = NX_TRUE;
1507         }
1508 
1509         /* Increment the queue depth counter. */
1510         client_ptr -> message_receive_queue_depth++;
1511 
1512         if (client_ptr -> message_receive_queue_head == NX_NULL)
1513         {
1514             client_ptr -> message_receive_queue_head = packet_ptr;
1515         }
1516         else
1517         {
1518             client_ptr -> message_receive_queue_tail -> nx_packet_queue_next = packet_ptr;
1519         }
1520         client_ptr -> message_receive_queue_tail = packet_ptr;
1521 
1522         /* Invoke the user-defined receive notify function if it is set. */
1523         if (client_ptr -> nxd_mqtt_client_receive_notify)
1524         {
1525             (*(client_ptr -> nxd_mqtt_client_receive_notify))(client_ptr, client_ptr -> message_receive_queue_depth);
1526         }
1527     }
1528 
1529     /* If the message QoS level is 0, we are done. */
1530     if (QoS == 0)
1531     {
1532         /* Return */
1533         return(packet_consumed);
1534     }
1535 
1536     /* Send out proper ACKs for QoS 1 and 2 messages. */
1537     /* Allocate a new packet so we can send out a response. */
1538     status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
1539     if (status)
1540     {
1541         /* Packet allocation fails. */
1542         return(packet_consumed);
1543     }
1544 
1545     /* Fill in the packet ID */
1546     pubresp_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)(packet_ptr -> nx_packet_prepend_ptr);
1547     pubresp_ptr -> mqtt_publish_response_packet_remaining_length = 2;
1548     pubresp_ptr -> mqtt_publish_response_packet_packet_identifier_msb = (UCHAR)(packet_id >> 8);
1549     pubresp_ptr -> mqtt_publish_response_packet_packet_identifier_lsb = (UCHAR)(packet_id & 0xFF);
1550 
1551     if (QoS == 1)
1552     {
1553 
1554         pubresp_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBACK << 4;
1555     }
1556     else
1557     {
1558         pubresp_ptr -> mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBREC << 4;
1559     }
1560 
1561     packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + sizeof(MQTT_PACKET_PUBLISH_RESPONSE);
1562     packet_ptr -> nx_packet_length = sizeof(MQTT_PACKET_PUBLISH_RESPONSE);
1563 
1564     if (QoS == 2)
1565     {
1566 
1567         /* Copy packet for checking duplicate publish packet. */
1568         if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &transmit_packet_ptr,
1569                                            packet_id, NX_FALSE, NX_WAIT_FOREVER))
1570         {
1571 
1572             /* Release the packet. */
1573             nx_packet_release(packet_ptr);
1574             return(packet_consumed);
1575         }
1576         if (client_ptr -> message_transmit_queue_head == NX_NULL)
1577         {
1578             client_ptr -> message_transmit_queue_head = transmit_packet_ptr;
1579         }
1580         else
1581         {
1582             client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = transmit_packet_ptr;
1583         }
1584         client_ptr -> message_transmit_queue_tail = transmit_packet_ptr;
1585     }
1586 
1587     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1588 
1589     /* Send packet to server.  */
1590     status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER);
1591 
1592     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
1593     if (status)
1594     {
1595 
1596         /* Release the packet. */
1597         nx_packet_release(packet_ptr);
1598     }
1599     else
1600     {
1601         /* Update the timeout value. */
1602         client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
1603     }
1604 
1605     /* Return */
1606     return(packet_consumed);
1607 }
1608 
1609 /**************************************************************************/
1610 /*                                                                        */
1611 /*  FUNCTION                                               RELEASE        */
1612 /*                                                                        */
1613 /*    _nxd_mqtt_process_publish_response                  PORTABLE C      */
1614 /*                                                           6.3.0        */
1615 /*  AUTHOR                                                                */
1616 /*                                                                        */
1617 /*    Yuxin Zhou, Microsoft Corporation                                   */
1618 /*                                                                        */
1619 /*  DESCRIPTION                                                           */
1620 /*                                                                        */
/*    This internal function processes publish response messages.         */
1622 /*    Publish Response messages are: PUBACK, PUBREC, PUBREL               */
1623 /*                                                                        */
1624 /*  INPUT                                                                 */
1625 /*                                                                        */
1626 /*    client_ptr                            Pointer to MQTT Client        */
1627 /*    packet_ptr                            Pointer to the packet         */
1628 /*                                                                        */
1629 /*  OUTPUT                                                                */
1630 /*                                                                        */
1631 /*    Status                                                              */
1632 /*                                                                        */
1633 /*  CALLS                                                                 */
1634 /*                                                                        */
1635 /*    [nxd_mqtt_client_receive_notify]      User supplied publish         */
1636 /*                                            callback function           */
1637 /*    _nxd_mqtt_release_transmit_packet                                   */
1638 /*    _nxd_mqtt_packet_send                                               */
1639 /*    _nxd_mqtt_client_packet_allocate                                    */
1640 /*                                                                        */
1641 /*                                                                        */
1642 /*  CALLED BY                                                             */
1643 /*                                                                        */
1644 /*    _nxd_mqtt_packet_receive_process                                    */
1645 /*                                                                        */
1646 /*  RELEASE HISTORY                                                       */
1647 /*                                                                        */
1648 /*    DATE              NAME                      DESCRIPTION             */
1649 /*                                                                        */
1650 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1651 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
1652 /*                                            added ack receive notify,   */
1653 /*                                            resulting in version 6.1    */
1654 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
1655 /*                                            the logic of sending packet,*/
1656 /*                                            resulting in version 6.2.0  */
1657 /*  10-31-2023     Haiqing Zhao             Modified comment(s), improved */
1658 /*                                            internal logic,             */
1659 /*                                            resulting in version 6.3.0  */
1660 /*                                                                        */
1661 /**************************************************************************/
_nxd_mqtt_process_publish_response(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1662 static UINT _nxd_mqtt_process_publish_response(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1663 {
1664 MQTT_PACKET_PUBLISH_RESPONSE *response_ptr;
1665 USHORT                        packet_id;
1666 NX_PACKET                    *previous_packet_ptr;
1667 NX_PACKET                    *transmit_packet_ptr;
1668 NX_PACKET                    *response_packet;
1669 UINT                          ret;
1670 UCHAR                         fixed_header;
1671 USHORT                        transmit_packet_id;
1672 
1673     response_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)(packet_ptr -> nx_packet_prepend_ptr);
1674 
1675     /* Validate the packet. */
1676     if (response_ptr -> mqtt_publish_response_packet_remaining_length != 2)
1677     {
1678         /* Invalid remaining_length value. Return 1 so the caller can release
1679            the packet. */
1680 
1681         return(1);
1682     }
1683 
1684     packet_id = (USHORT)((response_ptr -> mqtt_publish_response_packet_packet_identifier_msb << 8) |
1685                          (response_ptr -> mqtt_publish_response_packet_packet_identifier_lsb));
1686 
1687     /* Search all the outstanding transmitted packets for a match. */
1688     previous_packet_ptr = NX_NULL;
1689     transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1690     while (transmit_packet_ptr)
1691     {
1692         fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1693         transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1694         if (transmit_packet_id == packet_id)
1695         {
1696 
1697             /* Found the matching packet id */
1698             if (((response_ptr -> mqtt_publish_response_packet_header) >> 4) == MQTT_CONTROL_PACKET_TYPE_PUBACK)
1699             {
1700 
1701                 /* PUBACK is the response to a PUBLISH packet with QoS Level 1*/
1702                 /* Therefore we verify that packet contains PUBLISH packet with QoS level 1*/
1703                 if ((fixed_header & 0xF6) == ((MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4) | MQTT_PUBLISH_QOS_LEVEL_1))
1704                 {
1705 
1706                     /* Check ack notify function.  */
1707                     if (client_ptr -> nxd_mqtt_ack_receive_notify)
1708                     {
1709 
1710                         /* Call notify function. Note: user routine should not release the packet.  */
1711                         client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_PUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1712                     }
1713 
1714                     /* QoS Level1 message receives an ACK. */
1715                     /* This message can be released. */
1716                     _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1717 
1718                     /* Return with value 1, so the caller will release packet_ptr */
1719                     return(1);
1720                 }
1721             }
1722             else if (((response_ptr -> mqtt_publish_response_packet_header) >> 4) == MQTT_CONTROL_PACKET_TYPE_PUBREL)
1723             {
1724 
1725                 /* QoS 2 publish Release received, part 2. */
1726                 /* Therefore we verify that packet contains PUBLISH packet with QoS level 2*/
1727                 if ((fixed_header & 0xF6) == (MQTT_CONTROL_PACKET_TYPE_PUBREC << 4))
1728                 {
1729 
1730                     /* QoS Level2 message receives an ACK. */
1731                     /* This message can be released. */
1732                     /* Send PUBCOMP */
1733 
1734                     /* Allocate a packet to send the response. */
1735                     ret = _nxd_mqtt_client_packet_allocate(client_ptr, &response_packet, NX_WAIT_FOREVER);
1736                     if (ret)
1737                     {
1738                         return(1);
1739                     }
1740 
1741                     if (4u > ((ULONG)(response_packet -> nx_packet_data_end) - (ULONG)(response_packet -> nx_packet_append_ptr)))
1742                     {
1743                         nx_packet_release(response_packet);
1744 
1745                         /* Packet buffer is too small to hold the message. */
1746                         return(NX_SIZE_ERROR);
1747                     }
1748 
1749                     response_ptr = (MQTT_PACKET_PUBLISH_RESPONSE *)response_packet -> nx_packet_prepend_ptr;
1750 
1751                     response_ptr ->  mqtt_publish_response_packet_header = MQTT_CONTROL_PACKET_TYPE_PUBCOMP << 4;
1752                     response_ptr ->  mqtt_publish_response_packet_remaining_length = 2;
1753 
1754                     /* Fill in packet ID */
1755                     response_packet -> nx_packet_prepend_ptr[3] = packet_ptr -> nx_packet_prepend_ptr[3];
1756                     response_packet -> nx_packet_prepend_ptr[4] = packet_ptr -> nx_packet_prepend_ptr[4];
1757                     response_packet -> nx_packet_append_ptr = response_packet -> nx_packet_prepend_ptr + 4;
1758                     response_packet -> nx_packet_length = 4;
1759 
1760                     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
1761 
1762                     /* Send packet to server.  */
1763                     ret = _nxd_mqtt_packet_send(client_ptr, response_packet, NX_WAIT_FOREVER);
1764 
1765                     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
1766 
1767                     /* Update the timeout value. */
1768                     client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
1769 
1770                     if (ret)
1771                     {
1772                         nx_packet_release(response_packet);
1773                     }
1774 
1775                     /* Check ack notify function.  */
1776                     if (client_ptr -> nxd_mqtt_ack_receive_notify)
1777                     {
1778 
1779                         /* Call notify function. Note: user routine should not release the packet.  */
1780                         client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_PUBREL, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1781                     }
1782 
1783                     /* This packet can be released. */
1784                     _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1785 
1786                     /* Return with value 1, so the caller will release packet_ptr */
1787                     return(1);
1788                 }
1789             }
1790         }
1791 
1792         /* Move on to the next packet */
1793         previous_packet_ptr = transmit_packet_ptr;
1794         transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1795     }
1796 
1797     /* nothing is found.  Return 1 to release the packet.*/
1798     return(1);
1799 }
1800 
1801 /**************************************************************************/
1802 /*                                                                        */
1803 /*  FUNCTION                                               RELEASE        */
1804 /*                                                                        */
1805 /*    _nxd_mqtt_process_sub_unsub_ack                     PORTABLE C      */
1806 /*                                                           6.1          */
1807 /*  AUTHOR                                                                */
1808 /*                                                                        */
1809 /*    Yuxin Zhou, Microsoft Corporation                                   */
1810 /*                                                                        */
1811 /*  DESCRIPTION                                                           */
1812 /*                                                                        */
/*    This internal function processes an ACK message for subscribe       */
1814 /*    or unsubscribe request.                                             */
1815 /*                                                                        */
1816 /*  INPUT                                                                 */
1817 /*                                                                        */
1818 /*    client_ptr                            Pointer to MQTT Client        */
1819 /*    packet_ptr                            Pointer to the packet         */
1820 /*                                                                        */
1821 /*  OUTPUT                                                                */
1822 /*                                                                        */
1823 /*    Status                                                              */
1824 /*                                                                        */
1825 /*  CALLS                                                                 */
1826 /*                                                                        */
/*    [nxd_mqtt_ack_receive_notify]         User supplied ack notify      */
1828 /*                                            callback function           */
1829 /*    _nxd_mqtt_release_transmit_packet                                   */
1830 /*                                          Release the memory block      */
1831 /*    _nxd_mqtt_read_remaining_length       Skip the remaining length     */
1832 /*                                            field                       */
1833 /*                                                                        */
1834 /*  CALLED BY                                                             */
1835 /*                                                                        */
1836 /*    _nxd_mqtt_packet_receive_process                                    */
1837 /*                                                                        */
1838 /*  RELEASE HISTORY                                                       */
1839 /*                                                                        */
1840 /*    DATE              NAME                      DESCRIPTION             */
1841 /*                                                                        */
1842 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1843 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
1844 /*                                            added ack receive notify,   */
1845 /*                                            resulting in version 6.1    */
1846 /*                                                                        */
1847 /**************************************************************************/
_nxd_mqtt_process_sub_unsub_ack(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr)1848 static UINT _nxd_mqtt_process_sub_unsub_ack(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr)
1849 {
1850 
1851 USHORT     packet_id;
1852 NX_PACKET *previous_packet_ptr;
1853 NX_PACKET *transmit_packet_ptr;
1854 UCHAR      response_header;
1855 UCHAR      fixed_header;
1856 USHORT     transmit_packet_id;
1857 UINT       remaining_length;
1858 ULONG      offset;
1859 UCHAR      bytes[2];
1860 ULONG      bytes_copied;
1861 
1862 
1863     response_header = *(packet_ptr -> nx_packet_prepend_ptr);
1864 
1865     if (_nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset))
1866     {
1867 
1868         /* Unable to process the sub/unsub ack.  Simply return and release the packet. */
1869         return(NXD_MQTT_INVALID_PACKET);
1870     }
1871 
1872     /* Get packet id fields. */
1873     if (nx_packet_data_extract_offset(packet_ptr, offset, &bytes, sizeof(bytes), &bytes_copied) ||
1874         (bytes_copied != sizeof(bytes)))
1875     {
1876         return(NXD_MQTT_INVALID_PACKET);
1877     }
1878 
1879     packet_id = (USHORT)(((*bytes) << 8) | (*(bytes + 1)));
1880 
1881     /* Search all the outstanding transmitted packets for a match. */
1882     previous_packet_ptr = NX_NULL;
1883     transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
1884     while (transmit_packet_ptr)
1885     {
1886         fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
1887         transmit_packet_id = *((USHORT *)transmit_packet_ptr -> nx_packet_data_start);
1888         if (transmit_packet_id == packet_id)
1889         {
1890 
1891             /* Found the matching packet id */
1892             if (((response_header >> 4) == MQTT_CONTROL_PACKET_TYPE_SUBACK) &&
1893                 ((fixed_header >> 4) == MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE))
1894             {
1895                 /* Validate the packet. */
1896                 if (remaining_length != 3)
1897                 {
1898                     /* Invalid remaining_length value. */
1899                     return(1);
1900                 }
1901 
1902                 /* Check ack notify function.  */
1903                 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1904                 {
1905 
1906                     /* Call notify function. Note: user routine should not release the packet.  */
1907                     client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_SUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1908                 }
1909 
1910                 /* Release the transmit packet. */
1911                 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1912 
1913                 return(1);
1914             }
1915             else if (((response_header >> 4) == MQTT_CONTROL_PACKET_TYPE_UNSUBACK) &&
1916                      ((fixed_header >> 4) == MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE))
1917             {
1918                 /* Validate the packet. */
1919                 if (remaining_length != 2)
1920                 {
1921                     /* Invalid remaining_length value. */
1922                     return(1);
1923                 }
1924 
1925                 /* Check ack notify function.  */
1926                 if (client_ptr -> nxd_mqtt_ack_receive_notify)
1927                 {
1928 
1929                     /* Call notify function. Note: user routine should not release the packet.  */
1930                     client_ptr -> nxd_mqtt_ack_receive_notify(client_ptr, MQTT_CONTROL_PACKET_TYPE_UNSUBACK, packet_id, transmit_packet_ptr, client_ptr -> nxd_mqtt_ack_receive_context);
1931                 }
1932 
1933                 /* Unsubscribe succeeded. */
1934                 /* Release the transmit packet. */
1935                 _nxd_mqtt_release_transmit_packet(client_ptr, transmit_packet_ptr, previous_packet_ptr);
1936 
1937                 return(1);
1938             }
1939         }
1940 
1941         /* Move on to the next packet */
1942         previous_packet_ptr = transmit_packet_ptr;
1943         transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
1944     }
1945     return(1);
1946 }
1947 
1948 
1949 /**************************************************************************/
1950 /*                                                                        */
1951 /*  FUNCTION                                               RELEASE        */
1952 /*                                                                        */
1953 /*    _nxd_mqtt_process_pingresp                          PORTABLE C      */
1954 /*                                                           6.1          */
1955 /*  AUTHOR                                                                */
1956 /*                                                                        */
1957 /*    Yuxin Zhou, Microsoft Corporation                                   */
1958 /*                                                                        */
1959 /*  DESCRIPTION                                                           */
1960 /*                                                                        */
/*    This internal function processes a PINGRESP message.                */
1962 /*                                                                        */
1963 /*  INPUT                                                                 */
1964 /*                                                                        */
1965 /*    client_ptr                            Pointer to MQTT Client        */
1966 /*    packet_ptr                            Pointer to the packet         */
1967 /*                                                                        */
1968 /*  OUTPUT                                                                */
1969 /*                                                                        */
1970 /*    Status                                                              */
1971 /*                                                                        */
1972 /*  CALLS                                                                 */
1973 /*                                                                        */
/*    None                                                                */
1976 /*                                                                        */
1977 /*  CALLED BY                                                             */
1978 /*                                                                        */
1979 /*    _nxd_mqtt_packet_receive_process                                    */
1980 /*                                                                        */
1981 /*  RELEASE HISTORY                                                       */
1982 /*                                                                        */
1983 /*    DATE              NAME                      DESCRIPTION             */
1984 /*                                                                        */
1985 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
1986 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
1987 /*                                            resulting in version 6.1    */
1988 /*                                                                        */
1989 /**************************************************************************/
_nxd_mqtt_process_pingresp(NXD_MQTT_CLIENT * client_ptr)1990 static VOID _nxd_mqtt_process_pingresp(NXD_MQTT_CLIENT *client_ptr)
1991 {
1992 
1993 
1994     /* If there is an outstanding ping, mark it as responded. */
1995     if (client_ptr -> nxd_mqtt_ping_not_responded == NX_TRUE)
1996     {
1997         client_ptr -> nxd_mqtt_ping_not_responded = NX_FALSE;
1998 
1999         client_ptr -> nxd_mqtt_ping_sent_time = 0;
2000     }
2001 
2002     return;
2003 }
2004 
2005 
2006 /**************************************************************************/
2007 /*                                                                        */
2008 /*  FUNCTION                                               RELEASE        */
2009 /*                                                                        */
2010 /*    _nxd_mqtt_process_disconnect                        PORTABLE C      */
2011 /*                                                           6.1          */
2012 /*  AUTHOR                                                                */
2013 /*                                                                        */
2014 /*    Yuxin Zhou, Microsoft Corporation                                   */
2015 /*                                                                        */
2016 /*  DESCRIPTION                                                           */
2017 /*                                                                        */
/*    This internal function processes a DISCONNECT message.              */
2019 /*                                                                        */
2020 /*  INPUT                                                                 */
2021 /*                                                                        */
2022 /*    client_ptr                            Pointer to MQTT Client        */
2023 /*    packet_ptr                            Pointer to the packet         */
2024 /*                                                                        */
2025 /*  OUTPUT                                                                */
2026 /*                                                                        */
2027 /*    None                                                                */
2028 /*                                                                        */
2029 /*  CALLS                                                                 */
2030 /*                                                                        */
2031 /*   nx_secure_tls_session_send                                           */
2032 /*   nx_tcp_socket_disconnect                                             */
2033 /*   nx_tcp_client_socket_unbind                                          */
2034 /*   _nxd_mqtt_release_transmit_packet                                    */
2035 /*   _nxd_mqtt_release_receive_packet                                     */
2036 /*   _nxd_mqtt_client_connection_end                                      */
2037 /*                                                                        */
2038 /*  CALLED BY                                                             */
2039 /*                                                                        */
2040 /*   _nxd_mqtt_packet_receive_process                                     */
2041 /*                                                                        */
2042 /*  RELEASE HISTORY                                                       */
2043 /*                                                                        */
2044 /*    DATE              NAME                      DESCRIPTION             */
2045 /*                                                                        */
2046 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2047 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2048 /*                                            resulting in version 6.1    */
2049 /*                                                                        */
2050 /**************************************************************************/
_nxd_mqtt_process_disconnect(NXD_MQTT_CLIENT * client_ptr)2051 static VOID _nxd_mqtt_process_disconnect(NXD_MQTT_CLIENT *client_ptr)
2052 {
2053 NX_PACKET  *previous = NX_NULL;
2054 NX_PACKET  *current;
2055 NX_PACKET  *next;
2056 UINT        disconnect_callback = NX_FALSE;
2057 UINT        status;
2058 UCHAR       fixed_header;
2059 
2060     if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
2061     {
2062         /* State changes from CONNECTED TO IDLE.  Call disconnect notify callback
2063            if the function is set. */
2064         disconnect_callback = NX_TRUE;
2065     }
2066     else if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTING)
2067     {
2068 
2069         /* If state isn't CONNECTED or CONNECTING, just return. */
2070         return;
2071     }
2072 
2073     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2074 
2075     /* End connection. */
2076     _nxd_mqtt_client_connection_end(client_ptr, NXD_MQTT_SOCKET_TIMEOUT);
2077 
2078     status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
2079 
2080     /* Free up sub/unsub packets on the transmit queue. */
2081     current = client_ptr -> message_transmit_queue_head;
2082 
2083     while (current)
2084     {
2085         next = current -> nx_packet_queue_next;
2086         fixed_header = *(current -> nx_packet_prepend_ptr);
2087 
2088         if (((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4)) ||
2089             ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE << 4)))
2090         {
2091             _nxd_mqtt_release_transmit_packet(client_ptr, current, previous);
2092         }
2093         else
2094         {
2095             previous = current;
2096         }
2097         current = next;
2098     }
2099 
2100     /* If a callback notification is defined, call it now. */
2101     if ((disconnect_callback == NX_TRUE) && (client_ptr -> nxd_mqtt_disconnect_notify))
2102     {
2103         client_ptr -> nxd_mqtt_disconnect_notify(client_ptr);
2104     }
2105 
2106     /* If a connect callback notification is defined and is still in connecting stage, call it now. */
2107     if ((disconnect_callback == NX_FALSE) && (client_ptr -> nxd_mqtt_connect_notify))
2108     {
2109         client_ptr -> nxd_mqtt_connect_notify(client_ptr, NXD_MQTT_CONNECT_FAILURE, client_ptr -> nxd_mqtt_connect_context);
2110     }
2111 
2112     if (status == TX_SUCCESS)
2113     {
2114         /* Remove all the packets in the receive queue. */
2115         while (client_ptr -> message_receive_queue_head)
2116         {
2117             _nxd_mqtt_release_receive_packet(client_ptr, client_ptr -> message_receive_queue_head, NX_NULL);
2118         }
2119         client_ptr -> message_receive_queue_depth = 0;
2120 
2121         /* Clear the MQTT_PACKET_RECEIVE_EVENT */
2122 #ifndef NXD_MQTT_CLOUD_ENABLE
2123         tx_event_flags_set(&client_ptr -> nxd_mqtt_events, ~MQTT_PACKET_RECEIVE_EVENT, TX_AND);
2124 #else
2125         nx_cloud_module_event_clear(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PACKET_RECEIVE_EVENT);
2126 #endif /* NXD_MQTT_CLOUD_ENABLE */
2127     }
2128 
2129     /* Clear flags if keep alive is enabled. */
2130     if (client_ptr -> nxd_mqtt_keepalive)
2131     {
2132         client_ptr -> nxd_mqtt_ping_not_responded = 0;
2133         client_ptr -> nxd_mqtt_ping_sent_time = 0;
2134     }
2135 
2136     /* Clean up the information when disconnecting. */
2137     client_ptr -> nxd_mqtt_client_username = NX_NULL;
2138     client_ptr -> nxd_mqtt_client_password = NX_NULL;
2139     client_ptr -> nxd_mqtt_client_will_topic = NX_NULL;
2140     client_ptr -> nxd_mqtt_client_will_message = NX_NULL;
2141     client_ptr -> nxd_mqtt_client_will_qos_retain = 0;
2142 
2143     /* Release current processing packet. */
2144     if (client_ptr -> nxd_mqtt_client_processing_packet)
2145     {
2146         nx_packet_release(client_ptr -> nxd_mqtt_client_processing_packet);
2147         client_ptr -> nxd_mqtt_client_processing_packet = NX_NULL;
2148     }
2149 
2150     return;
2151 }
2152 
2153 
2154 /**************************************************************************/
2155 /*                                                                        */
2156 /*  FUNCTION                                               RELEASE        */
2157 /*                                                                        */
2158 /*    _nxd_mqtt_packet_receive_process                    PORTABLE C      */
2159 /*                                                           6.2.0        */
2160 /*  AUTHOR                                                                */
2161 /*                                                                        */
2162 /*    Yuxin Zhou, Microsoft Corporation                                   */
2163 /*                                                                        */
2164 /*  DESCRIPTION                                                           */
2165 /*                                                                        */
/*    This internal function processes MQTT messages.                     */
2167 /*    NOTE: MQTT Mutex is NOT obtained on entering this function.         */
/*    Therefore it shouldn't hold the mutex when it exits this function.  */
2169 /*                                                                        */
2170 /*  INPUT                                                                 */
2171 /*                                                                        */
2172 /*    client_ptr                            Pointer to MQTT Client        */
2173 /*                                                                        */
2174 /*  OUTPUT                                                                */
2175 /*                                                                        */
2176 /*    None                                                                */
2177 /*                                                                        */
2178 /*  CALLS                                                                 */
2179 /*                                                                        */
2180 /*    nx_secure_tls_session_receive                                       */
2181 /*    nx_tcp_socket_receive                                               */
2182 /*    _nxd_mqtt_process_publish                                           */
2183 /*    _nxd_mqtt_process_publish_response                                  */
2184 /*    _nxd_mqtt_process_sub_unsub_ack                                     */
2185 /*    _nxd_mqtt_process_pingresp                                          */
2186 /*    _nxd_mqtt_process_disconnect                                        */
2187 /*    nx_packet_release                                                   */
2188 /*                                                                        */
2189 /*  CALLED BY                                                             */
2190 /*                                                                        */
2191 /*    _nxd_mqtt_client_event_process                                      */
2192 /*                                                                        */
2193 /*  RELEASE HISTORY                                                       */
2194 /*                                                                        */
2195 /*    DATE              NAME                      DESCRIPTION             */
2196 /*                                                                        */
2197 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2198 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2199 /*                                            resulting in version 6.1    */
2200 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
2201 /*                                            mqtt over websocket,        */
2202 /*                                            resulting in version 6.2.0  */
2203 /*                                                                        */
2204 /**************************************************************************/
_nxd_mqtt_packet_receive_process(NXD_MQTT_CLIENT * client_ptr)2205 static VOID _nxd_mqtt_packet_receive_process(NXD_MQTT_CLIENT *client_ptr)
2206 {
2207 NX_PACKET *packet_ptr;
2208 NX_PACKET *previous_packet_ptr;
2209 UINT       status;
2210 UCHAR      packet_type;
2211 UINT       remaining_length;
2212 UINT       packet_consumed;
2213 ULONG      offset;
2214 ULONG      bytes_copied;
2215 ULONG      packet_length;
2216 
2217     for (;;)
2218     {
2219 
2220         /* Release the mutex. */
2221         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2222 
2223         /* Make a receive call. */
2224         status = _nxd_mqtt_packet_receive(client_ptr, &packet_ptr, NX_NO_WAIT);
2225 
2226         if (status != NX_SUCCESS)
2227         {
2228             if ((status != NX_NO_PACKET) && (status != NX_CONTINUE))
2229             {
2230 
2231                 /* Network issue. Close the MQTT session. */
2232 #ifndef NXD_MQTT_CLOUD_ENABLE
2233                 tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_NETWORK_DISCONNECT_EVENT, TX_OR);
2234 #else
2235                 nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_NETWORK_DISCONNECT_EVENT);
2236 #endif /* NXD_MQTT_CLOUD_ENABLE */
2237             }
2238 
2239             break;
2240         }
2241 
2242         /* Obtain the mutex. */
2243         tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
2244 
2245         /* Is there a packet waiting for processing? */
2246         if (client_ptr -> nxd_mqtt_client_processing_packet)
2247         {
2248 
2249             /* Yes. Link received packet to existing one. */
2250             if (client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last)
2251             {
2252                 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last -> nx_packet_next = packet_ptr;
2253             }
2254             else
2255             {
2256                 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_next = packet_ptr;
2257             }
2258             if (packet_ptr -> nx_packet_last)
2259             {
2260                 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last = packet_ptr -> nx_packet_last;
2261             }
2262             else
2263             {
2264                 client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_last = packet_ptr;
2265             }
2266             client_ptr -> nxd_mqtt_client_processing_packet -> nx_packet_length += packet_ptr -> nx_packet_length;
2267 
2268             /* Start to process existing packet. */
2269             packet_ptr = client_ptr -> nxd_mqtt_client_processing_packet;
2270             client_ptr -> nxd_mqtt_client_processing_packet = NX_NULL;
2271         }
2272 
2273         /* Check notify function.  */
2274         if (client_ptr -> nxd_mqtt_packet_receive_notify)
2275         {
2276 
2277             /* Call notify function. Return NX_TRUE if the packet has been consumed.  */
2278             if (client_ptr -> nxd_mqtt_packet_receive_notify(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_packet_receive_context) == NX_TRUE)
2279             {
2280                 continue;
2281             }
2282         }
2283 
2284         packet_consumed = NX_FALSE;
2285         while (packet_ptr)
2286         {
2287             /* Parse the incoming packet. */
2288             status = _nxd_mqtt_read_remaining_length(packet_ptr, &remaining_length, &offset);
2289             if (status == NXD_MQTT_PARTIAL_PACKET)
2290             {
2291 
2292                 /* We only have partial MQTT message.
2293                  * Put it to waiting list for more packets. */
2294                 client_ptr -> nxd_mqtt_client_processing_packet = packet_ptr;
2295                 packet_consumed = NX_TRUE;
2296                 break;
2297             }
2298             else if (status)
2299             {
2300 
2301                 /* Invalid packet. */
2302                 break;
2303             }
2304 
2305             /* Get packet type. */
2306             if (nx_packet_data_extract_offset(packet_ptr, 0, &packet_type, 1, &bytes_copied))
2307             {
2308 
2309                 /* Unable to read packet type. */
2310                 break;
2311             }
2312 
2313             /* Right shift 4 bits to get the packet type. */
2314             packet_type = packet_type >> 4;
2315 
2316             /* Process based on packet type. */
2317             switch (packet_type)
2318             {
2319             case MQTT_CONTROL_PACKET_TYPE_CONNECT:
2320                 /* Client does not accept connections.  Nothing needs to be done. */
2321                 break;
2322             case MQTT_CONTROL_PACKET_TYPE_CONNACK:
2323                 _nxd_mqtt_process_connack(client_ptr, packet_ptr, NX_NO_WAIT);
2324                 break;
2325 
2326             case MQTT_CONTROL_PACKET_TYPE_PUBLISH:
2327                 packet_consumed = _nxd_mqtt_process_publish(client_ptr, packet_ptr);
2328                 break;
2329 
2330             case MQTT_CONTROL_PACKET_TYPE_PUBACK:
2331             case MQTT_CONTROL_PACKET_TYPE_PUBREL:
2332                 _nxd_mqtt_process_publish_response(client_ptr, packet_ptr);
2333                 break;
2334 
2335             case MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE:
2336             case MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE:
2337                 /* Client should not process subscribe or unsubscribe message. */
2338                 break;
2339 
2340             case MQTT_CONTROL_PACKET_TYPE_SUBACK:
2341             case MQTT_CONTROL_PACKET_TYPE_UNSUBACK:
2342                 _nxd_mqtt_process_sub_unsub_ack(client_ptr, packet_ptr);
2343                 break;
2344 
2345 
2346             case MQTT_CONTROL_PACKET_TYPE_PINGREQ:
2347                 /* Client is not supposed to receive ping req.  Ignore it. */
2348                 break;
2349 
2350             case MQTT_CONTROL_PACKET_TYPE_PINGRESP:
2351                 _nxd_mqtt_process_pingresp(client_ptr);
2352                 break;
2353 
2354             case MQTT_CONTROL_PACKET_TYPE_DISCONNECT:
2355                 _nxd_mqtt_process_disconnect(client_ptr);
2356                 break;
2357 
2358             /* Publisher sender message type for QoS 2. Not supported. */
2359             case MQTT_CONTROL_PACKET_TYPE_PUBREC:
2360             case MQTT_CONTROL_PACKET_TYPE_PUBCOMP:
2361             default:
2362                 /* Unknown type. */
2363                 break;
2364             }
2365 
2366             if (packet_consumed)
2367             {
2368                 break;
2369             }
2370 
2371             /* Trim current packet. */
2372             offset += remaining_length;
2373             packet_length = packet_ptr -> nx_packet_length;
2374             if (packet_length > offset)
2375             {
2376 
2377                 /* Multiple MQTT message in one packet. */
2378                 packet_length = packet_ptr -> nx_packet_length - offset;
2379                 while ((ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr) <= offset)
2380                 {
2381                     offset -= (ULONG)(packet_ptr -> nx_packet_append_ptr - packet_ptr -> nx_packet_prepend_ptr);
2382 
2383                     /* Current packet can be released. */
2384                     previous_packet_ptr = packet_ptr;
2385                     packet_ptr = packet_ptr -> nx_packet_next;
2386                     previous_packet_ptr -> nx_packet_next = NX_NULL;
2387                     nx_packet_release(previous_packet_ptr);
2388                     if (packet_ptr == NX_NULL)
2389                     {
2390 
2391                         /* Invalid packet. */
2392                         break;
2393                     }
2394                 }
2395 
2396                 if (packet_ptr)
2397                 {
2398 
2399                     /* Adjust current packet. */
2400                     packet_ptr -> nx_packet_prepend_ptr = packet_ptr -> nx_packet_prepend_ptr + offset;
2401                     packet_ptr -> nx_packet_length = packet_length;
2402                 }
2403             }
2404             else
2405             {
2406 
2407                 /* All messages in current packet is processed. */
2408                 break;
2409             }
2410         }
2411 
2412         if (!packet_consumed)
2413         {
2414             nx_packet_release(packet_ptr);
2415         }
2416     }
2417 
2418     /* No more data in the receive queue.  Return. */
2419 
2420     return;
2421 }
2422 
2423 
2424 /**************************************************************************/
2425 /*                                                                        */
2426 /*  FUNCTION                                               RELEASE        */
2427 /*                                                                        */
2428 /*    _nxd_mqtt_tcp_establish_process                     PORTABLE C      */
2429 /*                                                           6.2.0        */
2430 /*  AUTHOR                                                                */
2431 /*                                                                        */
2432 /*    Yuxin Zhou, Microsoft Corporation                                   */
2433 /*                                                                        */
2434 /*  DESCRIPTION                                                           */
2435 /*                                                                        */
2436 /*    This function processes MQTT TCP connection establish event.        */
2437 /*                                                                        */
2438 /*  INPUT                                                                 */
2439 /*                                                                        */
2440 /*    client_ptr                            Pointer to MQTT Client        */
2441 /*                                                                        */
2442 /*  OUTPUT                                                                */
2443 /*                                                                        */
2444 /*    None                                                                */
2445 /*                                                                        */
2446 /*  CALLS                                                                 */
2447 /*                                                                        */
2448 /*    nx_secure_tls_session_start                                         */
2449 /*    _nxd_mqtt_client_connection_end                                     */
2450 /*    _nxd_mqtt_client_connect_packet_send                                */
2451 /*                                                                        */
2452 /*  CALLED BY                                                             */
2453 /*                                                                        */
2454 /*    _nxd_mqtt_client_event_process                                      */
2455 /*                                                                        */
2456 /*  RELEASE HISTORY                                                       */
2457 /*                                                                        */
2458 /*    DATE              NAME                      DESCRIPTION             */
2459 /*                                                                        */
2460 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2461 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2462 /*                                            resulting in version 6.1    */
2463 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
2464 /*                                            mqtt over websocket,        */
2465 /*                                            resulting in version 6.2.0  */
2466 /*                                                                        */
2467 /**************************************************************************/
_nxd_mqtt_tcp_establish_process(NXD_MQTT_CLIENT * client_ptr)2468 static VOID _nxd_mqtt_tcp_establish_process(NXD_MQTT_CLIENT *client_ptr)
2469 {
2470 UINT       status;
2471 
2472 
2473     /* TCP connection is established.  */
2474 
2475     /* If TLS is enabled, start TLS */
2476 #ifdef NX_SECURE_ENABLE
2477     if (client_ptr -> nxd_mqtt_client_use_tls)
2478     {
2479         status = nx_secure_tls_session_start(&(client_ptr -> nxd_mqtt_tls_session), &(client_ptr -> nxd_mqtt_client_socket), NX_NO_WAIT);
2480 
2481         if (status != NX_CONTINUE)
2482         {
2483 
2484             /* End connection. */
2485             _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2486 
2487             /* Check callback function.  */
2488             if (client_ptr -> nxd_mqtt_connect_notify)
2489             {
2490                 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2491             }
2492 
2493             return;
2494         }
2495 
2496         /* TLS in progress.  */
2497         client_ptr -> nxd_mqtt_tls_in_progress = NX_TRUE;
2498 
2499         return;
2500     }
2501 #endif /* NX_SECURE_ENABLE */
2502 
2503 #ifdef NXD_MQTT_OVER_WEBSOCKET
2504 
2505     /* If using websocket, start websocket connection.  */
2506     if (client_ptr -> nxd_mqtt_client_use_websocket)
2507     {
2508         status = nx_websocket_client_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_client_socket),
2509                                              client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
2510                                              client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
2511                                              (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
2512                                              NX_NO_WAIT);
2513 
2514         if (status != NX_IN_PROGRESS)
2515         {
2516 
2517             /* End connection. */
2518             _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2519 
2520             /* Check callback function.  */
2521             if (client_ptr -> nxd_mqtt_connect_notify)
2522             {
2523                 client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2524             }
2525 
2526             return;
2527         }
2528 
2529         return;
2530     }
2531 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2532 
2533     /* Start to send MQTT connect packet.  */
2534     status = _nxd_mqtt_client_connect_packet_send(client_ptr, NX_NO_WAIT);
2535 
2536     /* Check status.  */
2537     if (status)
2538     {
2539 
2540         /* End connection. */
2541         _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
2542 
2543         /* Check callback function.  */
2544         if (client_ptr -> nxd_mqtt_connect_notify)
2545         {
2546             client_ptr -> nxd_mqtt_connect_notify(client_ptr, status, client_ptr -> nxd_mqtt_connect_context);
2547         }
2548     }
2549 }
2550 
2551 
#ifdef NX_SECURE_ENABLE
/**************************************************************************/
/*                                                                        */
/*  FUNCTION                                               RELEASE        */
/*                                                                        */
/*    _nxd_mqtt_tls_establish_process                     PORTABLE C      */
/*                                                           6.2.0        */
/*  AUTHOR                                                                */
/*                                                                        */
/*    Yuxin Zhou, Microsoft Corporation                                   */
/*                                                                        */
/*  DESCRIPTION                                                           */
/*                                                                        */
/*    This function processes the TLS connection establish event by       */
/*    continuing the TLS handshake.  Once the handshake succeeds it       */
/*    starts the websocket connection when websocket is used, or          */
/*    sends the MQTT CONNECT packet otherwise.  On failure the            */
/*    connection is ended and the connect notify callback, if set,        */
/*    is called with the failing status.                                  */
/*                                                                        */
/*  INPUT                                                                 */
/*                                                                        */
/*    client_ptr                            Pointer to MQTT Client        */
/*                                                                        */
/*  OUTPUT                                                                */
/*                                                                        */
/*    None                                                                */
/*                                                                        */
/*  CALLS                                                                 */
/*                                                                        */
/*    _nx_secure_tls_handshake_process                                    */
/*    nx_websocket_client_secure_connect                                  */
/*    _nxd_mqtt_client_connect_packet_send                                */
/*    _nxd_mqtt_client_connection_end                                     */
/*                                                                        */
/*  CALLED BY                                                             */
/*                                                                        */
/*    _nxd_mqtt_client_event_process                                      */
/*                                                                        */
/*  RELEASE HISTORY                                                       */
/*                                                                        */
/*    DATE              NAME                      DESCRIPTION             */
/*                                                                        */
/*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
/*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
/*                                            resulting in version 6.1    */
/*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
/*                                            mqtt over websocket,        */
/*                                            resulting in version 6.2.0  */
/*                                                                        */
/**************************************************************************/
static VOID _nxd_mqtt_tls_establish_process(NXD_MQTT_CLIENT *client_ptr)
{
UINT       result;


    /* Directly call handshake process for async mode. */
    result = _nx_secure_tls_handshake_process(&(client_ptr -> nxd_mqtt_tls_session), NX_NO_WAIT);

    /* Handshake still in progress: nothing more to do for this event.  */
    if (result == NX_CONTINUE)
    {
        return;
    }

    if (result == NX_SUCCESS)
    {

        /* TLS session established.   */
        client_ptr -> nxd_mqtt_tls_in_progress = NX_FALSE;

#ifdef NXD_MQTT_OVER_WEBSOCKET

        /* If using websocket, start websocket connection.  */
        if (client_ptr -> nxd_mqtt_client_use_websocket)
        {
            result = nx_websocket_client_secure_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_tls_session),
                                                        client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
                                                        client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
                                                        (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
                                                        NX_NO_WAIT);

            /* Websocket connection started but not finished yet.  */
            if (result == NX_IN_PROGRESS)
            {
                return;
            }
        }
        else
#endif /* NXD_MQTT_OVER_WEBSOCKET */
        {

            /* Start to send MQTT connect packet.  */
            result = _nxd_mqtt_client_connect_packet_send(client_ptr, NX_NO_WAIT);
        }
    }

    /* A zero status at this point means there is nothing to clean up.  */
    if (!result)
    {
        return;
    }

    /* End connection. */
    _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);

    /* Report the failing status through the connect callback, if there is one.  */
    if (client_ptr -> nxd_mqtt_connect_notify)
    {
        client_ptr -> nxd_mqtt_connect_notify(client_ptr, result, client_ptr -> nxd_mqtt_connect_context);
    }
}
#endif /* NX_SECURE_ENABLE */
2656 
2657 /**************************************************************************/
2658 /*                                                                        */
2659 /*  FUNCTION                                               RELEASE        */
2660 /*                                                                        */
2661 /*    _nxd_mqtt_client_append_message                     PORTABLE C      */
2662 /*                                                           6.1          */
2663 /*  AUTHOR                                                                */
2664 /*                                                                        */
2665 /*    Yuxin Zhou, Microsoft Corporation                                   */
2666 /*                                                                        */
2667 /*  DESCRIPTION                                                           */
2668 /*                                                                        */
2669 /*    This function writes the message length and message in the outgoing */
2670 /*    MQTT packet.                                                        */
2671 /*                                                                        */
2672 /*  INPUT                                                                 */
2673 /*                                                                        */
2674 /*    client_ptr                            Pointer to MQTT Client        */
2675 /*    packet_ptr                            Outgoing MQTT packet          */
2676 /*    message                               Pointer to the message        */
2677 /*    length                                Length of the message         */
2678 /*    wait_option                           Wait option                   */
2679 /*                                                                        */
2680 /*  OUTPUT                                                                */
2681 /*                                                                        */
2682 /*    status                                                              */
2683 /*                                                                        */
2684 /*  CALLS                                                                 */
2685 /*                                                                        */
2686 /*    nx_packet_data_append                 Append packet data            */
2687 /*                                                                        */
2688 /*  CALLED BY                                                             */
2689 /*                                                                        */
2690 /*    _nxd_mqtt_client_sub_unsub                                          */
2691 /*    _nxd_mqtt_client_connect                                            */
2692 /*    _nxd_mqtt_client_publish                                            */
2693 /*                                                                        */
2694 /*  RELEASE HISTORY                                                       */
2695 /*                                                                        */
2696 /*    DATE              NAME                      DESCRIPTION             */
2697 /*                                                                        */
2698 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2699 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2700 /*                                            resulting in version 6.1    */
2701 /*                                                                        */
2702 /**************************************************************************/
_nxd_mqtt_client_append_message(NXD_MQTT_CLIENT * client_ptr,NX_PACKET * packet_ptr,CHAR * message,UINT length,ULONG wait_option)2703 UINT _nxd_mqtt_client_append_message(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr, CHAR *message, UINT length, ULONG wait_option)
2704 {
2705 UINT ret = 0;
2706 UCHAR len[2];
2707 
2708     len[0] = (length >> 8) & 0xFF;
2709     len[1] = length  & 0xFF;
2710 
2711     /* Append message length field. */
2712     ret = nx_packet_data_append(packet_ptr, len, 2, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
2713 
2714     if (ret)
2715     {
2716         return(ret);
2717     }
2718 
2719     if (length)
2720     {
2721         /* Copy the string into the packet. */
2722         ret = nx_packet_data_append(packet_ptr, message, length,
2723                                     client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
2724     }
2725 
2726     return(ret);
2727 }
2728 
2729 
2730 /**************************************************************************/
2731 /*                                                                        */
2732 /*  FUNCTION                                               RELEASE        */
2733 /*                                                                        */
2734 /*    _nxd_mqtt_client_connection_end                     PORTABLE C      */
2735 /*                                                           6.2.0        */
2736 /*  AUTHOR                                                                */
2737 /*                                                                        */
2738 /*    Yuxin Zhou, Microsoft Corporation                                   */
2739 /*                                                                        */
2740 /*  DESCRIPTION                                                           */
2741 /*                                                                        */
2742 /*    This function is used to end the MQTT connection.                   */
2743 /*                                                                        */
2744 /*  INPUT                                                                 */
2745 /*                                                                        */
2746 /*    client_ptr                            Pointer to MQTT Client        */
2747 /*    wait_option                           Wait option                   */
2748 /*                                                                        */
2749 /*  OUTPUT                                                                */
2750 /*                                                                        */
2751 /*    None                                                                */
2752 /*                                                                        */
2753 /*  CALLS                                                                 */
2754 /*                                                                        */
2755 /*    nx_secure_tls_session_end             End TLS session               */
2756 /*    nx_secure_tls_session_delete          Delete TLS session            */
2757 /*    nx_tcp_socket_disconnect              Close TCP connection          */
2758 /*    nx_tcp_client_socket_unbind           Unbind TCP socket             */
2759 /*    tx_timer_delete                       Delete timer                  */
2760 /*                                                                        */
2761 /*  CALLED BY                                                             */
2762 /*                                                                        */
2763 /*    _nxd_mqtt_client_connect                                            */
2764 /*    _nxd_mqtt_process_disconnect                                        */
2765 /*                                                                        */
2766 /*  RELEASE HISTORY                                                       */
2767 /*                                                                        */
2768 /*    DATE              NAME                      DESCRIPTION             */
2769 /*                                                                        */
2770 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2771 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2772 /*                                            resulting in version 6.1    */
2773 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
2774 /*                                            mqtt over websocket,        */
2775 /*                                            resulting in version 6.2.0  */
2776 /*                                                                        */
2777 /**************************************************************************/
_nxd_mqtt_client_connection_end(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)2778 VOID _nxd_mqtt_client_connection_end(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
2779 {
2780 
2781     /* Obtain the mutex. */
2782     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
2783 
2784     /* Mark the session as terminated. */
2785     client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;
2786 
2787     /* Release the mutex. */
2788     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
2789 
2790 #ifdef NXD_MQTT_OVER_WEBSOCKET
2791     if (client_ptr -> nxd_mqtt_client_use_websocket)
2792     {
2793         nx_websocket_client_disconnect(&(client_ptr -> nxd_mqtt_client_websocket), wait_option);
2794     }
2795 #endif /* NXD_MQTT_OVER_WEBSOCKET */
2796 
2797 #ifdef NX_SECURE_ENABLE
2798     if (client_ptr -> nxd_mqtt_client_use_tls)
2799     {
2800         nx_secure_tls_session_end(&(client_ptr -> nxd_mqtt_tls_session), wait_option);
2801         nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
2802     }
2803 #endif
2804     nx_tcp_socket_disconnect(&(client_ptr -> nxd_mqtt_client_socket), wait_option);
2805     nx_tcp_client_socket_unbind(&(client_ptr -> nxd_mqtt_client_socket));
2806 
2807     /* Disable timer if timer has been started. */
2808     if (client_ptr -> nxd_mqtt_keepalive)
2809     {
2810          tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
2811     }
2812 }
2813 
2814 
2815 static UINT _nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT *client_ptr, UCHAR header_value);
2816 
2817 
2818 /* MQTT internal function */
2819 
2820 /**************************************************************************/
2821 /*                                                                        */
2822 /*  FUNCTION                                               RELEASE        */
2823 /*                                                                        */
2824 /*    _nxd_mqtt_periodic_timer_entry                      PORTABLE C      */
2825 /*                                                           6.1          */
2826 /*  AUTHOR                                                                */
2827 /*                                                                        */
2828 /*    Yuxin Zhou, Microsoft Corporation                                   */
2829 /*                                                                        */
2830 /*  DESCRIPTION                                                           */
2831 /*                                                                        */
/*    This internal function is the entry routine of the MQTT client      */
/*    periodic timer.  It notifies the MQTT client thread when an         */
/*    outstanding ping has timed out or when a ping is about to be due.   */
/*                                                                        */
/*                                                                        */
/*  INPUT                                                                 */
/*                                                                        */
/*    client                                MQTT Client pointer as ULONG  */
/*                                                                        */
/*  OUTPUT                                                                */
/*                                                                        */
/*    None                                                                */
/*                                                                        */
/*  CALLS                                                                 */
/*                                                                        */
/*    tx_time_get                                                         */
/*    tx_event_flags_set                                                  */
/*    nx_cloud_module_event_set                                           */
/*                                                                        */
/*  CALLED BY                                                             */
/*                                                                        */
/*    MQTT client periodic timer                                          */
2853 /*                                                                        */
2854 /*  RELEASE HISTORY                                                       */
2855 /*                                                                        */
2856 /*    DATE              NAME                      DESCRIPTION             */
2857 /*                                                                        */
2858 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2859 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
2860 /*                                            resulting in version 6.1    */
2861 /*                                                                        */
2862 /**************************************************************************/
_nxd_mqtt_periodic_timer_entry(ULONG client)2863 static VOID _nxd_mqtt_periodic_timer_entry(ULONG client)
2864 {
2865 /* Check if it is time to send out a ping message. */
2866 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)client;
2867 
2868     /* If an outstanding ping response has not been received, and the client exceeds the time waiting for ping response,
2869        the client shall disconnect from the server. */
2870     if (client_ptr -> nxd_mqtt_ping_not_responded)
2871     {
2872         /* If current time is greater than the ping timeout */
2873         if ((tx_time_get() - client_ptr -> nxd_mqtt_ping_sent_time) >= client_ptr -> nxd_mqtt_ping_timeout)
2874         {
2875             /* Ping timed out.  Need to terminate the connection. */
2876 #ifndef NXD_MQTT_CLOUD_ENABLE
2877             tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_PING_TIMEOUT_EVENT, TX_OR);
2878 #else
2879             nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_PING_TIMEOUT_EVENT);
2880 #endif /* NXD_MQTT_CLOUD_ENABLE */
2881 
2882             return;
2883         }
2884     }
2885 
2886     /* About to timeout? */
2887     if ((client_ptr -> nxd_mqtt_timeout - tx_time_get()) <= client_ptr -> nxd_mqtt_timer_value)
2888     {
2889         /* Set the flag so the MQTT thread can send the ping. */
2890 #ifndef NXD_MQTT_CLOUD_ENABLE
2891         tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_TIMEOUT_EVENT, TX_OR);
2892 #else
2893         nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_TIMEOUT_EVENT);
2894 #endif /* NXD_MQTT_CLOUD_ENABLE */
2895     }
2896 
2897     /* If keepalive is not enabled, just return. */
2898     return;
2899 }
2900 
2901 
2902 
2903 /**************************************************************************/
2904 /*                                                                        */
2905 /*  FUNCTION                                               RELEASE        */
2906 /*                                                                        */
2907 /*    _nxd_mqtt_client_event_process                      PORTABLE C      */
2908 /*                                                           6.1          */
2909 /*  AUTHOR                                                                */
2910 /*                                                                        */
2911 /*    Yuxin Zhou, Microsoft Corporation                                   */
2912 /*                                                                        */
2913 /*  DESCRIPTION                                                           */
2914 /*                                                                        */
/*    This internal function processes the events posted to the MQTT      */
/*    client.  It is called by the MQTT client thread, or by the cloud    */
/*    helper when NXD_MQTT_CLOUD_ENABLE is defined.                       */
/*                                                                        */
/*  INPUT                                                                 */
/*                                                                        */
/*    mqtt_client                           Pointer to MQTT Client        */
/*    common_events                         Common events (not used)      */
/*    module_own_events                     MQTT events to process        */
/*                                                                        */
/*  OUTPUT                                                                */
/*                                                                        */
/*    None                                                                */
/*                                                                        */
/*  CALLS                                                                 */
/*                                                                        */
/*    _nxd_mqtt_release_transmit_packet                                   */
/*    _nxd_mqtt_release_receive_packet                                    */
/*    _nxd_mqtt_send_simple_message                                       */
/*    _nxd_mqtt_process_disconnect                                        */
/*    _nxd_mqtt_packet_receive_process                                    */
/*    _nxd_mqtt_tcp_establish_process                                     */
/*    _nxd_mqtt_tls_establish_process                                     */
/*    tx_mutex_get                                                        */
/*    tx_mutex_put                                                        */
/*    tx_timer_delete                                                     */
/*    tx_event_flags_delete                                               */
/*    nx_tcp_socket_delete                                                */
/*    tx_mutex_delete                                                     */
/*                                                                        */
/*  CALLED BY                                                             */
/*                                                                        */
/*    _nxd_mqtt_thread_entry                                              */
/*    Cloud helper, as registered by _nxd_mqtt_client_create              */
2942 /*                                                                        */
2943 /*  RELEASE HISTORY                                                       */
2944 /*                                                                        */
2945 /*    DATE              NAME                      DESCRIPTION             */
2946 /*                                                                        */
2947 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
2948 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
2949 /*                                            corrected mqtt client state,*/
2950 /*                                            resulting in version 6.1    */
2951 /*                                                                        */
2952 /**************************************************************************/
static VOID _nxd_mqtt_client_event_process(VOID *mqtt_client, ULONG common_events, ULONG module_own_events)
{
NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)mqtt_client;


    /* The common events are not consumed by the MQTT client.  */
    NX_PARAMETER_NOT_USED(common_events);

    /* All event handling below runs with the client mutex held.  */
    tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);

    /* Timeout event: send a PINGREQ, but only while the client is connected.  */
    if ((module_own_events & MQTT_TIMEOUT_EVENT) &&
        (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED))
    {
        _nxd_mqtt_send_simple_message(client_ptr, MQTT_CONTROL_PACKET_TYPE_PINGREQ);
    }

    /* TCP establish event.  */
    if (module_own_events & MQTT_TCP_ESTABLISH_EVENT)
    {
        _nxd_mqtt_tcp_establish_process(client_ptr);
    }

    /* Packet receive event.  */
    if (module_own_events & MQTT_PACKET_RECEIVE_EVENT)
    {
#ifdef NX_SECURE_ENABLE

        /* While the TLS establishment is still in progress, the packet is handed to the TLS
           establish processing instead of the MQTT packet processing.  */
        if (client_ptr -> nxd_mqtt_tls_in_progress)
        {
            _nxd_mqtt_tls_establish_process(client_ptr);
        }
        else
#endif /* NX_SECURE_ENABLE */
        {
            _nxd_mqtt_packet_receive_process(client_ptr);
        }
    }

    /* Ping timeout event: no response to the ping request, so disconnect.  */
    if (module_own_events & MQTT_PING_TIMEOUT_EVENT)
    {
        _nxd_mqtt_process_disconnect(client_ptr);
    }

    /* Network disconnect event: take the same disconnect path.  This is deliberately a
       separate call, so the disconnect processing runs once per event that is set.  */
    if (module_own_events & MQTT_NETWORK_DISCONNECT_EVENT)
    {
        _nxd_mqtt_process_disconnect(client_ptr);
    }

    /* Without a delete request there is nothing left to do but release the mutex.  */
    if ((module_own_events & MQTT_DELETE_EVENT) == 0)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return;
    }

    /* Delete request: disconnect first if the client is still connected.  */
    if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        _nxd_mqtt_process_disconnect(client_ptr);
    }

    /* Delete the timer unless its ID shows it is already gone.  */
    if ((client_ptr -> nxd_mqtt_timer).tx_timer_id != 0)
    {
        tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
    }

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Delete the event flags group unless its ID shows it is already gone.  */
    if ((client_ptr -> nxd_mqtt_events).tx_event_flags_group_id != 0)
    {
        tx_event_flags_delete(&client_ptr -> nxd_mqtt_events);
    }
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Drain the receive queue; each release is expected to advance the queue head.  */
    while (client_ptr -> message_receive_queue_head)
    {
        _nxd_mqtt_release_receive_packet(client_ptr, client_ptr -> message_receive_queue_head, NX_NULL);
    }
    client_ptr -> message_receive_queue_depth = 0;

    /* Drain the transmit queue the same way.  */
    while (client_ptr -> message_transmit_queue_head)
    {
        _nxd_mqtt_release_transmit_packet(client_ptr, client_ptr -> message_transmit_queue_head, NX_NULL);
    }

    /* The mutex must be released before it can be deleted.  */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Delete the client's own mutex.  */
    tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Delete the socket last; the cleared socket ID signals that it is ok to delete the thread.  */
    nx_tcp_socket_delete(&client_ptr -> nxd_mqtt_client_socket);
}
3053 
3054 
3055 #ifndef NXD_MQTT_CLOUD_ENABLE
3056 /**************************************************************************/
3057 /*                                                                        */
3058 /*  FUNCTION                                               RELEASE        */
3059 /*                                                                        */
3060 /*    _nxd_mqtt_thread_entry                              PORTABLE C      */
3061 /*                                                           6.1          */
3062 /*  AUTHOR                                                                */
3063 /*                                                                        */
3064 /*    Yuxin Zhou, Microsoft Corporation                                   */
3065 /*                                                                        */
3066 /*  DESCRIPTION                                                           */
3067 /*                                                                        */
3068 /*    This internal function serves as the entry point for the MQTT       */
3069 /*    client thread.                                                      */
3070 /*                                                                        */
3071 /*  INPUT                                                                 */
3072 /*                                                                        */
3073 /*    mqtt_client                           Pointer to MQTT Client        */
3074 /*                                                                        */
3075 /*  OUTPUT                                                                */
3076 /*                                                                        */
3077 /*    None                                                                */
3078 /*                                                                        */
3079 /*  CALLS                                                                 */
3080 /*                                                                        */
3081 /*    tx_event_flags_get                                                  */
3082 /*    _nxd_mqtt_client_event_process                                      */
3083 /*                                                                        */
3084 /*  CALLED BY                                                             */
3085 /*                                                                        */
3086 /*   _nxd_mqtt_client_create                                              */
3087 /*                                                                        */
3088 /*  RELEASE HISTORY                                                       */
3089 /*                                                                        */
3090 /*    DATE              NAME                      DESCRIPTION             */
3091 /*                                                                        */
3092 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3093 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3094 /*                                            resulting in version 6.1    */
3095 /*                                                                        */
3096 /**************************************************************************/
_nxd_mqtt_thread_entry(ULONG mqtt_client)3097 static VOID _nxd_mqtt_thread_entry(ULONG mqtt_client)
3098 {
3099 NXD_MQTT_CLIENT *client_ptr;
3100 ULONG            events;
3101 
3102     client_ptr = (NXD_MQTT_CLIENT *)mqtt_client;
3103 
3104     /* Loop to process events on the MQTT client */
3105     for (;;)
3106     {
3107 
3108         tx_event_flags_get(&client_ptr -> nxd_mqtt_events, MQTT_ALL_EVENTS, TX_OR_CLEAR, &events, TX_WAIT_FOREVER);
3109 
3110         /* Call the event processing routine.  */
3111         _nxd_mqtt_client_event_process(client_ptr, NX_NULL, events);
3112 
3113         if (events & MQTT_DELETE_EVENT)
3114         {
3115             break;
3116         }
3117     }
3118 }
3119 #endif /* NXD_MQTT_CLOUD_ENABLE */
3120 
3121 
3122 /**************************************************************************/
3123 /*                                                                        */
3124 /*  FUNCTION                                               RELEASE        */
3125 /*                                                                        */
3126 /*    _mqtt_client_disconnect_callback                    PORTABLE C      */
3127 /*                                                           6.1          */
3128 /*  AUTHOR                                                                */
3129 /*                                                                        */
3130 /*    Yuxin Zhou, Microsoft Corporation                                   */
3131 /*                                                                        */
3132 /*  DESCRIPTION                                                           */
3133 /*                                                                        */
3134 /*    This internal function is passed to MQTT client socket create call. */
3135 /*    This callback function notifies MQTT client thread when the TCP     */
3136 /*    connection is lost.                                                 */
3137 /*                                                                        */
3138 /*                                                                        */
3139 /*  INPUT                                                                 */
3140 /*                                                                        */
3141 /*    socket_ptr                            Pointer to TCP socket that    */
3142 /*                                            disconnected.               */
3143 /*                                                                        */
3144 /*  OUTPUT                                                                */
3145 /*                                                                        */
3146 /*    None                                                                */
3147 /*                                                                        */
3148 /*  CALLS                                                                 */
3149 /*                                                                        */
3150 /*    None                                                                */
3151 /*                                                                        */
3152 /*  CALLED BY                                                             */
3153 /*                                                                        */
3154 /*    TCP socket disconnect callback                                      */
3155 /*                                                                        */
3156 /*  RELEASE HISTORY                                                       */
3157 /*                                                                        */
3158 /*    DATE              NAME                      DESCRIPTION             */
3159 /*                                                                        */
3160 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3161 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3162 /*                                            resulting in version 6.1    */
3163 /*                                                                        */
3164 /**************************************************************************/
_mqtt_client_disconnect_callback(NX_TCP_SOCKET * socket_ptr)3165 static VOID _mqtt_client_disconnect_callback(NX_TCP_SOCKET *socket_ptr)
3166 {
3167 NXD_MQTT_CLIENT *client_ptr = (NXD_MQTT_CLIENT *)(socket_ptr -> nx_tcp_socket_reserved_ptr);
3168 
3169     /* Set the MQTT_NETWORK_DISCONNECT  event.  This event indicates
3170        that the disconnect is initiated from the network. */
3171 #ifndef NXD_MQTT_CLOUD_ENABLE
3172     tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_NETWORK_DISCONNECT_EVENT, TX_OR);
3173 #else
3174     nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_NETWORK_DISCONNECT_EVENT);
3175 #endif /* NXD_MQTT_CLOUD_ENABLE */
3176 
3177     return;
3178 }
3179 
3180 
3181 /**************************************************************************/
3182 /*                                                                        */
3183 /*  FUNCTION                                               RELEASE        */
3184 /*                                                                        */
3185 /*    _nxd_mqtt_client_create                             PORTABLE C      */
3186 /*                                                           6.1          */
3187 /*  AUTHOR                                                                */
3188 /*                                                                        */
3189 /*    Yuxin Zhou, Microsoft Corporation                                   */
3190 /*                                                                        */
3191 /*  DESCRIPTION                                                           */
3192 /*                                                                        */
3193 /*    This function creates an instance for MQTT Client.  It initializes  */
3194 /*    MQTT Client control block, creates a thread, mutex and event queue  */
3195 /*    for MQTT Client, and creates the TCP socket for MQTT messaging.     */
3196 /*                                                                        */
3197 /*                                                                        */
3198 /*  INPUT                                                                 */
3199 /*                                                                        */
/*    client_ptr                            Pointer to MQTT Client        */
/*    client_name                           Name of the MQTT Client       */
3201 /*    client_id                             Client ID for this instance   */
3202 /*    client_id_length                      Length of Client ID, in bytes */
3203 /*    ip_ptr                                Pointer to IP instance        */
3204 /*    pool_ptr                              Pointer to packet pool        */
3205 /*    stack_ptr                             Client thread's stack pointer */
3206 /*    stack_size                            Client thread's stack size    */
3207 /*    mqtt_thread_priority                  Priority for MQTT thread      */
3208 /*    memory_ptr                            Deprecated and not used       */
3209 /*    memory_size                           Deprecated and not used       */
3210 /*                                                                        */
3211 /*  OUTPUT                                                                */
3212 /*                                                                        */
3213 /*    status                                Completion status             */
3214 /*                                                                        */
3215 /*  CALLS                                                                 */
3216 /*                                                                        */
3217 /*    _nxd_mqtt_client_create_internal      Create mqtt client            */
3218 /*                                                                        */
3219 /*  CALLED BY                                                             */
3220 /*                                                                        */
3221 /*    Application Code                                                    */
3222 /*                                                                        */
3223 /*  RELEASE HISTORY                                                       */
3224 /*                                                                        */
3225 /*    DATE              NAME                      DESCRIPTION             */
3226 /*                                                                        */
3227 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3228 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
3229 /*                                            corrected mqtt client state,*/
3230 /*                                            resulting in version 6.1    */
3231 /*                                                                        */
3232 /**************************************************************************/
UINT _nxd_mqtt_client_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
                             CHAR *client_id, UINT client_id_length,
                             NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
                             VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority,
                             VOID *memory_ptr, ULONG memory_size)
{

UINT    status;


    /* memory_ptr and memory_size are accepted but never used.  */
    NX_PARAMETER_NOT_USED(memory_ptr);
    NX_PARAMETER_NOT_USED(memory_size);

    /* Let the internal routine do the actual creation; its failure status is returned unchanged.  */
    status = _nxd_mqtt_client_create_internal(client_ptr, client_name, client_id, client_id_length, ip_ptr,
                                              pool_ptr, stack_ptr, stack_size, mqtt_thread_priority);
    if (status != 0)
    {
        return(status);
    }

#ifdef NXD_MQTT_CLOUD_ENABLE

    /* Create the cloud helper owned by this client.  On failure the socket created by
       _nxd_mqtt_client_create_internal() is deleted again.  */
    if (nx_cloud_create(&(client_ptr -> nxd_mqtt_client_cloud), "Cloud Helper", stack_ptr, stack_size, mqtt_thread_priority) != NX_SUCCESS)
    {
        nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));

        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Record the cloud helper and use its mutex as the client mutex.  */
    client_ptr -> nxd_mqtt_client_cloud_ptr = &(client_ptr -> nxd_mqtt_client_cloud);
    client_ptr -> nxd_mqtt_client_mutex_ptr = &(client_ptr -> nxd_mqtt_client_cloud.nx_cloud_mutex);

    /* Register the MQTT event processing routine as a module of the cloud helper.  On failure
       both the cloud helper and the socket are deleted, in that order.  */
    if (nx_cloud_module_register(client_ptr -> nxd_mqtt_client_cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module), client_name, NX_CLOUD_MODULE_MQTT_EVENT,
                                 _nxd_mqtt_client_event_process, client_ptr) != NX_SUCCESS)
    {
        nx_cloud_delete(&(client_ptr -> nxd_mqtt_client_cloud));
        nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));

        return(NXD_MQTT_INTERNAL_ERROR);
    }

#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* A freshly created client starts in the idle state.  */
    client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;

    return(NXD_MQTT_SUCCESS);
}
3306 
3307 
3308 /**************************************************************************/
3309 /*                                                                        */
3310 /*  FUNCTION                                               RELEASE        */
3311 /*                                                                        */
3312 /*    _nxd_mqtt_client_create_internal                    PORTABLE C      */
3313 /*                                                           6.1.12       */
3314 /*  AUTHOR                                                                */
3315 /*                                                                        */
3316 /*    Yuxin Zhou, Microsoft Corporation                                   */
3317 /*                                                                        */
3318 /*  DESCRIPTION                                                           */
3319 /*                                                                        */
3320 /*    This function creates an instance for MQTT Client.  It initializes  */
3321 /*    MQTT Client control block, creates a thread, mutex and event queue  */
3322 /*    for MQTT Client, and creates the TCP socket for MQTT messaging.     */
3323 /*                                                                        */
3324 /*                                                                        */
3325 /*  INPUT                                                                 */
3326 /*                                                                        */
3327 /*    client_ptr                            Pointer to MQTT Client        */
3328 /*    client_id                             Client ID for this instance   */
3329 /*    client_id_length                      Length of Client ID, in bytes */
3330 /*    ip_ptr                                Pointer to IP instance        */
3331 /*    pool_ptr                              Pointer to packet pool        */
3332 /*    stack_ptr                             Client thread's stack pointer */
3333 /*    stack_size                            Client thread's stack size    */
3334 /*    mqtt_thread_priority                  Priority for MQTT thread      */
3335 /*                                                                        */
3336 /*  OUTPUT                                                                */
3337 /*                                                                        */
3338 /*    status                                Completion status             */
3339 /*                                                                        */
3340 /*  CALLS                                                                 */
3341 /*                                                                        */
3342 /*    tx_thread_create                      Create a thread               */
3343 /*    tx_mutex_create                       Create mutex                  */
3344 /*    tx_mutex_get                                                        */
3345 /*    tx_mutex_put                                                        */
3346 /*    tx_event_flag_create                  Create event flag             */
3347 /*                                                                        */
3348 /*  CALLED BY                                                             */
3349 /*                                                                        */
3350 /*    Application Code                                                    */
3351 /*                                                                        */
3352 /*  RELEASE HISTORY                                                       */
3353 /*                                                                        */
3354 /*    DATE              NAME                      DESCRIPTION             */
3355 /*                                                                        */
3356 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3357 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
3358 /*                                            corrected mqtt client state,*/
3359 /*                                            resulting in version 6.1    */
3360 /*  07-29-2022     Spencer McDonough        Modified comment(s), and      */
3361 /*                                            improved internal logic,    */
3362 /*                                            resulting in version 6.1.12 */
3363 /*                                                                        */
3364 /**************************************************************************/
static UINT _nxd_mqtt_client_create_internal(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name,
                                             CHAR *client_id, UINT client_id_length,
                                             NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
                                             VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority)
{
#ifndef NXD_MQTT_CLOUD_ENABLE
UINT                status;
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Initialize an MQTT client control block: zero it, create the ThreadX
       objects it needs (skipped when NXD_MQTT_CLOUD_ENABLE is defined), record
       the caller's settings and create the TCP socket.  Returns
       NXD_MQTT_SUCCESS, or NXD_MQTT_INTERNAL_ERROR when a ThreadX object
       cannot be created; in that case the objects created so far are
       removed again before returning.  */

#ifdef NXD_MQTT_CLOUD_ENABLE

    /* No client thread is created in this configuration, so the thread
       parameters are intentionally unused.  */
    NX_PARAMETER_NOT_USED(stack_ptr);
    NX_PARAMETER_NOT_USED(stack_size);
    NX_PARAMETER_NOT_USED(mqtt_thread_priority);
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Clear the MQTT Client control block. */
    NXD_MQTT_SECURE_MEMSET((void *)client_ptr, 0, sizeof(NXD_MQTT_CLIENT));

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Create MQTT mutex.  */
    status = tx_mutex_create(&client_ptr -> nxd_mqtt_protection, client_name, TX_NO_INHERIT);

    /* Determine if an error occurred. */
    if (status != TX_SUCCESS)
    {

        /* Nothing has been created yet, so there is nothing to undo. */
        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* The client uses its own embedded mutex. */
    client_ptr -> nxd_mqtt_client_mutex_ptr = &(client_ptr -> nxd_mqtt_protection);

    /* Now create MQTT client thread.  It is created suspended (TX_DONT_START)
       and only resumed once the rest of the control block is set up.  */
    status = tx_thread_create(&(client_ptr -> nxd_mqtt_thread), client_name, _nxd_mqtt_thread_entry,
                              (ULONG)client_ptr, stack_ptr, stack_size, mqtt_thread_priority, mqtt_thread_priority,
                              NXD_MQTT_CLIENT_THREAD_TIME_SLICE, TX_DONT_START);

    /* Determine if an error occurred. */
    if (status != TX_SUCCESS)
    {

        /* Delete the mutex. */
        tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);

        /* Return error code. */
        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Create the event flags group. */
    status = tx_event_flags_create(&(client_ptr -> nxd_mqtt_events), client_name);

    /* Determine if an error occurred. */
    if (status != TX_SUCCESS)
    {

        /* The thread has never run and is still suspended.  ThreadX only
           deletes threads that are completed or terminated, so terminate it
           first; otherwise the delete fails and the thread stays registered
           against a control block the caller considers unused.  */
        tx_thread_terminate(&(client_ptr -> nxd_mqtt_thread));

        /* Delete the thread. */
        tx_thread_delete(&(client_ptr -> nxd_mqtt_thread));

        /* Delete the mutex. */
        tx_mutex_delete(&client_ptr -> nxd_mqtt_protection);

        /* Return error code. */
        return(NXD_MQTT_INTERNAL_ERROR);
    }
#endif /* NXD_MQTT_CLOUD_ENABLE */

    /* Record the client settings.  Only the pointers are stored; the client
       name and client ID strings are not copied.  */
    client_ptr -> nxd_mqtt_client_id = client_id;
    client_ptr -> nxd_mqtt_client_id_length = client_id_length;
    client_ptr -> nxd_mqtt_client_ip_ptr = ip_ptr;
    client_ptr -> nxd_mqtt_client_packet_pool_ptr = pool_ptr;
    client_ptr -> nxd_mqtt_client_name = client_name;

    /* Create the socket.  The disconnect callback is registered here; no
       urgent-data callback is supplied.  */
    nx_tcp_socket_create(client_ptr -> nxd_mqtt_client_ip_ptr, &(client_ptr -> nxd_mqtt_client_socket), client_ptr -> nxd_mqtt_client_name,
                         NX_IP_NORMAL, NX_DONT_FRAGMENT, 0x80, NXD_MQTT_CLIENT_SOCKET_WINDOW_SIZE,
                         NX_NULL, _mqtt_client_disconnect_callback);

    /* Record the client_ptr in the socket structure so the owning client can
       be found from the socket.  */
    client_ptr -> nxd_mqtt_client_socket.nx_tcp_socket_reserved_ptr = (VOID *)client_ptr;

#ifndef NXD_MQTT_CLOUD_ENABLE

    /* Everything is in place: start the MQTT thread. */
    tx_thread_resume(&(client_ptr -> nxd_mqtt_thread));
#endif /* NXD_MQTT_CLOUD_ENABLE */

    return(NXD_MQTT_SUCCESS);
}
3448 
3449 
3450 /**************************************************************************/
3451 /*                                                                        */
3452 /*  FUNCTION                                               RELEASE        */
3453 /*                                                                        */
3454 /*    _nxd_mqtt_client_login_set                          PORTABLE C      */
3455 /*                                                           6.1          */
3456 /*  AUTHOR                                                                */
3457 /*                                                                        */
3458 /*    Yuxin Zhou, Microsoft Corporation                                   */
3459 /*                                                                        */
3460 /*  DESCRIPTION                                                           */
3461 /*                                                                        */
3462 /*    This function sets optional MQTT username and password.  Note       */
3463 /*    if the broker requires username and password, this information      */
3464 /*    must be set prior to calling nxd_mqtt_client_connect or             */
3465 /*    nxd_mqtt_client_secure_connect.                                     */
3466 /*                                                                        */
3467 /*                                                                        */
3468 /*  INPUT                                                                 */
3469 /*                                                                        */
3470 /*    client_ptr                            Pointer to MQTT Client        */
3471 /*    username                              User name, or NULL if user    */
3472 /*                                            name is not required        */
3473 /*    username_length                       Length of the user name, or   */
3474 /*                                            0 if user name is NULL      */
3475 /*    password                              Password string, or NULL if   */
3476 /*                                            password is not required    */
3477 /*    password_length                       Length of the password, or    */
3478 /*                                            0 if password is NULL       */
3479 /*                                                                        */
3480 /*  OUTPUT                                                                */
3481 /*                                                                        */
3482 /*    status                                Completion status             */
3483 /*                                                                        */
3484 /*  CALLS                                                                 */
3485 /*                                                                        */
3486 /*    tx_mutex_get                                                        */
3487 /*                                                                        */
3488 /*  CALLED BY                                                             */
3489 /*                                                                        */
3490 /*    Application Code                                                    */
3491 /*                                                                        */
3492 /*  RELEASE HISTORY                                                       */
3493 /*                                                                        */
3494 /*    DATE              NAME                      DESCRIPTION             */
3495 /*                                                                        */
3496 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3497 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3498 /*                                            resulting in version 6.1    */
3499 /*                                                                        */
3500 /**************************************************************************/
UINT _nxd_mqtt_client_login_set(NXD_MQTT_CLIENT *client_ptr,
                                CHAR *username, UINT username_length, CHAR *password, UINT password_length)
{

    /* Store the user name and password pointers, with their lengths, in the
       client control block while holding the client mutex.  Returns
       NXD_MQTT_MUTEX_FAILURE if the mutex cannot be obtained, otherwise
       NX_SUCCESS.  */

    /* Wait for exclusive access to the control block. */
    if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Only the pointers are saved; the strings themselves are not copied.
       Both lengths are narrowed to USHORT when stored.  */
    client_ptr -> nxd_mqtt_client_username_length = (USHORT)username_length;
    client_ptr -> nxd_mqtt_client_username = username;
    client_ptr -> nxd_mqtt_client_password_length = (USHORT)password_length;
    client_ptr -> nxd_mqtt_client_password = password;

    /* Done with the control block. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    return(NX_SUCCESS);
}
3522 
3523 /**************************************************************************/
3524 /*                                                                        */
3525 /*  FUNCTION                                               RELEASE        */
3526 /*                                                                        */
3527 /*    _nxd_mqtt_client_will_message_set                   PORTABLE C      */
3528 /*                                                           6.1          */
3529 /*  AUTHOR                                                                */
3530 /*                                                                        */
3531 /*    Yuxin Zhou, Microsoft Corporation                                   */
3532 /*                                                                        */
3533 /*  DESCRIPTION                                                           */
3534 /*                                                                        */
3535 /*    This function sets optional MQTT will topic and will message.       */
3536 /*    Note that if will message is needed, application must set will      */
3537 /*    message prior to calling nxd_mqtt_client_connect or                 */
3538 /*    nxd_mqtt_client_secure_connect.                                     */
3539 /*                                                                        */
3540 /*                                                                        */
3541 /*  INPUT                                                                 */
3542 /*                                                                        */
3543 /*    client_ptr                            Pointer to MQTT Client        */
3544 /*    will_topic                            Will topic string.            */
3545 /*    will_topic_length                     Length of the will topic.     */
3546 /*    will_message                          Will message string.          */
3547 /*    will_message_length                   Length of the will message.   */
3548 /*    will_retain_flag                      Whether or not the will       */
3549 /*                                            message is to be retained   */
3550 /*                                            when it is published.       */
3551 /*                                            Valid values are NX_TRUE    */
3552 /*                                            NX_FALSE                    */
3553 /*    will_QoS                              QoS level to be used when     */
3554 /*                                            publishing will message.    */
3555 /*                                            Valid values are 0 and 1    */
3556 /*                                                                        */
3557 /*  OUTPUT                                                                */
3558 /*                                                                        */
3559 /*    status                                Completion status             */
3560 /*                                                                        */
3561 /*  CALLS                                                                 */
3562 /*                                                                        */
3563 /*    tx_mutex_get                                                        */
3564 /*                                                                        */
3565 /*  CALLED BY                                                             */
3566 /*                                                                        */
3567 /*    Application Code                                                    */
3568 /*                                                                        */
3569 /*  RELEASE HISTORY                                                       */
3570 /*                                                                        */
3571 /*    DATE              NAME                      DESCRIPTION             */
3572 /*                                                                        */
3573 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3574 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3575 /*                                            resulting in version 6.1    */
3576 /*                                                                        */
3577 /**************************************************************************/
UINT _nxd_mqtt_client_will_message_set(NXD_MQTT_CLIENT *client_ptr,
                                       const UCHAR *will_topic, UINT will_topic_length, const UCHAR *will_message,
                                       UINT will_message_length, UINT will_retain_flag, UINT will_QoS)
{
UINT  status;
UCHAR qos_retain;

    /* Record the will topic, will message, retain flag and QoS in the client
       control block while holding the client mutex.  Returns
       NXD_MQTT_QOS2_NOT_SUPPORTED when will_QoS is 2,
       NXD_MQTT_MUTEX_FAILURE if the mutex cannot be obtained, otherwise
       NX_SUCCESS.  */

    /* QoS 2 is rejected before anything is modified. */
    if (will_QoS == 2)
    {
        return(NXD_MQTT_QOS2_NOT_SUPPORTED);
    }

    /* Build the combined retain/QoS byte from scratch: bit 7 carries the
       retain flag and the low bits carry the QoS level.  Starting from the
       arguments alone, rather than OR-ing onto the stored value, ensures a
       later call can clear a retain flag or lower a QoS level that an earlier
       call had set.  */
    qos_retain = (UCHAR)will_QoS;
    if (will_retain_flag == NX_TRUE)
    {
        qos_retain = (UCHAR)(qos_retain | 0x80);
    }

    /* Obtain the mutex. */
    status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

    if (status != TX_SUCCESS)
    {
        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Only the pointers are saved; the topic and message are not copied. */
    client_ptr -> nxd_mqtt_client_will_topic = will_topic;
    client_ptr -> nxd_mqtt_client_will_topic_length = will_topic_length;
    client_ptr -> nxd_mqtt_client_will_message = will_message;
    client_ptr -> nxd_mqtt_client_will_message_length = will_message_length;
    client_ptr -> nxd_mqtt_client_will_qos_retain = qos_retain;

    /* Release the mutex. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    return(NX_SUCCESS);
}
3612 
3613 
3614 /**************************************************************************/
3615 /*                                                                        */
3616 /*  FUNCTION                                               RELEASE        */
3617 /*                                                                        */
3618 /*    _nxde_mqtt_client_will_message_set                  PORTABLE C      */
3619 /*                                                           6.1          */
3620 /*  AUTHOR                                                                */
3621 /*                                                                        */
3622 /*    Yuxin Zhou, Microsoft Corporation                                   */
3623 /*                                                                        */
3624 /*  DESCRIPTION                                                           */
3625 /*                                                                        */
3626 /*    This function checks for errors in the                              */
3627 /*    nxd_mqtt_client_will_message_set call.                              */
3628 /*                                                                        */
3629 /*                                                                        */
3630 /*  INPUT                                                                 */
3631 /*                                                                        */
3632 /*    client_ptr                            Pointer to MQTT Client        */
3633 /*    will_topic                            Will topic string.            */
3634 /*    will_topic_length                     Length of the will topic.     */
3635 /*    will_message                          Will message string.          */
3636 /*    will_message_length                   Length of the will message.   */
3637 /*    will_retain_flag                      Whether or not the will       */
3638 /*                                            message is to be retained   */
3639 /*                                            when it is published.       */
3640 /*                                            Valid values are NX_TRUE    */
3641 /*                                            NX_FALSE                    */
3642 /*    will_QoS                              QoS level to be used when     */
3643 /*                                            publishing will message.    */
3644 /*                                            Valid values are 0 and 1    */
3645 /*                                                                        */
3646 /*  OUTPUT                                                                */
3647 /*                                                                        */
3648 /*    status                                Completion status             */
3649 /*                                                                        */
3650 /*  CALLS                                                                 */
3651 /*                                                                        */
3652 /*    _nxd_mqtt_client_will_message_set                                   */
3653 /*                                                                        */
3654 /*  CALLED BY                                                             */
3655 /*                                                                        */
3656 /*    Application Code                                                    */
3657 /*                                                                        */
3658 /*  RELEASE HISTORY                                                       */
3659 /*                                                                        */
3660 /*    DATE              NAME                      DESCRIPTION             */
3661 /*                                                                        */
3662 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3663 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3664 /*                                            resulting in version 6.1    */
3665 /*                                                                        */
3666 /**************************************************************************/
UINT _nxde_mqtt_client_will_message_set(NXD_MQTT_CLIENT *client_ptr,
                                        const UCHAR *will_topic, UINT will_topic_length, const UCHAR *will_message,
                                        UINT will_message_length, UINT will_retain_flag, UINT will_QoS)
{

    /* Validate the arguments and, if they are acceptable, forward them to
       _nxd_mqtt_client_will_message_set.  Returns NXD_MQTT_INVALID_PARAMETER
       for any rejected argument, otherwise the status of the forwarded
       call.  */

    /* A client control block is required. */
    if (client_ptr == NX_NULL)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Valid will_topic string.  The will topic string cannot be NULL or
       empty.  */
    if ((will_topic == NX_NULL) || (will_topic_length == 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* The will message itself may be empty, but a NULL pointer paired with a
       non-zero length is inconsistent and must not be stored.  */
    if ((will_message == NX_NULL) && (will_message_length != 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* The retain flag must be exactly NX_TRUE or NX_FALSE. */
    if ((will_retain_flag != NX_TRUE) && (will_retain_flag != NX_FALSE))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* QoS levels above 2 do not exist.  QoS 2 passes this check and is
       rejected by the forwarded call with its own status code.  */
    if (will_QoS > 2)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    return(_nxd_mqtt_client_will_message_set(client_ptr, will_topic, will_topic_length, will_message,
                                             will_message_length, will_retain_flag, will_QoS));
}
3696 
3697 /**************************************************************************/
3698 /*                                                                        */
3699 /*  FUNCTION                                               RELEASE        */
3700 /*                                                                        */
3701 /*    _nxde_mqtt_client_login_set                         PORTABLE C      */
3702 /*                                                           6.1          */
3703 /*  AUTHOR                                                                */
3704 /*                                                                        */
3705 /*    Yuxin Zhou, Microsoft Corporation                                   */
3706 /*                                                                        */
3707 /*  DESCRIPTION                                                           */
3708 /*                                                                        */
3709 /*    This function checks the nxd_mqtt_client_login_set call for errors. */
3710 /*                                                                        */
3711 /*                                                                        */
3712 /*  INPUT                                                                 */
3713 /*                                                                        */
3714 /*    client_ptr                            Pointer to MQTT Client        */
3715 /*    username                              User name, or NULL if user    */
3716 /*                                            name is not required        */
3717 /*    username_length                       Length of the user name, or   */
3718 /*                                            0 if user name is NULL      */
3719 /*    password                              Password string, or NULL if   */
3720 /*                                            password is not required    */
3721 /*    password_length                       Length of the password, or    */
3722 /*                                            0 if password is NULL       */
3723 /*                                                                        */
3724 /*  OUTPUT                                                                */
3725 /*                                                                        */
3726 /*    status                                Completion status             */
3727 /*                                                                        */
3728 /*  CALLS                                                                 */
3729 /*                                                                        */
3730 /*    _nxd_mqtt_client_login_set                                          */
3731 /*                                                                        */
3732 /*  CALLED BY                                                             */
3733 /*                                                                        */
3734 /*    Application Code                                                    */
3735 /*                                                                        */
3736 /*  RELEASE HISTORY                                                       */
3737 /*                                                                        */
3738 /*    DATE              NAME                      DESCRIPTION             */
3739 /*                                                                        */
3740 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3741 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3742 /*                                            resulting in version 6.1    */
3743 /*                                                                        */
3744 /**************************************************************************/
UINT _nxde_mqtt_client_login_set(NXD_MQTT_CLIENT *client_ptr,
                                 CHAR *username, UINT username_length, CHAR *password, UINT password_length)
{

    /* Validate the arguments and, if they are acceptable, forward them to
       _nxd_mqtt_client_login_set.  Returns NXD_MQTT_INVALID_PARAMETER for any
       rejected argument, otherwise the status of the forwarded call.  */

    /* A client control block is required. */
    if (client_ptr == NX_NULL)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Username and username length don't match,
       or password and password length don't match. */
    if (((username == NX_NULL) && (username_length != 0)) ||
        ((password == NX_NULL) && (password_length != 0)))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* _nxd_mqtt_client_login_set narrows both lengths to USHORT when it
       stores them.  Reject anything that does not fit in 16 bits instead of
       letting it be silently truncated to a different length.  */
    if ((username_length > 0xFFFF) || (password_length > 0xFFFF))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    return(_nxd_mqtt_client_login_set(client_ptr, username, username_length, password, password_length));
}
3763 
3764 
3765 /**************************************************************************/
3766 /*                                                                        */
3767 /*  FUNCTION                                               RELEASE        */
3768 /*                                                                        */
3769 /*    _nxd_mqtt_client_retransmit_message                 PORTABLE C      */
3770 /*                                                           6.2.0        */
3771 /*  AUTHOR                                                                */
3772 /*                                                                        */
3773 /*    Yuxin Zhou, Microsoft Corporation                                   */
3774 /*                                                                        */
3775 /*  DESCRIPTION                                                           */
3776 /*                                                                        */
3777 /*    This function retransmits QoS1 messages upon reconnection, if the   */
3778 /*    connection is not set CLEAN_SESSION.                                */
3779 /*                                                                        */
3780 /*                                                                        */
3781 /*  INPUT                                                                 */
3782 /*                                                                        */
3783 /*    client_ptr                            Pointer to MQTT Client        */
3784 /*    wait_option                           Timeout value                 */
3785 /*                                                                        */
3786 /*  OUTPUT                                                                */
3787 /*                                                                        */
3788 /*    status                                Completion status             */
3789 /*                                                                        */
3790 /*  CALLS                                                                 */
3791 /*                                                                        */
3792 /*    tx_mutex_get                                                        */
3793 /*    tx_mutex_put                                                        */
3794 /*    _nxd_mqtt_packet_send                                               */
3795 /*    nx_packet_release                                                   */
3796 /*    tx_time_get                                                         */
3797 /*    nx_packet_copy                                                      */
3798 /*                                                                        */
3799 /*  CALLED BY                                                             */
3800 /*                                                                        */
3801 /*    Application Code                                                    */
3802 /*                                                                        */
3803 /*  RELEASE HISTORY                                                       */
3804 /*                                                                        */
3805 /*    DATE              NAME                      DESCRIPTION             */
3806 /*                                                                        */
3807 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3808 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
3809 /*                                            resulting in version 6.1    */
3810 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
3811 /*                                            the logic of sending packet,*/
3812 /*                                            resulting in version 6.2.0  */
3813 /*                                                                        */
3814 /**************************************************************************/
_nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)3815 static UINT _nxd_mqtt_client_retransmit_message(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
3816 {
3817 NX_PACKET          *transmit_packet_ptr;
3818 NX_PACKET          *packet_ptr;
3819 UINT                status = NXD_MQTT_SUCCESS;
3820 UINT                mutex_status;
3821 UCHAR               fixed_header;
3822 
3823     transmit_packet_ptr = client_ptr -> message_transmit_queue_head;
3824 
3825     while (transmit_packet_ptr)
3826     {
3827         fixed_header = *(transmit_packet_ptr -> nx_packet_prepend_ptr);
3828 
3829         if ((fixed_header & 0xF0) == (MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4))
3830         {
3831 
3832             /* Retransmit publish packet only. */
3833             /* Obtain a NetX Packet. */
3834             status = nx_packet_copy(transmit_packet_ptr, &packet_ptr, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
3835 
3836             if (status != NXD_MQTT_SUCCESS)
3837             {
3838                 return(NXD_MQTT_PACKET_POOL_FAILURE);
3839             }
3840 
3841             /* Release the mutex. */
3842             tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
3843 
3844             /* Send packet to server.  */
3845             status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);
3846 
3847             if (status)
3848             {
3849                 /* Release the packet. */
3850                 nx_packet_release(packet_ptr);
3851 
3852                 status = NXD_MQTT_COMMUNICATION_FAILURE;
3853             }
3854             /* Obtain the mutex. */
3855             mutex_status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, wait_option);
3856 
3857             if (mutex_status != TX_SUCCESS)
3858             {
3859                 return(NXD_MQTT_MUTEX_FAILURE);
3860             }
3861             if (status)
3862             {
3863                 return(status);
3864             }
3865         }
3866         transmit_packet_ptr = transmit_packet_ptr -> nx_packet_queue_next;
3867     }
3868 
3869     /* Update the timeout value. */
3870     client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
3871 
3872     return(status);
3873 }
3874 
3875 /**************************************************************************/
3876 /*                                                                        */
3877 /*  FUNCTION                                               RELEASE        */
3878 /*                                                                        */
3879 /*    _nxd_mqtt_client_connect                            PORTABLE C      */
3880 /*                                                           6.3.0        */
3881 /*  AUTHOR                                                                */
3882 /*                                                                        */
3883 /*    Yuxin Zhou, Microsoft Corporation                                   */
3884 /*                                                                        */
3885 /*  DESCRIPTION                                                           */
3886 /*                                                                        */
3887 /*    This function makes an initial connection to the MQTT server.       */
3888 /*                                                                        */
3889 /*                                                                        */
3890 /*  INPUT                                                                 */
3891 /*                                                                        */
3892 /*    client_ptr                            Pointer to MQTT Client        */
3893 /*    server_ip                             Server IP address structure   */
3894 /*    server_port                           Server port number, in host   */
3895 /*                                            byte order                  */
/*    keepalive                             Keepalive interval in seconds,*/
/*                                            0 disables the timer        */
3897 /*    clean_session                         Clean session flag            */
3898 /*    wait_option                           Timeout value                 */
3899 /*                                                                        */
3900 /*                                                                        */
3901 /*  OUTPUT                                                                */
3902 /*                                                                        */
3903 /*    status                                Completion status             */
3904 /*                                                                        */
3905 /*  CALLS                                                                 */
3906 /*                                                                        */
3907 /*    tx_mutex_get                                                        */
3908 /*    nx_tcp_socket_create                  Create TCP socket             */
3909 /*    nx_tcp_socket_receive_notify                                        */
3910 /*    nx_tcp_client_socket_bind                                           */
3911 /*    nxd_tcp_client_socket_connect                                       */
3912 /*    nx_tcp_client_socket_unbind                                         */
3913 /*    tx_mutex_put                                                        */
3914 /*    nx_secure_tls_session_start                                         */
3915 /*    nx_secure_tls_session_send                                          */
3916 /*    nx_secure_tls_session_receive                                       */
3917 /*    _nxd_mqtt_client_set_fixed_header                                   */
3918 /*    _nxd_mqtt_client_append_message                                     */
3919 /*    nx_tcp_socket_send                                                  */
3920 /*    nx_packet_release                                                   */
3921 /*    nx_tcp_socket_receive                                               */
3922 /*    tx_event_flag_set                                                   */
3923 /*    _nxd_mqtt_release_transmit_packet                                   */
3924 /*    tx_timer_create                                                     */
3925 /*    nx_secure_tls_session_receive                                       */
3926 /*    _nxd_mqtt_client_connection_end                                     */
3927 /*                                                                        */
3928 /*  CALLED BY                                                             */
3929 /*                                                                        */
3930 /*    Application Code                                                    */
3931 /*                                                                        */
3932 /*  RELEASE HISTORY                                                       */
3933 /*                                                                        */
3934 /*    DATE              NAME                      DESCRIPTION             */
3935 /*                                                                        */
3936 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
3937 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
3938 /*                                            fixed return value when it  */
3939 /*                                            is set in CONNACK, corrected*/
3940 /*                                            mqtt client state,          */
3941 /*                                            resulting in version 6.1    */
3942 /*  08-02-2021     Yuxin Zhou               Modified comment(s), and      */
3943 /*                                            corrected the logic for     */
3944 /*                                            non-blocking mode,          */
3945 /*                                            resulting in version 6.1.8  */
3946 /*  07-29-2022     Spencer McDonough        Modified comment(s), and      */
3947 /*                                            improved internal logic,    */
3948 /*                                            resulting in version 6.1.12 */
3949 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
3950 /*                                            mqtt over websocket,        */
3951 /*                                            resulting in version 6.2.0  */
3952 /*  10-31-2023     Haiqing Zhao             Modified comment(s),          */
3953 /*                                            resulting in version 6.3.0  */
3954 /*                                                                        */
3955 /**************************************************************************/
UINT _nxd_mqtt_client_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                              UINT keepalive, UINT clean_session, ULONG wait_option)
{
NX_PACKET           *packet_ptr;
UINT                 status;
TX_THREAD           *thread_ptr;
UINT                 new_priority;
UINT                 old_priority;


    /* Connection sequence:
         1. Under the client mutex, verify the client is IDLE, record the keepalive and
            clean session settings, create the keepalive timer (only when keepalive is
            non-zero) and install or clear the socket callbacks.
         2. Bind the socket, mark the client CONNECTING and start the TCP connection.
         3. Non-blocking mode (wait_option == 0): return NX_IN_PROGRESS at this point.
         4. Blocking mode: optionally start TLS and the websocket layer, send the CONNECT
            packet, wait for the reply and process it as a CONNACK.

       Return values produced here: NXD_MQTT_MUTEX_FAILURE, NXD_MQTT_ALREADY_CONNECTED,
       NXD_MQTT_CONNECTING, NXD_MQTT_INVALID_STATE, NXD_MQTT_CONNECT_FAILURE,
       NX_IN_PROGRESS, NXD_MQTT_COMMUNICATION_FAILURE, or the status returned by
       _nxd_mqtt_process_connack.  */

    /* Obtain the mutex. */
    status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

    if (status != TX_SUCCESS)
    {
#ifdef NX_SECURE_ENABLE
        if (client_ptr -> nxd_mqtt_client_use_tls)
        {
            nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
        }
#endif /* NX_SECURE_ENABLE */

        return(NXD_MQTT_MUTEX_FAILURE);
    }

    /* Do nothing if the client is already connected. */
    if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(NXD_MQTT_ALREADY_CONNECTED);
    }

    /* Check if client is connecting. */
    if (client_ptr -> nxd_mqtt_client_state == NXD_MQTT_CLIENT_STATE_CONNECTING)
    {
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(NXD_MQTT_CONNECTING);
    }

    /* Client state must be in IDLE.  */
    if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_IDLE)
    {
#ifdef NX_SECURE_ENABLE
        if (client_ptr -> nxd_mqtt_client_use_tls)
        {
            nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
        }
#endif /* NX_SECURE_ENABLE */
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(NXD_MQTT_INVALID_STATE);
    }

#if defined(NX_SECURE_ENABLE) && defined(NXD_MQTT_REQUIRE_TLS)
    if (!client_ptr -> nxd_mqtt_client_use_tls)
    {

        /* NXD_MQTT_REQUIRE_TLS is defined but the application does not use TLS.
           This is security violation.  Return with failure code. */
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
        return(NXD_MQTT_CONNECT_FAILURE);
    }
#endif /* NX_SECURE_ENABLE && NXD_MQTT_REQUIRE_TLS*/

    /* Record the keepalive value, converted to TX timer ticks. */
    client_ptr -> nxd_mqtt_keepalive = keepalive * NX_IP_PERIODIC_RATE;
    if (keepalive)
    {
        client_ptr -> nxd_mqtt_timer_value = NXD_MQTT_KEEPALIVE_TIMER_RATE;
        client_ptr -> nxd_mqtt_ping_timeout = NXD_MQTT_PING_TIMEOUT_DELAY;

        /* Create the periodic timer.  It exists only when a non-zero keepalive was requested. */
        tx_timer_create(&(client_ptr -> nxd_mqtt_timer), "MQTT Timer", _nxd_mqtt_periodic_timer_entry, (ULONG)client_ptr,
                        client_ptr -> nxd_mqtt_timer_value, client_ptr -> nxd_mqtt_timer_value, TX_AUTO_ACTIVATE);
    }
    else
    {

        /* Keepalive disabled: no timer is created. */
        client_ptr -> nxd_mqtt_timer_value = 0;
        client_ptr -> nxd_mqtt_ping_timeout = 0;
    }

    /* Record the clean session flag.  */
    client_ptr -> nxd_mqtt_clean_session = clean_session;

    /* Set TCP connection establish notify for non-blocking mode.  */
    if (wait_option == 0)
    {
        nx_tcp_socket_establish_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_tcp_establish_notify);

        /* Set the receive callback. */
        nx_tcp_socket_receive_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_receive_callback);

#ifdef NXD_MQTT_OVER_WEBSOCKET
        if (client_ptr -> nxd_mqtt_client_use_websocket)
        {

            /* Set the websocket connection callback.  */
            nx_websocket_client_connection_status_callback_set(&client_ptr -> nxd_mqtt_client_websocket, client_ptr, _nxd_mqtt_client_websocket_connection_status_callback);
        }
#endif /* NXD_MQTT_OVER_WEBSOCKET */
    }
    else
    {

        /* Blocking mode: clear the receive callback.  It is installed again at the end
           of this function once the CONNACK has been processed successfully.  */
        client_ptr -> nxd_mqtt_client_socket.nx_tcp_receive_callback = NX_NULL;

#ifdef NXD_MQTT_OVER_WEBSOCKET
        if (client_ptr -> nxd_mqtt_client_use_websocket)
        {

            /* Clean the websocket connection callback.  */
            nx_websocket_client_connection_status_callback_set(&client_ptr -> nxd_mqtt_client_websocket, NX_NULL, NX_NULL);
        }
#endif /* NXD_MQTT_OVER_WEBSOCKET */
    }

    /* Release mutex */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    /* NOTE(review): the state is still IDLE while the mutex is released around the bind,
       and the timer may already have been created above.  A concurrent connect call on the
       same client could pass the state checks in this window - confirm that callers
       serialize connect requests.  */

    /* First attempt to bind the client socket.  The result is not checked here; a socket
       that could not be bound is expected to make the connect call below fail.  */
    nx_tcp_client_socket_bind(&(client_ptr -> nxd_mqtt_client_socket), NX_ANY_PORT, wait_option);

    /* Obtain the mutex. */
    tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

    /* Make state as NXD_MQTT_CLIENT_STATE_CONNECTING. */
    client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_CONNECTING;

    /* Release mutex. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    /* Connect to the MQTT server */
    status = nxd_tcp_client_socket_connect(&(client_ptr -> nxd_mqtt_client_socket), server_ip, server_port, wait_option);
    if ((status != NX_SUCCESS) && (status != NX_IN_PROGRESS))
    {

        /* Obtain the mutex. */
        tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

        /* Make state as NXD_MQTT_CLIENT_STATE_IDLE. */
        client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;

        /* Release mutex. */
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

#ifdef NX_SECURE_ENABLE
        if (client_ptr -> nxd_mqtt_client_use_tls)
        {
            nx_secure_tls_session_delete(&(client_ptr -> nxd_mqtt_tls_session));
        }
#endif /* NX_SECURE_ENABLE */
        nx_tcp_client_socket_unbind(&(client_ptr -> nxd_mqtt_client_socket));

        /* Delete the keepalive timer only if this call created it.  With a zero
           keepalive no timer was created above, so there is nothing to delete.  */
        if (keepalive)
        {
            tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
        }
        return(NXD_MQTT_CONNECT_FAILURE);
    }

    /* Just return for non-blocking mode.  The callbacks registered above are
       presumably responsible for completing the connection.  */
    if (wait_option == 0)
    {
        return(NX_IN_PROGRESS);
    }

    /* Increase priority to the same of internal thread to avoid out of order packet process. */
#ifndef NXD_MQTT_CLOUD_ENABLE
    thread_ptr = &(client_ptr -> nxd_mqtt_thread);
#else
    thread_ptr = &(client_ptr -> nxd_mqtt_client_cloud_ptr -> nx_cloud_thread);
#endif /* NXD_MQTT_CLOUD_ENABLE */
    tx_thread_info_get(thread_ptr, NX_NULL, NX_NULL, NX_NULL,
                       &new_priority, NX_NULL, NX_NULL, NX_NULL, NX_NULL);
    tx_thread_priority_change(tx_thread_identify(), new_priority, &old_priority);

    /* If TLS is enabled, start TLS */
#ifdef NX_SECURE_ENABLE
    if (client_ptr -> nxd_mqtt_client_use_tls)
    {

        status = nx_secure_tls_session_start(&(client_ptr -> nxd_mqtt_tls_session), &(client_ptr -> nxd_mqtt_client_socket), wait_option);

        if (status != NX_SUCCESS)
        {

            /* Revert thread priority. */
            tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);

            /* End connection. */
            _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);

            return(NXD_MQTT_CONNECT_FAILURE);
        }
    }
#endif /* NX_SECURE_ENABLE */

#ifdef NXD_MQTT_OVER_WEBSOCKET
    if (client_ptr -> nxd_mqtt_client_use_websocket)
    {

        /* Establish the websocket layer, over TLS when TLS is in use and over the plain
           TCP socket otherwise.  */
#ifdef NX_SECURE_ENABLE
        if (client_ptr -> nxd_mqtt_client_use_tls)
        {
            status = nx_websocket_client_secure_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_tls_session),
                                                        client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
                                                        client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
                                                        (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
                                                        wait_option);
        }
        else
#endif /* NX_SECURE_ENABLE */
        {
            status = nx_websocket_client_connect(&(client_ptr -> nxd_mqtt_client_websocket), &(client_ptr -> nxd_mqtt_client_socket),
                                                 client_ptr -> nxd_mqtt_client_websocket_host, client_ptr -> nxd_mqtt_client_websocket_host_length,
                                                 client_ptr -> nxd_mqtt_client_websocket_uri_path, client_ptr -> nxd_mqtt_client_websocket_uri_path_length,
                                                 (UCHAR *)NXD_MQTT_OVER_WEBSOCKET_PROTOCOL, sizeof(NXD_MQTT_OVER_WEBSOCKET_PROTOCOL) - 1,
                                                 wait_option);
        }

        if (status != NX_SUCCESS)
        {

            /* Revert thread priority. */
            tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);

            /* End connection. */
            _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);

            return(NXD_MQTT_CONNECT_FAILURE);
        }
    }

#endif /* NXD_MQTT_OVER_WEBSOCKET */

    /* Start to send connect packet.  */
    status = _nxd_mqtt_client_connect_packet_send(client_ptr, wait_option);

    if (status != NX_SUCCESS)
    {

        /* Revert thread priority. */
        tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);

        /* End connection. */
        _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
        return(NXD_MQTT_CONNECT_FAILURE);
    }

    /* Wait for the reply to the CONNECT packet. */
    status = _nxd_mqtt_packet_receive(client_ptr, &packet_ptr, wait_option);

    /* Revert thread priority. */
    tx_thread_priority_change(tx_thread_identify(), old_priority, &old_priority);

    /* Check status.  */
    if (status)
    {

        /* End connection. */
        _nxd_mqtt_client_connection_end(client_ptr, NX_NO_WAIT);
        return(NXD_MQTT_COMMUNICATION_FAILURE);
    }

    /* Process CONNACK message.  */
    status = _nxd_mqtt_process_connack(client_ptr, packet_ptr, wait_option);

    /* Release the packet.  */
    nx_packet_release(packet_ptr);

    /* Check status.  */
    if (status == NX_SUCCESS)
    {

        /* Set the receive callback. */
        nx_tcp_socket_receive_notify(&client_ptr -> nxd_mqtt_client_socket, _nxd_mqtt_receive_callback);
    }

    return(status);
}
4231 
4232 /**************************************************************************/
4233 /*                                                                        */
4234 /*  FUNCTION                                               RELEASE        */
4235 /*                                                                        */
4236 /*    _nxd_mqtt_client_connect_packet_send                PORTABLE C      */
4237 /*                                                           6.3.0        */
4238 /*  AUTHOR                                                                */
4239 /*                                                                        */
4240 /*    Yuxin Zhou, Microsoft Corporation                                   */
4241 /*                                                                        */
4242 /*  DESCRIPTION                                                           */
4243 /*                                                                        */
4244 /*    This function sends CONNECT packet to MQTT server.                  */
4245 /*                                                                        */
4246 /*                                                                        */
4247 /*  INPUT                                                                 */
4248 /*                                                                        */
/*    client_ptr                            Pointer to MQTT Client        */
/*    wait_option                           Timeout value                 */
4253 /*                                                                        */
4254 /*                                                                        */
4255 /*  OUTPUT                                                                */
4256 /*                                                                        */
4257 /*    status                                Completion status             */
4258 /*                                                                        */
4259 /*  CALLS                                                                 */
4260 /*                                                                        */
4261 /*    nx_packet_release                                                   */
4262 /*    _nxd_mqtt_client_packet_allocate                                    */
4263 /*    _nxd_mqtt_release_transmit_packet                                   */
4264 /*    _nxd_mqtt_client_connection_end                                     */
4265 /*    _nxd_mqtt_client_set_fixed_header                                   */
4266 /*    _nxd_mqtt_client_append_message                                     */
4267 /*    _nxd_mqtt_packet_send                                               */
4268 /*                                                                        */
4269 /*  CALLED BY                                                             */
4270 /*                                                                        */
4271 /*    _nxd_mqtt_client_connect                                            */
4272 /*    _nxd_mqtt_tcp_establish_process                                     */
4273 /*    _nxd_mqtt_tls_establish_process                                     */
4274 /*                                                                        */
4275 /*  RELEASE HISTORY                                                       */
4276 /*                                                                        */
4277 /*    DATE              NAME                      DESCRIPTION             */
4278 /*                                                                        */
4279 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4280 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4281 /*                                            resulting in version 6.1    */
4282 /*  07-29-2022     Spencer McDonough        Modified comment(s),          */
4283 /*                                            improved internal logic,    */
4284 /*                                            resulting in version 6.1.12 */
4285 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
4286 /*                                            the logic of sending packet,*/
4287 /*                                            resulting in version 6.2.0  */
4288 /*  10-31-2023     Haiqing Zhao             Modified comment(s), improved */
4289 /*                                            internal logic,             */
4290 /*                                            resulting in version 6.3.0  */
4291 /*                                                                        */
4292 /**************************************************************************/
_nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT * client_ptr,ULONG wait_option)4293 UINT _nxd_mqtt_client_connect_packet_send(NXD_MQTT_CLIENT *client_ptr, ULONG wait_option)
4294 {
4295 NX_PACKET           *packet_ptr;
4296 UINT                 status;
4297 UINT                 length = 0;
4298 UCHAR                connection_flags = 0;
4299 UINT                 ret = NXD_MQTT_SUCCESS;
4300 UCHAR                temp_data[4];
4301 UINT                 keepalive = (client_ptr -> nxd_mqtt_keepalive/NX_IP_PERIODIC_RATE);
4302 
4303 
4304     /* Construct connect flags by taking the connect flag user supplies, or'ing the username and
4305        password bits, if they are supplied. */
4306     if (client_ptr -> nxd_mqtt_client_username)
4307     {
4308         connection_flags |= MQTT_CONNECT_FLAGS_USERNAME;
4309 
4310         /* Add the password flag only if username is supplied. */
4311         if (client_ptr -> nxd_mqtt_client_password)
4312         {
4313             connection_flags |= MQTT_CONNECT_FLAGS_PASSWORD;
4314         }
4315     }
4316 
4317     if (client_ptr -> nxd_mqtt_client_will_topic)
4318     {
4319         connection_flags = connection_flags | MQTT_CONNECT_FLAGS_WILL_FLAG;
4320 
4321 
4322         if (client_ptr -> nxd_mqtt_client_will_qos_retain & 0x80)
4323         {
4324             connection_flags = connection_flags | MQTT_CONNECT_FLAGS_WILL_RETAIN;
4325         }
4326 
4327         connection_flags = (UCHAR)(connection_flags | ((client_ptr -> nxd_mqtt_client_will_qos_retain & 0x3) << 3));
4328     }
4329 
4330     if (client_ptr -> nxd_mqtt_clean_session == NX_TRUE)
4331     {
4332         connection_flags = connection_flags | MQTT_CONNECT_FLAGS_CLEAN_SESSION;
4333 
4334         /* Clear any transmit blocks from the previous session. */
4335         while (client_ptr -> message_transmit_queue_head)
4336         {
4337             _nxd_mqtt_release_transmit_packet(client_ptr, client_ptr -> message_transmit_queue_head, NX_NULL);
4338         }
4339     }
4340 
4341     /* Set the length of the packet. */
4342     length = 10;
4343 
4344     /* Add the size of the client Identifier. */
4345     length += (client_ptr -> nxd_mqtt_client_id_length + 2);
4346 
4347     /* Add the will topic, if present. */
4348     if (connection_flags & MQTT_CONNECT_FLAGS_WILL_FLAG)
4349     {
4350         length += (client_ptr -> nxd_mqtt_client_will_topic_length + 2);
4351         length += (client_ptr -> nxd_mqtt_client_will_message_length + 2);
4352     }
4353     if (connection_flags & MQTT_CONNECT_FLAGS_USERNAME)
4354     {
4355         length += (UINT)(client_ptr -> nxd_mqtt_client_username_length + 2);
4356     }
4357     if (connection_flags & MQTT_CONNECT_FLAGS_PASSWORD)
4358     {
4359         length += (UINT)(client_ptr -> nxd_mqtt_client_password_length + 2);
4360     }
4361 
4362     /* Check for invalid length. */
4363     if (length > (127 * 127 * 127 * 127))
4364     {
4365         return(NXD_MQTT_INTERNAL_ERROR);
4366     }
4367 
4368     status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, wait_option);
4369 
4370     if (status)
4371     {
4372         return(status);
4373     }
4374 
4375     /* Construct MQTT CONNECT message. */
4376     temp_data[0] = ((MQTT_CONTROL_PACKET_TYPE_CONNECT << 4) & 0xF0);
4377 
4378     /* Fill in fixed header. */
4379     ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, temp_data[0], length, wait_option);
4380 
4381     if (ret)
4382     {
4383         /* Release the packet. */
4384         nx_packet_release(packet_ptr);
4385         return(NXD_MQTT_PACKET_POOL_FAILURE);
4386     }
4387 
4388     /* Fill in protocol name. */
4389     ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, "MQTT", 4, wait_option);
4390 
4391     if (ret)
4392     {
4393         /* Release the packet. */
4394         nx_packet_release(packet_ptr);
4395         return(NXD_MQTT_PACKET_POOL_FAILURE);
4396     }
4397 
4398     /* Fill in protocol level, */
4399     temp_data[0] = MQTT_PROTOCOL_LEVEL;
4400 
4401     /* Fill in byte 8: connect flags */
4402     temp_data[1] = connection_flags;
4403 
4404     /* Fill in byte 9 and 10: keep alive */
4405     temp_data[2] = (keepalive >> 8) & 0xFF;
4406     temp_data[3] = (keepalive & 0xFF);
4407 
4408     ret = nx_packet_data_append(packet_ptr, temp_data, 4, client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
4409 
4410     if (ret)
4411     {
4412 
4413         /* Release the packet. */
4414         nx_packet_release(packet_ptr);
4415 
4416         return(NXD_MQTT_PACKET_POOL_FAILURE);
4417     }
4418 
4419     /* Fill in payload area, in the order of: client identifier, will topic, will message,
4420        user name, and password. */
4421     ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_id,
4422                                           client_ptr -> nxd_mqtt_client_id_length, wait_option);
4423 
4424     /* Next fill will topic and will message if the will flag is set. */
4425     if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_WILL_FLAG))
4426     {
4427         ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, (CHAR *)client_ptr -> nxd_mqtt_client_will_topic,
4428                                               client_ptr -> nxd_mqtt_client_will_topic_length, wait_option);
4429 
4430         if (!ret)
4431         {
4432             ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, (CHAR *)client_ptr -> nxd_mqtt_client_will_message,
4433                                                   client_ptr -> nxd_mqtt_client_will_message_length, wait_option);
4434         }
4435     }
4436 
4437     /* Fill username if username flag is set */
4438     if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_USERNAME))
4439     {
4440         ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_username,
4441                                               client_ptr -> nxd_mqtt_client_username_length, wait_option);
4442     }
4443 
4444     /* Fill password if password flag is set */
4445     if (!ret && (connection_flags & MQTT_CONNECT_FLAGS_PASSWORD))
4446     {
4447         ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, client_ptr -> nxd_mqtt_client_password,
4448                                               client_ptr -> nxd_mqtt_client_password_length, wait_option);
4449     }
4450 
4451     if (ret)
4452     {
4453 
4454         /* Release the packet. */
4455         nx_packet_release(packet_ptr);
4456 
4457         return(NXD_MQTT_PACKET_POOL_FAILURE);
4458     }
4459 
4460     /* Ready to send the connect message to the server. */
4461     status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option);
4462 
4463     if (status)
4464     {
4465 
4466         /* Release the packet. */
4467         nx_packet_release(packet_ptr);
4468     }
4469 
4470     /* Update the timeout value. */
4471     client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
4472 
4473     return(status);
4474 }
4475 
4476 /**************************************************************************/
4477 /*                                                                        */
4478 /*  FUNCTION                                               RELEASE        */
4479 /*                                                                        */
4480 /*    _nxd_mqtt_client_secure_connect                     PORTABLE C      */
4481 /*                                                           6.1          */
4482 /*  AUTHOR                                                                */
4483 /*                                                                        */
4484 /*    Yuxin Zhou, Microsoft Corporation                                   */
4485 /*                                                                        */
4486 /*  DESCRIPTION                                                           */
4487 /*                                                                        */
4488 /*    This function makes an initial secure (TLS) connection to           */
4489 /*    the MQTT server.                                                    */
4490 /*                                                                        */
4491 /*                                                                        */
4492 /*  INPUT                                                                 */
4493 /*                                                                        */
4494 /*    client_ptr                            Pointer to MQTT Client        */
4495 /*    server_ip                             Server IP address structure   */
4496 /*    server_port                           Server port number, in host   */
4497 /*                                            byte order                  */
4498 /*    tls_setup                             User-supplied callback        */
4499 /*                                            function to set up TLS      */
4500 /*                                            parameters.                 */
/*    keepalive                             Keepalive value passed to     */
/*                                            _nxd_mqtt_client_connect    */
/*    clean_session                         Clean session flag passed to  */
/*                                            _nxd_mqtt_client_connect    */
/*    wait_option                           Suspension option             */
4511 /*                                                                        */
4512 /*                                                                        */
4513 /*  OUTPUT                                                                */
4514 /*                                                                        */
4515 /*    status                                Completion status             */
4516 /*                                                                        */
4517 /*  CALLS                                                                 */
4518 /*                                                                        */
4519 /*    _nxd_mqtt_client_connect              Actual MQTT Client connect    */
4520 /*                                            call                        */
4521 /*                                                                        */
4522 /*  CALLED BY                                                             */
4523 /*                                                                        */
4524 /*    Application Code                                                    */
4525 /*                                                                        */
4526 /*  RELEASE HISTORY                                                       */
4527 /*                                                                        */
4528 /*    DATE              NAME                      DESCRIPTION             */
4529 /*                                                                        */
4530 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4531 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4532 /*                                            resulting in version 6.1    */
4533 /*                                                                        */
4534 /**************************************************************************/
4535 #ifdef NX_SECURE_ENABLE
UINT _nxd_mqtt_client_secure_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                                     UINT (*tls_setup)(NXD_MQTT_CLIENT *client_ptr, NX_SECURE_TLS_SESSION *,
                                                       NX_SECURE_X509_CERT *, NX_SECURE_X509_CERT *),
                                     UINT keepalive, UINT clean_session, ULONG wait_option)
{
UINT status;

    /* Hand the client's own TLS session and certificate storage to the
       user-supplied callback so the application can configure them. */
    status = tls_setup(client_ptr,
                       &(client_ptr -> nxd_mqtt_tls_session),
                       &(client_ptr -> nxd_mqtt_tls_certificate),
                       &(client_ptr -> nxd_mqtt_tls_trusted_certificate));

    /* A non-zero callback result is returned to the caller untouched and
       no connection attempt is made. */
    if (status == 0)
    {

        /* Flag this client as using TLS before running the regular connect path. */
        client_ptr -> nxd_mqtt_client_use_tls = 1;

        status = _nxd_mqtt_client_connect(client_ptr, server_ip, server_port,
                                          keepalive, clean_session, wait_option);
    }

    return(status);
}
4560 
4561 #endif /* NX_SECURE_ENABLE */
4562 
4563 
4564 /**************************************************************************/
4565 /*                                                                        */
4566 /*  FUNCTION                                               RELEASE        */
4567 /*                                                                        */
4568 /*    _nxd_mqtt_client_delete                             PORTABLE C      */
4569 /*                                                           6.2.0        */
4570 /*  AUTHOR                                                                */
4571 /*                                                                        */
4572 /*    Yuxin Zhou, Microsoft Corporation                                   */
4573 /*                                                                        */
4574 /*  DESCRIPTION                                                           */
4575 /*                                                                        */
4576 /*    This function deletes a previously created MQTT client instance.    */
4577 /*    If the NXD_MQTT_SOCKET_TIMEOUT is set to NX_WAIT_FOREVER, this may  */
/*    suspend indefinitely. This is because the MQTT Client must          */
/*    disconnect from the server, and if the network link is disabled or  */
/*    the server is not responding, this will block this function from    */
4581 /*    completing.                                                         */
4582 /*                                                                        */
4583 /*  INPUT                                                                 */
4584 /*                                                                        */
4585 /*    client_ptr                            Pointer to MQTT Client        */
4586 /*                                                                        */
4587 /*  OUTPUT                                                                */
4588 /*                                                                        */
4589 /*    status                                Completion status             */
4590 /*                                                                        */
4591 /*  CALLS                                                                 */
4592 /*                                                                        */
4593 /*    tx_event_flags_set                                                  */
4594 /*                                                                        */
4595 /*  CALLED BY                                                             */
4596 /*                                                                        */
4597 /*    Application Code                                                    */
4598 /*                                                                        */
4599 /*  RELEASE HISTORY                                                       */
4600 /*                                                                        */
4601 /*    DATE              NAME                      DESCRIPTION             */
4602 /*                                                                        */
4603 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4604 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4605 /*                                            resulting in version 6.1    */
4606 /*  10-31-2022     Bo Chen                  Modified comment(s), supported*/
4607 /*                                            mqtt over websocket,        */
4608 /*                                            resulting in version 6.2.0  */
4609 /*                                                                        */
4610 /**************************************************************************/
_nxd_mqtt_client_delete(NXD_MQTT_CLIENT * client_ptr)4611 UINT _nxd_mqtt_client_delete(NXD_MQTT_CLIENT *client_ptr)
4612 {
4613 
4614 
4615     /* Set the event flag for DELETE. Next time when the MQTT client thread
4616        wakes up, it will perform the deletion process. */
4617 #ifndef NXD_MQTT_CLOUD_ENABLE
4618     tx_event_flags_set(&client_ptr -> nxd_mqtt_events, MQTT_DELETE_EVENT, TX_OR);
4619 #else
4620     nx_cloud_module_event_set(&(client_ptr -> nxd_mqtt_client_cloud_module), MQTT_DELETE_EVENT);
4621 #endif /* NXD_MQTT_CLOUD_ENABLE */
4622 
4623     /* Check if the MQTT client thread has completed. */
4624     while((client_ptr -> nxd_mqtt_client_socket).nx_tcp_socket_id != 0)
4625     {
4626         tx_thread_sleep(NX_IP_PERIODIC_RATE);
4627     }
4628 
4629 #ifndef NXD_MQTT_CLOUD_ENABLE
4630     /* Now we can delete the Client instance. */
4631     tx_thread_delete(&(client_ptr -> nxd_mqtt_thread));
4632 #else
4633     /* Deregister mqtt module from cloud helper.  */
4634     nx_cloud_module_deregister(client_ptr -> nxd_mqtt_client_cloud_ptr, &(client_ptr -> nxd_mqtt_client_cloud_module));
4635 
4636     /* Delete own created cloud.  */
4637     if (client_ptr -> nxd_mqtt_client_cloud.nx_cloud_id == NX_CLOUD_ID)
4638         nx_cloud_delete(&(client_ptr -> nxd_mqtt_client_cloud));
4639 #endif /* NXD_MQTT_CLOUD_ENABLE */
4640 
4641 #ifdef NXD_MQTT_OVER_WEBSOCKET
4642     if (client_ptr -> nxd_mqtt_client_use_websocket)
4643     {
4644         nx_websocket_client_delete(&client_ptr -> nxd_mqtt_client_websocket);
4645         client_ptr -> nxd_mqtt_client_use_websocket = NX_FALSE;
4646     }
4647 #endif /* NXD_MQTT_OVER_WEBSOCKET */
4648 
4649     return(NXD_MQTT_SUCCESS);
4650 }
4651 
4652 /**************************************************************************/
4653 /*                                                                        */
4654 /*  FUNCTION                                               RELEASE        */
4655 /*                                                                        */
4656 /*    _nxd_mqtt_client_publish_packet_send                PORTABLE C      */
4657 /*                                                           6.2.0        */
4658 /*  AUTHOR                                                                */
4659 /*                                                                        */
4660 /*    Yuxin Zhou, Microsoft Corporation                                   */
4661 /*                                                                        */
4662 /*  DESCRIPTION                                                           */
4663 /*                                                                        */
4664 /*    This function sends a publish packet to the connected broker.       */
4665 /*                                                                        */
4666 /*                                                                        */
4667 /*  INPUT                                                                 */
4668 /*                                                                        */
4669 /*    client_ptr                            Pointer to MQTT Client        */
4670 /*    packet_ptr                            Pointer to publish packet     */
4671 /*    packet_id                             Current packet ID             */
4672 /*    QoS                                   Quality of service            */
4673 /*    wait_option                           Suspension option             */
4674 /*                                                                        */
4675 /*  OUTPUT                                                                */
4676 /*                                                                        */
4677 /*    status                                Completion status             */
4678 /*                                                                        */
4679 /*  CALLS                                                                 */
4680 /*                                                                        */
4681 /*    tx_mutex_get                                                        */
4682 /*    tx_mutex_put                                                        */
4683 /*    nx_packet_release                                                   */
4684 /*    _nxd_mqtt_copy_transmit_packet                                      */
4685 /*    _nxd_mqtt_packet_send                                               */
4686 /*                                                                        */
4687 /*  CALLED BY                                                             */
4688 /*                                                                        */
4689 /*    _nxd_mqtt_client_publish                                            */
4690 /*                                                                        */
4691 /*  RELEASE HISTORY                                                       */
4692 /*                                                                        */
4693 /*    DATE              NAME                      DESCRIPTION             */
4694 /*                                                                        */
4695 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4696 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4697 /*                                            resulting in version 6.1    */
4698 /*  08-02-2021     Yuxin Zhou               Modified comment(s),          */
4699 /*                                            supported maximum transmit  */
4700 /*                                            queue depth,                */
4701 /*                                            resulting in version 6.1.8  */
4702 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
4703 /*                                            the logic of sending packet,*/
4704 /*                                            resulting in version 6.2.0  */
4705 /*                                                                        */
4706 /**************************************************************************/
UINT _nxd_mqtt_client_publish_packet_send(NXD_MQTT_CLIENT *client_ptr, NX_PACKET *packet_ptr,
                                          USHORT packet_id, UINT QoS, ULONG wait_option)
{

    if (QoS == 0)
    {

        /* QoS 0: nothing is kept for retransmission, so only the mutex
           protecting the client state is needed. */
        if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
        {
            return(NXD_MQTT_MUTEX_FAILURE);
        }
    }
    else
    {
    /* Copy of the outgoing packet that is kept on the transmit queue. */
    NX_PACKET *queued_copy_ptr;

        /* Make the retransmission copy before taking the mutex. */
        if (_nxd_mqtt_copy_transmit_packet(client_ptr, packet_ptr, &queued_copy_ptr,
                                           packet_id, NX_TRUE, wait_option))
        {
            return(NXD_MQTT_PACKET_POOL_FAILURE);
        }

        if (tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER) != TX_SUCCESS)
        {

            /* The copy cannot be queued without the mutex; give it back. */
            nx_packet_release(queued_copy_ptr);

#ifdef NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
            /* Decrease the transmit queue depth, since the copy is not queued. */
            client_ptr -> message_transmit_queue_depth--;
#endif /* NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH */
            return(NXD_MQTT_MUTEX_FAILURE);
        }

        /* Append the copy to the tail of the transmit queue; an empty queue
           gets the copy as its head as well. */
        if (client_ptr -> message_transmit_queue_head != NX_NULL)
        {
            client_ptr -> message_transmit_queue_tail -> nx_packet_queue_next = queued_copy_ptr;
        }
        else
        {
            client_ptr -> message_transmit_queue_head = queued_copy_ptr;
        }
        client_ptr -> message_transmit_queue_tail = queued_copy_ptr;
    }

    /* Still holding the mutex: restart the keepalive timeout from the current time. */
    client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;

    /* The mutex is released before the packet is sent. */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

    /* Send the publish message to the server. Any send error is reported
       as a communication failure. */
    if (_nxd_mqtt_packet_send(client_ptr, packet_ptr, wait_option))
    {
        return(NXD_MQTT_COMMUNICATION_FAILURE);
    }

    return(NXD_MQTT_SUCCESS);
}
4779 
4780 /**************************************************************************/
4781 /*                                                                        */
4782 /*  FUNCTION                                               RELEASE        */
4783 /*                                                                        */
4784 /*    _nxd_mqtt_client_publish                            PORTABLE C      */
4785 /*                                                           6.3.0        */
4786 /*  AUTHOR                                                                */
4787 /*                                                                        */
4788 /*    Yuxin Zhou, Microsoft Corporation                                   */
4789 /*                                                                        */
4790 /*  DESCRIPTION                                                           */
4791 /*                                                                        */
4792 /*    This function publishes a message to the connected broker.          */
4793 /*                                                                        */
4794 /*                                                                        */
4795 /*  INPUT                                                                 */
4796 /*                                                                        */
4797 /*    client_ptr                            Pointer to MQTT Client        */
4798 /*    topic_name                            Name of the topic             */
4799 /*    topic_name_length                     Length of the topic name      */
4800 /*    message                               Message string                */
4801 /*    message_length                        Length of the message,        */
4802 /*                                            in bytes                    */
4803 /*    retain                                The retain flag, whether      */
4804 /*                                            or not the broker should    */
4805 /*                                            store this message          */
4806 /*    QoS                                   Expected QoS level            */
4807 /*    wait_option                           Suspension option             */
4808 /*                                                                        */
4809 /*  OUTPUT                                                                */
4810 /*                                                                        */
4811 /*    status                                Completion status             */
4812 /*                                                                        */
4813 /*  CALLS                                                                 */
4814 /*                                                                        */
4815 /*    tx_mutex_get                                                        */
4816 /*    _nxd_mqtt_client_packet_allocate                                    */
4817 /*    _nxd_mqtt_client_set_fixed_header                                   */
4818 /*    _nxd_mqtt_client_append_message                                     */
4819 /*    tx_mutex_put                                                        */
4820 /*    nx_packet_release                                                   */
4821 /*    _nxd_mqtt_client_publish_packet_send                                */
4822 /*                                                                        */
4823 /*  CALLED BY                                                             */
4824 /*                                                                        */
4825 /*    Application Code                                                    */
4826 /*                                                                        */
4827 /*  RELEASE HISTORY                                                       */
4828 /*                                                                        */
4829 /*    DATE              NAME                      DESCRIPTION             */
4830 /*                                                                        */
4831 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
4832 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
4833 /*                                            resulting in version 6.1    */
4834 /*  07-29-2022     Spencer McDonough        Modified comment(s),          */
4835 /*                                            improved internal logic,    */
4836 /*                                            resulting in version 6.1.12 */
4837 /*  10-31-2023     Haiqing Zhao             Modified comment(s), improved */
4838 /*                                            internal logic,             */
4839 /*                                            resulting in version 6.3.0  */
4840 /*                                                                        */
4841 /**************************************************************************/
UINT _nxd_mqtt_client_publish(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length,
                              CHAR *message, UINT message_length, UINT retain, UINT QoS, ULONG wait_option)
{

NX_PACKET *packet_ptr;
UINT       status;
UINT       length = 0;
UCHAR      flags;
USHORT     packet_id = 0;
UINT       ret = NXD_MQTT_SUCCESS;

    /* Returns NXD_MQTT_QOS2_NOT_SUPPORTED, NXD_MQTT_NOT_CONNECTED,
       NXD_MQTT_PACKET_POOL_FAILURE, NXD_MQTT_INTERNAL_ERROR or
       NXD_MQTT_MUTEX_FAILURE on a local failure; otherwise the result of
       _nxd_mqtt_client_publish_packet_send. The packet allocated here is
       released on every failure path. */

    /* QoS level 2 is rejected up front. */
    if (QoS == 2)
    {
        return(NXD_MQTT_QOS2_NOT_SUPPORTED);
    }


    /* Publishing is only possible while the client is connected. */
    if (client_ptr -> nxd_mqtt_client_state != NXD_MQTT_CLIENT_STATE_CONNECTED)
    {
        return(NXD_MQTT_NOT_CONNECTED);
    }

    status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, wait_option);

    if (status != NXD_MQTT_SUCCESS)
    {
        return(NXD_MQTT_PACKET_POOL_FAILURE);
    }

    /* First header byte: packet type in the upper nibble, QoS in bits 1-2,
       and the retain flag when requested. */
    flags = (UCHAR)((MQTT_CONTROL_PACKET_TYPE_PUBLISH << 4) | (QoS << 1));

    if (retain)
    {
        flags = flags | MQTT_PUBLISH_RETAIN;
    }


    /* Compute the remaining length. */

    /* Topic name plus its two-byte length prefix. */
    length = topic_name_length + 2;

    /* Count packet ID for QoS 1 or QoS 2 message. */
    if ((QoS == 1) || (QoS == 2))
    {
        length += 2;
    }

    /* Count message. */
    if ((message != NX_NULL) && (message_length != 0))
    {
        length += message_length;
    }

    /* Write out the control header and remaining length field. */
    ret = _nxd_mqtt_client_set_fixed_header(client_ptr, packet_ptr, flags, length, wait_option);

    if (ret)
    {

        /* Release the packet. */
        nx_packet_release(packet_ptr);

        return(NXD_MQTT_INTERNAL_ERROR);
    }


    /* Write out topic */
    ret = _nxd_mqtt_client_append_message(client_ptr, packet_ptr, topic_name, topic_name_length, wait_option);

    if (ret)
    {

        /* Release the packet. */
        nx_packet_release(packet_ptr);

        return(NXD_MQTT_INTERNAL_ERROR);
    }

    /* Append Packet Identifier for QoS level 1 or 2  MQTT 3.3.2.2 */
    if ((QoS == 1) || (QoS == 2))
    {
    UCHAR identifier[2];

        /* Obtain the mutex; the packet identifier counter is updated under it. */
        status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);

        if (status != TX_SUCCESS)
        {

            /* The packet is still owned by this function, so it must be
               released here; otherwise it is leaked from the pool. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_MUTEX_FAILURE);
        }

        /* Take the current identifier for this message, high byte first. */
        packet_id = (USHORT)client_ptr -> nxd_mqtt_client_packet_identifier;
        identifier[0] = (UCHAR)(client_ptr -> nxd_mqtt_client_packet_identifier >> 8);
        identifier[1] = (UCHAR)(client_ptr -> nxd_mqtt_client_packet_identifier & 0xFF);

        /* Update packet id, wrapping at 16 bits. */
        client_ptr -> nxd_mqtt_client_packet_identifier = (client_ptr -> nxd_mqtt_client_packet_identifier + 1) & 0xFFFF;

        /* Prevent packet identifier from being zero. MQTT-2.3.1-1 */
        if (client_ptr -> nxd_mqtt_client_packet_identifier == 0)
        {
            client_ptr -> nxd_mqtt_client_packet_identifier = 1;
        }

        /* Release the mutex. */
        tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);

        ret = nx_packet_data_append(packet_ptr, identifier, 2,
                                    client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);

        if (ret)
        {

            /* Release the packet. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_INTERNAL_ERROR);
        }
    }

    /* Append message. */
    if ((message != NX_NULL) && (message_length != 0))
    {

        /* Use nx_packet_data_append to move user-supplied message data into the packet.
           nx_packet_data_append uses chained packet if the additional storage space is
           needed. */
        ret = nx_packet_data_append(packet_ptr, message, message_length,
                                    client_ptr -> nxd_mqtt_client_packet_pool_ptr, wait_option);
        if (ret)
        {

            /* Unable to obtain a new packet to store the message. */

            /* Release the packet. */
            nx_packet_release(packet_ptr);

            return(NXD_MQTT_INTERNAL_ERROR);
        }
    }

    /* Send publish packet. */
    ret = _nxd_mqtt_client_publish_packet_send(client_ptr, packet_ptr, packet_id, QoS, wait_option);

    if (ret)
    {

        /* The send helper reported a failure; release the packet here. */
        nx_packet_release(packet_ptr);
    }

    return(ret);
}
4994 
4995 /**************************************************************************/
4996 /*                                                                        */
4997 /*  FUNCTION                                               RELEASE        */
4998 /*                                                                        */
4999 /*    _nxd_mqtt_client_subscribe                          PORTABLE C      */
5000 /*                                                           6.1.2        */
5001 /*  AUTHOR                                                                */
5002 /*                                                                        */
5003 /*    Yuxin Zhou, Microsoft Corporation                                   */
5004 /*                                                                        */
5005 /*  DESCRIPTION                                                           */
5006 /*                                                                        */
5007 /*    This function sends a subscribe message to the broker.              */
5008 /*                                                                        */
5009 /*                                                                        */
5010 /*  INPUT                                                                 */
5011 /*                                                                        */
5012 /*    client_ptr                            Pointer to MQTT Client        */
5013 /*    topic_name                            Pointer to the topic string   */
5014 /*                                            to subscribe to             */
5015 /*    topic_name_length                     Length of the topic string    */
5016 /*                                            in bytes                    */
5017 /*    QoS                                   Expected QoS level            */
5018 /*                                                                        */
5019 /*  OUTPUT                                                                */
5020 /*                                                                        */
5021 /*    status                                Completion status             */
5022 /*                                                                        */
5023 /*  CALLS                                                                 */
5024 /*                                                                        */
5025 /*    _nxd_mqtt_client_sub_unsub            The actual routine that       */
5026 /*                                            performs the sub/unsub      */
5027 /*                                            action.                     */
5028 /*                                                                        */
5029 /*  CALLED BY                                                             */
5030 /*                                                                        */
5031 /*    Application Code                                                    */
5032 /*                                                                        */
5033 /*  RELEASE HISTORY                                                       */
5034 /*                                                                        */
5035 /*    DATE              NAME                      DESCRIPTION             */
5036 /*                                                                        */
5037 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5038 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5039 /*                                            resulting in version 6.1    */
5040 /*  11-09-2020     Yuxin Zhou               Modified comment(s), and      */
5041 /*                                            added packet id parameter,  */
5042 /*                                            resulting in version 6.1.2  */
5043 /*                                                                        */
5044 /**************************************************************************/
UINT _nxd_mqtt_client_subscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length, UINT QoS)
{

UINT result;

    if (QoS != 2)
    {

        /* Hand the request to the shared subscribe/unsubscribe routine.  The
           operation value is the SUBSCRIBE packet type shifted into the upper
           nibble, OR'ed with 0x02.  NX_NULL is passed for the fifth argument
           (presumably the optional packet id output -- confirm against
           _nxd_mqtt_client_sub_unsub).  */
        result = _nxd_mqtt_client_sub_unsub(client_ptr, (MQTT_CONTROL_PACKET_TYPE_SUBSCRIBE << 4) | 0x02,
                                            topic_name, topic_name_length, NX_NULL, QoS);
    }
    else
    {

        /* QoS level 2 is rejected before any request is issued.  */
        result = NXD_MQTT_QOS2_NOT_SUPPORTED;
    }

    return(result);
}
5056 
5057 
5058 
5059 
5060 /**************************************************************************/
5061 /*                                                                        */
5062 /*  FUNCTION                                               RELEASE        */
5063 /*                                                                        */
5064 /*    _nxd_mqtt_client_unsubscribe                        PORTABLE C      */
5065 /*                                                           6.1.2        */
5066 /*  AUTHOR                                                                */
5067 /*                                                                        */
5068 /*    Yuxin Zhou, Microsoft Corporation                                   */
5069 /*                                                                        */
5070 /*  DESCRIPTION                                                           */
5071 /*                                                                        */
5072 /*    This function unsubscribes a topic from the broker.                 */
5073 /*                                                                        */
5074 /*                                                                        */
5075 /*  INPUT                                                                 */
5076 /*                                                                        */
5077 /*    client_ptr                            Pointer to MQTT Client        */
5078 /*    topic_name                            Pointer to the topic string   */
5079 /*                                            to unsubscribe from         */
5080 /*    topic_name_length                     Length of the topic string    */
5081 /*                                            in bytes                    */
5082 /*                                                                        */
5083 /*  OUTPUT                                                                */
5084 /*                                                                        */
5085 /*    status                                Completion status             */
5086 /*                                                                        */
5087 /*  CALLS                                                                 */
5088 /*                                                                        */
5089 /*    _nxd_mqtt_client_sub_unsub                                          */
5090 /*                                                                        */
5091 /*  CALLED BY                                                             */
5092 /*                                                                        */
5093 /*    Application Code                                                    */
5094 /*                                                                        */
5095 /*  RELEASE HISTORY                                                       */
5096 /*                                                                        */
5097 /*    DATE              NAME                      DESCRIPTION             */
5098 /*                                                                        */
5099 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5100 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5101 /*                                            resulting in version 6.1    */
5102 /*  11-09-2020     Yuxin Zhou               Modified comment(s), and      */
5103 /*                                            added packet id parameter,  */
5104 /*                                            resulting in version 6.1.2  */
5105 /*                                                                        */
5106 /**************************************************************************/
UINT _nxd_mqtt_client_unsubscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length)
{

UINT result;

    /* Delegate to the shared subscribe/unsubscribe routine.  The operation
       value is the UNSUBSCRIBE packet type shifted into the upper nibble,
       OR'ed with 0x02.  NX_NULL is passed for the fifth argument and the QoS
       argument is fixed at 0 for this request.  */
    result = _nxd_mqtt_client_sub_unsub(client_ptr, (MQTT_CONTROL_PACKET_TYPE_UNSUBSCRIBE << 4) | 0x02,
                                        topic_name, topic_name_length, NX_NULL, 0);

    return(result);
}
5112 
5113 /**************************************************************************/
5114 /*                                                                        */
5115 /*  FUNCTION                                               RELEASE        */
5116 /*                                                                        */
5117 /*    _nxd_mqtt_send_simple_message                       PORTABLE C      */
5118 /*                                                           6.3.0        */
5119 /*  AUTHOR                                                                */
5120 /*                                                                        */
5121 /*    Yuxin Zhou, Microsoft Corporation                                   */
5122 /*                                                                        */
5123 /*  DESCRIPTION                                                           */
5124 /*                                                                        */
5125 /*    This internal function handles the transmission of PINGREQ or       */
5126 /*    DISCONNECT message.                                                 */
5127 /*                                                                        */
5128 /*                                                                        */
5129 /*  INPUT                                                                 */
5130 /*                                                                        */
5131 /*    client_ptr                            Pointer to MQTT Client        */
5132 /*    header_value                          Value to be programmed into   */
5133 /*                                            MQTT header.                */
5134 /*                                                                        */
5135 /*  OUTPUT                                                                */
5136 /*                                                                        */
5137 /*    status                                Completion status             */
5138 /*                                                                        */
5139 /*  CALLS                                                                 */
5140 /*                                                                        */
5141 /*    tx_mutex_get                                                        */
5142 /*    _nxd_mqtt_client_packet_allocate                                    */
5143 /*    tx_mutex_put                                                        */
5144 /*    _nxd_mqtt_packet_send                                               */
5145 /*    nx_packet_release                                                   */
5146 /*                                                                        */
5147 /*  CALLED BY                                                             */
5148 /*                                                                        */
5149 /*    _nxd_mqtt_client_ping                                               */
5150 /*    _nxd_mqtt_client_disconnect                                         */
5151 /*                                                                        */
5152 /*  RELEASE HISTORY                                                       */
5153 /*                                                                        */
5154 /*    DATE              NAME                      DESCRIPTION             */
5155 /*                                                                        */
5156 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5157 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5158 /*                                            resulting in version 6.1    */
5159 /*  07-29-2022     Spencer McDonough        Modified comment(s),          */
5160 /*                                            improved internal logic,    */
5161 /*                                            resulting in version 6.1.12 */
5162 /*  10-31-2022     Bo Chen                  Modified comment(s), improved */
5163 /*                                            the logic of sending packet,*/
5164 /*                                            resulting in version 6.2.0  */
5165 /*  10-31-2023     Haiqing Zhao             Modified comment(s), improved */
5166 /*                                            internal logic,             */
5167 /*                                            resulting in version 6.3.0  */
5168 /*                                                                        */
5169 /**************************************************************************/
_nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT * client_ptr,UCHAR header_value)5170 static UINT _nxd_mqtt_send_simple_message(NXD_MQTT_CLIENT *client_ptr, UCHAR header_value)
5171 {
5172 
5173 NX_PACKET *packet_ptr;
5174 UINT       status;
5175 UINT       status_mutex;
5176 UCHAR     *byte;
5177 
5178     status = _nxd_mqtt_client_packet_allocate(client_ptr, &packet_ptr, NX_WAIT_FOREVER);
5179     if (status)
5180     {
5181         return(NXD_MQTT_INTERNAL_ERROR);
5182     }
5183 
5184     if (2u > ((ULONG)(packet_ptr -> nx_packet_data_end) - (ULONG)(packet_ptr -> nx_packet_append_ptr)))
5185     {
5186         nx_packet_release(packet_ptr);
5187 
5188         /* Packet buffer is too small to hold the message. */
5189         return(NX_SIZE_ERROR);
5190     }
5191 
5192     byte = packet_ptr -> nx_packet_prepend_ptr;
5193 
5194     *byte = (UCHAR)(header_value << 4);
5195     *(byte + 1) = 0;
5196 
5197     packet_ptr -> nx_packet_append_ptr = packet_ptr -> nx_packet_prepend_ptr + 2;
5198     packet_ptr -> nx_packet_length = 2;
5199 
5200 
5201     /* Release MQTT protection before making NetX/TLS calls. */
5202     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5203 
5204     /* Send packet to server.  */
5205     status = _nxd_mqtt_packet_send(client_ptr, packet_ptr, NX_WAIT_FOREVER);
5206 
5207     status_mutex = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
5208     if (status)
5209     {
5210 
5211         /* Release the packet. */
5212         nx_packet_release(packet_ptr);
5213 
5214         status = NXD_MQTT_COMMUNICATION_FAILURE;
5215     }
5216     if (status_mutex)
5217     {
5218         return(NXD_MQTT_MUTEX_FAILURE);
5219     }
5220 
5221     if (header_value == MQTT_CONTROL_PACKET_TYPE_PINGREQ)
5222     {
5223         /* Do not update the ping sent time if the outstanding ping has not been responded yet */
5224         if (client_ptr -> nxd_mqtt_ping_not_responded != 1)
5225         {
5226             /* Record the time we send out the PINGREG */
5227             client_ptr -> nxd_mqtt_ping_sent_time = tx_time_get();
5228             client_ptr -> nxd_mqtt_ping_not_responded = 1;
5229         }
5230     }
5231 
5232     /* Update the timeout value. */
5233     client_ptr -> nxd_mqtt_timeout = tx_time_get() + client_ptr -> nxd_mqtt_keepalive;
5234 
5235     return(status);
5236 }
5237 
5238 
5239 /**************************************************************************/
5240 /*                                                                        */
5241 /*  FUNCTION                                               RELEASE        */
5242 /*                                                                        */
5243 /*    _nxd_mqtt_client_disconnect                         PORTABLE C      */
5244 /*                                                           6.1          */
5245 /*  AUTHOR                                                                */
5246 /*                                                                        */
5247 /*    Yuxin Zhou, Microsoft Corporation                                   */
5248 /*                                                                        */
5249 /*  DESCRIPTION                                                           */
5250 /*                                                                        */
5251 /*    This function disconnects the MQTT client from a server.            */
5252 /*                                                                        */
5253 /*                                                                        */
5254 /*  INPUT                                                                 */
5255 /*                                                                        */
5256 /*    client_ptr                            Pointer to MQTT Client        */
5257 /*                                                                        */
5258 /*  OUTPUT                                                                */
5259 /*                                                                        */
5260 /*    status                                Completion status             */
5261 /*                                                                        */
5262 /*  CALLS                                                                 */
5263 /*                                                                        */
5264 /*    _nxd_mqtt_send_simple_message                                       */
5265 /*    _nxd_mqtt_process_disconnect                                        */
5266 /*                                                                        */
5267 /*  CALLED BY                                                             */
5268 /*                                                                        */
5269 /*    Application Code                                                    */
5270 /*                                                                        */
5271 /*  RELEASE HISTORY                                                       */
5272 /*                                                                        */
5273 /*    DATE              NAME                      DESCRIPTION             */
5274 /*                                                                        */
5275 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5276 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5277 /*                                            resulting in version 6.1    */
5278 /*                                                                        */
5279 /**************************************************************************/
_nxd_mqtt_client_disconnect(NXD_MQTT_CLIENT * client_ptr)5280 UINT _nxd_mqtt_client_disconnect(NXD_MQTT_CLIENT *client_ptr)
5281 {
5282 UINT status;
5283 
5284     /* Obtain the mutex. */
5285     status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, TX_WAIT_FOREVER);
5286     if (status != TX_SUCCESS)
5287     {
5288         /* Disable timer if timer has been started. */
5289         if (client_ptr -> nxd_mqtt_keepalive)
5290         {
5291             tx_timer_delete(&(client_ptr -> nxd_mqtt_timer));
5292         }
5293 
5294         return(NXD_MQTT_MUTEX_FAILURE);
5295     }
5296 
5297     /* Let the server know we are ending the MQTT session. */
5298     _nxd_mqtt_send_simple_message(client_ptr, MQTT_CONTROL_PACKET_TYPE_DISCONNECT);
5299 
5300     /* Call the disconnect routine to disconnect the socket,
5301        release transmit packets, release received packets,
5302        and delete the client timer. */
5303     _nxd_mqtt_process_disconnect(client_ptr);
5304 
5305     /* Release the mutex. */
5306     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5307 
5308     return(status);
5309 }
5310 
5311 /**************************************************************************/
5312 /*                                                                        */
5313 /*  FUNCTION                                               RELEASE        */
5314 /*                                                                        */
5315 /*    _nxd_mqtt_client_receive_notify_set                 PORTABLE C      */
5316 /*                                                           6.1          */
5317 /*  AUTHOR                                                                */
5318 /*                                                                        */
5319 /*    Yuxin Zhou, Microsoft Corporation                                   */
5320 /*                                                                        */
5321 /*  DESCRIPTION                                                           */
5322 /*                                                                        */
5323 /*    This function installs the MQTT client publish notify callback      */
5324 /*    function.                                                           */
5325 /*                                                                        */
5326 /*                                                                        */
5327 /*  INPUT                                                                 */
5328 /*                                                                        */
5329 /*    client_ptr                            Pointer to MQTT Client        */
5330 /*    mqtt_client_receive_notify            User-supplied callback        */
5331 /*                                            function, which is invoked  */
5332 /*                                            upon receiving a publish    */
5333 /*                                            message.                    */
5334 /*                                                                        */
5335 /*  OUTPUT                                                                */
5336 /*                                                                        */
5337 /*    status                                Completion status             */
5338 /*                                                                        */
5339 /*  CALLS                                                                 */
5340 /*                                                                        */
5341 /*    tx_mutex_get                                                        */
5342 /*    tx_mutex_put                                                        */
5343 /*                                                                        */
5344 /*  CALLED BY                                                             */
5345 /*                                                                        */
5346 /*    Application Code                                                    */
5347 /*                                                                        */
5348 /*  RELEASE HISTORY                                                       */
5349 /*                                                                        */
5350 /*    DATE              NAME                      DESCRIPTION             */
5351 /*                                                                        */
5352 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5353 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5354 /*                                            resulting in version 6.1    */
5355 /*                                                                        */
5356 /**************************************************************************/
_nxd_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* receive_notify)(NXD_MQTT_CLIENT * client_ptr,UINT message_count))5357 UINT _nxd_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT *client_ptr,
5358                                          VOID (*receive_notify)(NXD_MQTT_CLIENT *client_ptr, UINT message_count))
5359 {
5360 
5361     tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
5362 
5363     client_ptr -> nxd_mqtt_client_receive_notify = receive_notify;
5364 
5365     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
5366 
5367     return(NXD_MQTT_SUCCESS);
5368 }
5369 
5370 /**************************************************************************/
5371 /*                                                                        */
5372 /*  FUNCTION                                               RELEASE        */
5373 /*                                                                        */
5374 /*    _nxd_mqtt_client_message_get                        PORTABLE C      */
5375 /*                                                           6.1          */
5376 /*  AUTHOR                                                                */
5377 /*                                                                        */
5378 /*    Yuxin Zhou, Microsoft Corporation                                   */
5379 /*                                                                        */
5380 /*  DESCRIPTION                                                           */
5381 /*                                                                        */
5382 /*    This function retrieves a published MQTT message.                   */
5383 /*                                                                        */
5384 /*                                                                        */
5385 /*  INPUT                                                                 */
5386 /*                                                                        */
5387 /*    client_ptr                            Pointer to MQTT Client        */
5388 /*    topic_buffer                          Pointer to the topic buffer   */
5389 /*                                            where topic is copied to    */
5390 /*    topic_buffer_size                     Size of the topic buffer.     */
5391 /*    actual_topic_length                   Number of bytes copied into   */
5392 /*                                            topic_buffer                */
5393 /*    message_buffer                        Pointer to the buffer where   */
5394 /*                                            message is copied to        */
5395 /*    message_buffer_size                   Size of the message_buffer    */
5396 /*    actual_message_length                 Number of bytes copied into   */
5397 /*                                            the message buffer.         */
5398 /*                                                                        */
5399 /*  OUTPUT                                                                */
5400 /*                                                                        */
5401 /*    status                                Completion status             */
5402 /*                                                                        */
5403 /*  CALLS                                                                 */
5404 /*                                                                        */
5405 /*    _nxd_mqtt_process_publish_packet      Get topic/message offsets     */
5406 /*                                            and lengths from a packet   */
5407 /*    nx_packet_data_extract_offset         Copy data out of the packet   */
5408 /*    nx_packet_release                     Release the packet            */
5409 /*    tx_mutex_get                                                        */
5410 /*    tx_mutex_put                                                        */
5411 /*                                                                        */
5412 /*  CALLED BY                                                             */
5413 /*                                                                        */
5414 /*    Application Code                                                    */
5415 /*                                                                        */
5416 /*  RELEASE HISTORY                                                       */
5417 /*                                                                        */
5418 /*    DATE              NAME                      DESCRIPTION             */
5419 /*                                                                        */
5420 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5421 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
5422 /*                                            fixed uninitialized value,  */
5423 /*                                            resulting in version 6.1    */
5424 /*                                                                        */
5425 /**************************************************************************/
UINT _nxd_mqtt_client_message_get(NXD_MQTT_CLIENT *client_ptr, UCHAR *topic_buffer, UINT topic_buffer_size, UINT *actual_topic_length,
                                  UCHAR *message_buffer, UINT message_buffer_size, UINT *actual_message_length)
{

UINT                status;
NX_PACKET          *packet_ptr;
ULONG               topic_offset;
USHORT              topic_length;
ULONG               message_offset;
ULONG               message_length;
ULONG               topic_bytes_copied;
ULONG               message_bytes_copied;

    /* Walk the receive queue under the client mutex.  The result of the mutex
       get is not checked here.  */
    tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
    while (client_ptr -> message_receive_queue_depth)
    {

        /* Parse the packet at the head of the queue to find where the topic
           and the message are and how long each is.  */
        packet_ptr = client_ptr -> message_receive_queue_head;
        status = _nxd_mqtt_process_publish_packet(packet_ptr, &topic_offset, &topic_length, &message_offset, &message_length);
        if (status == NXD_MQTT_SUCCESS)
        {

            /* If either caller buffer is too small, leave the packet on the
               queue and report the shortage.  */
            if ((topic_buffer_size < topic_length) ||
                (message_buffer_size < message_length))
            {
                tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
                return(NXD_MQTT_INSUFFICIENT_BUFFER_SPACE);
            }
        }

        /* Remove the packet from the queue, whether or not it parsed.  */
        client_ptr -> message_receive_queue_head = packet_ptr -> nx_packet_queue_next;
        if (client_ptr -> message_receive_queue_tail == packet_ptr)
        {
            client_ptr -> message_receive_queue_tail = NX_NULL;
        }
        client_ptr -> message_receive_queue_depth--;

        if (status == NXD_MQTT_SUCCESS)
        {

            /* nx_packet_data_extract_offset reports the copied byte count
               through a ULONG pointer.  Receive it in ULONG locals rather than
               casting the caller's UINT pointers, which would be written past
               their end on a port where ULONG is wider than UINT.  The locals
               start at zero so the outputs are defined even if an extract
               call does not set them.  */
            topic_bytes_copied = 0;
            message_bytes_copied = 0;
            nx_packet_data_extract_offset(packet_ptr, topic_offset, topic_buffer,
                                          topic_length, &topic_bytes_copied);
            nx_packet_data_extract_offset(packet_ptr, message_offset, message_buffer,
                                          message_length, &message_bytes_copied);
            nx_packet_release(packet_ptr);

            /* The requested lengths were checked against the UINT buffer sizes
               above, so these conversions cannot truncate.  */
            *actual_topic_length = (UINT)topic_bytes_copied;
            *actual_message_length = (UINT)message_bytes_copied;

            tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
            return(NXD_MQTT_SUCCESS);
        }

        /* The packet could not be parsed: discard it and try the next one.  */
        nx_packet_release(packet_ptr);
    }

    /* The queue is empty, or every queued packet failed to parse.  */
    tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
    return(NXD_MQTT_NO_MESSAGE);
}
5479 
5480 /**************************************************************************/
5481 /*                                                                        */
5482 /*  FUNCTION                                               RELEASE        */
5483 /*                                                                        */
5484 /*    _nxde_mqtt_client_create                            PORTABLE C      */
5485 /*                                                           6.1          */
5486 /*  AUTHOR                                                                */
5487 /*                                                                        */
5488 /*    Yuxin Zhou, Microsoft Corporation                                   */
5489 /*                                                                        */
5490 /*  DESCRIPTION                                                           */
5491 /*                                                                        */
5492 /*    This function checks for errors in the nxd_mqtt_client_create call. */
5493 /*                                                                        */
5494 /*                                                                        */
5495 /*  INPUT                                                                 */
5496 /*                                                                        */
5497 /*    client_ptr                            Pointer to MQTT Client        */
5498 /*    client_name                           Name string used by the       */
5499 /*                                            client                      */
5500 /*    client_id                             Client ID used by the client  */
5501 /*    client_id_length                      Length of Client ID, in bytes */
5502 /*    ip_ptr                                Pointer to IP instance        */
5503 /*    pool_ptr                              Pointer to packet pool        */
5504 /*    stack_ptr                             Client thread's stack pointer */
5505 /*    stack_size                            Client thread's stack size    */
5506 /*    mqtt_thread_priority                  Priority for MQTT thread      */
5507 /*    memory_ptr                            Deprecated and not used       */
5508 /*    memory_size                           Deprecated and not used       */
5509 /*                                                                        */
5510 /*  OUTPUT                                                                */
5511 /*                                                                        */
5512 /*    status                                Completion status             */
5513 /*                                                                        */
5514 /*  CALLS                                                                 */
5515 /*                                                                        */
5516 /*    _nxd_mqtt_client_create               Actual client create call     */
5517 /*                                                                        */
5518 /*  CALLED BY                                                             */
5519 /*                                                                        */
5520 /*    Application Code                                                    */
5521 /*                                                                        */
5522 /*  RELEASE HISTORY                                                       */
5523 /*                                                                        */
5524 /*    DATE              NAME                      DESCRIPTION             */
5525 /*                                                                        */
5526 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5527 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5528 /*                                            resulting in version 6.1    */
5529 /*                                                                        */
5530 /**************************************************************************/
UINT _nxde_mqtt_client_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name, CHAR *client_id, UINT client_id_length,
                              NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr,
                              VOID *stack_ptr, ULONG stack_size, UINT mqtt_thread_priority,
                              VOID *memory_ptr, ULONG memory_size)
{

UINT status;


    /* A client control block must be supplied.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The IP instance must be supplied and its nx_ip_id field must equal
       NX_IP_ID.  The NULL test comes first so the field is never read
       through a NULL pointer.  */
    if ((ip_ptr == NX_NULL) || (ip_ptr -> nx_ip_id != NX_IP_ID))
    {
        return(NX_PTR_ERROR);
    }

    /* The client thread needs a stack area of nonzero size.  */
    if ((stack_ptr == NX_NULL) || (stack_size == 0))
    {
        return(NX_PTR_ERROR);
    }

    /* A packet pool must be supplied.  */
    if (pool_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Every check passed; forward all arguments, unchanged, to the actual
       create service.  client_name, client_id, client_id_length, the thread
       priority and the memory arguments are not examined here.  */
    status = _nxd_mqtt_client_create(client_ptr, client_name, client_id, client_id_length,
                                     ip_ptr, pool_ptr, stack_ptr, stack_size,
                                     mqtt_thread_priority, memory_ptr, memory_size);

    return(status);
}
5549 
5550 
5551 /**************************************************************************/
5552 /*                                                                        */
5553 /*  FUNCTION                                               RELEASE        */
5554 /*                                                                        */
5555 /*    _nxde_mqtt_client_connect                           PORTABLE C      */
5556 /*                                                           6.1          */
5557 /*  AUTHOR                                                                */
5558 /*                                                                        */
5559 /*    Yuxin Zhou, Microsoft Corporation                                   */
5560 /*                                                                        */
5561 /*  DESCRIPTION                                                           */
5562 /*                                                                        */
/*    This function checks for errors in MQTT client connect call.        */
5564 /*                                                                        */
5565 /*                                                                        */
5566 /*  INPUT                                                                 */
5567 /*                                                                        */
5568 /*    client_ptr                            Pointer to MQTT Client        */
5569 /*    server_ip                             Server IP address structure   */
5570 /*    server_port                           Server port number, in host   */
5571 /*                                            byte order                  */
5572 /*    keepalive                             The MQTT keepalive timer      */
5573 /*    clean_session                         Clean session flag            */
5574 /*    wait_option                           Suspension option             */
5575 /*                                                                        */
5576 /*                                                                        */
5577 /*  OUTPUT                                                                */
5578 /*                                                                        */
5579 /*    status                                Completion status             */
5580 /*                                                                        */
5581 /*  CALLS                                                                 */
5582 /*                                                                        */
5583 /*    _nxd_mqtt_client_connect              Actual MQTT Client connect    */
5584 /*                                            call                        */
5585 /*                                                                        */
5586 /*  CALLED BY                                                             */
5587 /*                                                                        */
5588 /*    Application Code                                                    */
5589 /*                                                                        */
5590 /*  RELEASE HISTORY                                                       */
5591 /*                                                                        */
5592 /*    DATE              NAME                      DESCRIPTION             */
5593 /*                                                                        */
5594 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5595 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
5596 /*                                            corrected mqtt client state,*/
5597 /*                                            resulting in version 6.1    */
5598 /*                                                                        */
5599 /**************************************************************************/
UINT _nxde_mqtt_client_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                               UINT keepalive, UINT clean_session, ULONG wait_option)
{

    /* Both the client control block and the server address are required.  */
    if ((client_ptr == NX_NULL) || (server_ip == NX_NULL))
    {
        return(NX_PTR_ERROR);
    }

    /* Only the IP version values 4 and 6 are accepted.  */
    if (!((server_ip -> nxd_ip_version == 4) || (server_ip -> nxd_ip_version == 6)))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Port zero is rejected.  */
    if (server_port == 0)
    {
        return(NX_INVALID_PORT);
    }

    /* Arguments passed the checks; return whatever the actual connect
       service reports.  keepalive, clean_session and wait_option are
       forwarded without inspection.  */
    return(_nxd_mqtt_client_connect(client_ptr, server_ip, server_port,
                                    keepalive, clean_session, wait_option));
}
5631 
5632 /**************************************************************************/
5633 /*                                                                        */
5634 /*  FUNCTION                                               RELEASE        */
5635 /*                                                                        */
5636 /*    _nxde_mqtt_client_secure_connect                    PORTABLE C      */
5637 /*                                                           6.1          */
5638 /*  AUTHOR                                                                */
5639 /*                                                                        */
5640 /*    Yuxin Zhou, Microsoft Corporation                                   */
5641 /*                                                                        */
5642 /*  DESCRIPTION                                                           */
5643 /*                                                                        */
5644 /*    This function checks for errors in MQTT client TLS secure connect.  */
5645 /*                                                                        */
5646 /*                                                                        */
5647 /*  INPUT                                                                 */
5648 /*                                                                        */
5649 /*    client_ptr                            Pointer to MQTT Client        */
5650 /*    server_ip                             Server IP address structure   */
5651 /*    server_port                           Server port number, in host   */
5652 /*                                            byte order                  */
5653 /*    tls_setup                             User-supplied callback        */
5654 /*                                            function to set up TLS      */
5655 /*                                            parameters.                 */
/*    keepalive                             The MQTT keepalive timer      */
/*    clean_session                         Clean session flag            */
5657 /*    wait_option                           Suspension option             */
5658 /*                                                                        */
5659 /*                                                                        */
5660 /*  OUTPUT                                                                */
5661 /*                                                                        */
5662 /*    status                                Completion status             */
5663 /*                                                                        */
5664 /*  CALLS                                                                 */
5665 /*                                                                        */
/*    _nxd_mqtt_client_secure_connect       Actual MQTT Client secure     */
/*                                            connect call                */
5668 /*                                                                        */
5669 /*  CALLED BY                                                             */
5670 /*                                                                        */
5671 /*    Application Code                                                    */
5672 /*                                                                        */
5673 /*  RELEASE HISTORY                                                       */
5674 /*                                                                        */
5675 /*    DATE              NAME                      DESCRIPTION             */
5676 /*                                                                        */
5677 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5678 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
5679 /*                                            corrected mqtt client state,*/
5680 /*                                            resulting in version 6.1    */
5681 /*                                                                        */
5682 /**************************************************************************/
#ifdef NX_SECURE_ENABLE

UINT _nxde_mqtt_client_secure_connect(NXD_MQTT_CLIENT *client_ptr, NXD_ADDRESS *server_ip, UINT server_port,
                                      UINT (*tls_setup)(NXD_MQTT_CLIENT *client_ptr, NX_SECURE_TLS_SESSION *,
                                                        NX_SECURE_X509_CERT *, NX_SECURE_X509_CERT *),
                                      UINT keepalive, UINT clean_session, ULONG wait_option)
{

UINT status;

    /* Check for invalid input pointers.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    if (server_ip == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Test for IP version flag.  This mirrors the check made by
       _nxde_mqtt_client_connect so that both connect services reject an
       address whose version is neither 4 nor 6 with the same status.  */
    if ((server_ip -> nxd_ip_version != 4) && (server_ip -> nxd_ip_version != 6))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Port zero is rejected.  */
    if (server_port == 0)
    {
        return(NX_INVALID_PORT);
    }

    /* Call the actual secure connect service.  tls_setup, keepalive,
       clean_session and wait_option are forwarded as given; they are not
       validated here.  */
    status = _nxd_mqtt_client_secure_connect(client_ptr, server_ip, server_port, tls_setup,
                                             keepalive, clean_session, wait_option);

    return(status);
}
#endif /* NX_SECURE_ENABLE */
5714 
5715 
5716 /**************************************************************************/
5717 /*                                                                        */
5718 /*  FUNCTION                                               RELEASE        */
5719 /*                                                                        */
5720 /*    _nxde_mqtt_client_delete                            PORTABLE C      */
5721 /*                                                           6.1          */
5722 /*  AUTHOR                                                                */
5723 /*                                                                        */
5724 /*    Yuxin Zhou, Microsoft Corporation                                   */
5725 /*                                                                        */
5726 /*  DESCRIPTION                                                           */
5727 /*                                                                        */
5728 /*    This function checks for errors in MQTT client delete call.         */
5729 /*                                                                        */
5730 /*                                                                        */
5731 /*  INPUT                                                                 */
5732 /*                                                                        */
5733 /*    client_ptr                            Pointer to MQTT Client        */
5734 /*                                                                        */
5735 /*  OUTPUT                                                                */
5736 /*                                                                        */
5737 /*    status                                Completion status             */
5738 /*                                                                        */
5739 /*  CALLS                                                                 */
5740 /*                                                                        */
5741 /*    _nxd_mqtt_client_delete               Actual MQTT Client delete     */
5742 /*                                            call.                       */
5743 /*                                                                        */
5744 /*  CALLED BY                                                             */
5745 /*                                                                        */
5746 /*    Application Code                                                    */
5747 /*                                                                        */
5748 /*  RELEASE HISTORY                                                       */
5749 /*                                                                        */
5750 /*    DATE              NAME                      DESCRIPTION             */
5751 /*                                                                        */
5752 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5753 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5754 /*                                            resulting in version 6.1    */
5755 /*                                                                        */
5756 /**************************************************************************/
_nxde_mqtt_client_delete(NXD_MQTT_CLIENT * client_ptr)5757 UINT _nxde_mqtt_client_delete(NXD_MQTT_CLIENT *client_ptr)
5758 {
5759 
5760 UINT status;
5761 
5762     if (client_ptr == NX_NULL)
5763     {
5764         return(NX_PTR_ERROR);
5765     }
5766 
5767     status = _nxd_mqtt_client_delete(client_ptr);
5768 
5769     return(status);
5770 }
5771 
5772 
5773 
5774 /**************************************************************************/
5775 /*                                                                        */
5776 /*  FUNCTION                                               RELEASE        */
5777 /*                                                                        */
5778 /*    _nxde_mqtt_client_publish                           PORTABLE C      */
5779 /*                                                           6.1          */
5780 /*  AUTHOR                                                                */
5781 /*                                                                        */
5782 /*    Yuxin Zhou, Microsoft Corporation                                   */
5783 /*                                                                        */
5784 /*  DESCRIPTION                                                           */
5785 /*                                                                        */
5786 /*    This function performs error checking to the publish service.       */
5787 /*                                                                        */
5788 /*                                                                        */
5789 /*  INPUT                                                                 */
5790 /*                                                                        */
5791 /*    client_ptr                            Pointer to MQTT Client        */
5792 /*    topic_name                            Name of the topic             */
5793 /*    topic_name_length                     Length of the topic name      */
5794 /*    message                               Message string                */
5795 /*    message_length                        Length of the message,        */
5796 /*                                            in bytes                    */
5797 /*    retain                                The retain flag, whether      */
5798 /*                                            or not the broker should    */
5799 /*                                            store this message          */
5800 /*    QoS                                   Expected QoS level            */
5801 /*    wait_option                           Suspension option             */
5802 /*                                                                        */
5803 /*  OUTPUT                                                                */
5804 /*                                                                        */
5805 /*    status                                Completion status             */
5806 /*                                                                        */
5807 /*  CALLS                                                                 */
/*    _nxd_mqtt_client_publish              Actual MQTT Client publish    */
/*                                            call                        */
5809 /*                                                                        */
5810 /*  CALLED BY                                                             */
5811 /*                                                                        */
5812 /*    Application Code                                                    */
5813 /*                                                                        */
5814 /*  RELEASE HISTORY                                                       */
5815 /*                                                                        */
5816 /*    DATE              NAME                      DESCRIPTION             */
5817 /*                                                                        */
5818 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5819 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5820 /*                                            resulting in version 6.1    */
5821 /*                                                                        */
5822 /**************************************************************************/
UINT _nxde_mqtt_client_publish(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length,
                               CHAR *message, UINT message_length, UINT retain, UINT QoS, ULONG wait_option)
{
    /* Validate client_ptr */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Validate topic_name: a non-empty topic string is required.  */
    if ((topic_name == NX_NULL) || (topic_name_length == 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Validate message length.  A NULL message is let through; a non-NULL
       message with zero length is rejected.  */
    if (message && (message_length == 0))
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* Validate QoS value.  MQTT defines QoS levels 0, 1 and 2 only; the
       value 3 is not a valid level, so anything above 2 is rejected.  This
       is the same bound _nxde_mqtt_client_subscribe applies.  */
    if (QoS > 2)
    {
        return(NXD_MQTT_INVALID_PARAMETER);
    }

    /* All checks passed; retain and wait_option are forwarded unchecked to
       the actual publish service.  */
    return(_nxd_mqtt_client_publish(client_ptr, topic_name, topic_name_length, message, message_length, retain, QoS, wait_option));
}
5852 
5853 
5854 /**************************************************************************/
5855 /*                                                                        */
5856 /*  FUNCTION                                               RELEASE        */
5857 /*                                                                        */
5858 /*    _nxde_mqtt_client_subscribe                         PORTABLE C      */
5859 /*                                                           6.1          */
5860 /*  AUTHOR                                                                */
5861 /*                                                                        */
5862 /*    Yuxin Zhou, Microsoft Corporation                                   */
5863 /*                                                                        */
5864 /*  DESCRIPTION                                                           */
5865 /*                                                                        */
5866 /*    This function performs error checking to the subscribe service.     */
5867 /*                                                                        */
5868 /*                                                                        */
5869 /*  INPUT                                                                 */
5870 /*                                                                        */
5871 /*    client_ptr                            Pointer to MQTT Client        */
5872 /*    topic_name                            Pointer to the topic string   */
5873 /*                                            to subscribe to             */
5874 /*    topic_name_length                     Length of the topic string    */
5875 /*                                            in bytes                    */
/*    QoS                                   Expected QoS level            */
/*                                                                        */
/*  OUTPUT                                                                */
/*                                                                        */
/*    status                                Completion status             */
5885 /*                                                                        */
5886 /*  CALLS                                                                 */
5887 /*                                                                        */
5888 /*    _nxd_mqtt_client_subscribe                                          */
5889 /*                                                                        */
5890 /*  CALLED BY                                                             */
5891 /*                                                                        */
5892 /*    Application Code                                                    */
5893 /*                                                                        */
5894 /*  RELEASE HISTORY                                                       */
5895 /*                                                                        */
5896 /*    DATE              NAME                      DESCRIPTION             */
5897 /*                                                                        */
5898 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5899 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5900 /*                                            resulting in version 6.1    */
5901 /*                                                                        */
5902 /**************************************************************************/
UINT _nxde_mqtt_client_subscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length, UINT QoS)
{

UINT status;


    if (client_ptr == NX_NULL)
    {

        /* No client control block was supplied.  */
        status = NX_PTR_ERROR;
    }
    else if ((topic_name == NX_NULL) || (topic_name_length == 0) || (QoS > 2))
    {

        /* The topic string is missing or empty, or the QoS value is
           greater than 2.  */
        status = NXD_MQTT_INVALID_PARAMETER;
    }
    else
    {

        /* Arguments passed the checks; run the actual subscribe service.  */
        status = _nxd_mqtt_client_subscribe(client_ptr, topic_name, topic_name_length, QoS);
    }

    return(status);
}
5927 
5928 
5929 
5930 
5931 /**************************************************************************/
5932 /*                                                                        */
5933 /*  FUNCTION                                               RELEASE        */
5934 /*                                                                        */
5935 /*    _nxde_mqtt_client_unsubscribe                       PORTABLE C      */
5936 /*                                                           6.1          */
5937 /*  AUTHOR                                                                */
5938 /*                                                                        */
5939 /*    Yuxin Zhou, Microsoft Corporation                                   */
5940 /*                                                                        */
5941 /*  DESCRIPTION                                                           */
5942 /*                                                                        */
5943 /*    This function performs error checking to the unsubscribe service.   */
5944 /*                                                                        */
5945 /*                                                                        */
5946 /*  INPUT                                                                 */
5947 /*                                                                        */
5948 /*    client_ptr                            Pointer to MQTT Client        */
5949 /*    topic_name                            Pointer to the topic string   */
5950 /*                                            to unsubscribe              */
5951 /*    topic_name_length                     Length of the topic string    */
5952 /*                                            in bytes                    */
5953 /*                                                                        */
5954 /*  OUTPUT                                                                */
5955 /*                                                                        */
5956 /*    status                                Completion status             */
5957 /*                                                                        */
5958 /*  CALLS                                                                 */
/*    _nxd_mqtt_client_unsubscribe          Actual MQTT Client            */
/*                                            unsubscribe call            */
5960 /*                                                                        */
5961 /*  CALLED BY                                                             */
5962 /*                                                                        */
5963 /*    Application Code                                                    */
5964 /*                                                                        */
5965 /*  RELEASE HISTORY                                                       */
5966 /*                                                                        */
5967 /*    DATE              NAME                      DESCRIPTION             */
5968 /*                                                                        */
5969 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
5970 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
5971 /*                                            resulting in version 6.1    */
5972 /*                                                                        */
5973 /**************************************************************************/
UINT _nxde_mqtt_client_unsubscribe(NXD_MQTT_CLIENT *client_ptr, CHAR *topic_name, UINT topic_name_length)
{

    /* A client control block is required.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Forward to the actual unsubscribe service only when a non-empty
       topic string was supplied.  */
    if ((topic_name != NX_NULL) && (topic_name_length != 0))
    {
        return(_nxd_mqtt_client_unsubscribe(client_ptr, topic_name, topic_name_length));
    }

    /* The topic string is missing or has zero length.  */
    return(NXD_MQTT_INVALID_PARAMETER);
}
5990 
5991 
5992 /**************************************************************************/
5993 /*                                                                        */
5994 /*  FUNCTION                                               RELEASE        */
5995 /*                                                                        */
5996 /*    _nxde_mqtt_client_disconnect                        PORTABLE C      */
5997 /*                                                           6.1          */
5998 /*  AUTHOR                                                                */
5999 /*                                                                        */
6000 /*    Yuxin Zhou, Microsoft Corporation                                   */
6001 /*                                                                        */
6002 /*  DESCRIPTION                                                           */
6003 /*                                                                        */
6004 /*    This function checks for errors in MQTT client disconnect call.     */
6005 /*                                                                        */
6006 /*                                                                        */
6007 /*  INPUT                                                                 */
6008 /*                                                                        */
6009 /*    client_ptr                            Pointer to MQTT Client        */
6010 /*                                                                        */
6011 /*  OUTPUT                                                                */
6012 /*                                                                        */
6013 /*    status                                Completion status             */
6014 /*                                                                        */
6015 /*  CALLS                                                                 */
6016 /*                                                                        */
6017 /*    _nxd_mqtt_client_disconnect           Actual MQTT Client disconnect */
6018 /*                                            call                        */
6019 /*                                                                        */
6020 /*  CALLED BY                                                             */
6021 /*                                                                        */
6022 /*    Application Code                                                    */
6023 /*                                                                        */
6024 /*  RELEASE HISTORY                                                       */
6025 /*                                                                        */
6026 /*    DATE              NAME                      DESCRIPTION             */
6027 /*                                                                        */
6028 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6029 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6030 /*                                            resulting in version 6.1    */
6031 /*                                                                        */
6032 /**************************************************************************/
_nxde_mqtt_client_disconnect(NXD_MQTT_CLIENT * client_ptr)6033 UINT _nxde_mqtt_client_disconnect(NXD_MQTT_CLIENT *client_ptr)
6034 {
6035     /* Validate client_ptr */
6036     if (client_ptr == NX_NULL)
6037     {
6038         return(NX_PTR_ERROR);
6039     }
6040 
6041     return(_nxd_mqtt_client_disconnect(client_ptr));
6042 }
6043 
6044 
6045 /**************************************************************************/
6046 /*                                                                        */
6047 /*  FUNCTION                                               RELEASE        */
6048 /*                                                                        */
6049 /*    _nxde_mqtt_client_message_get                       PORTABLE C      */
6050 /*                                                           6.1          */
6051 /*  AUTHOR                                                                */
6052 /*                                                                        */
6053 /*    Yuxin Zhou, Microsoft Corporation                                   */
6054 /*                                                                        */
6055 /*  DESCRIPTION                                                           */
6056 /*                                                                        */
6057 /*    This function checks for errors in MQTT client message get call.    */
6058 /*                                                                        */
6059 /*                                                                        */
6060 /*  INPUT                                                                 */
6061 /*                                                                        */
6062 /*    client_ptr                            Pointer to MQTT Client        */
6063 /*    topic_buffer                          Pointer to the topic buffer   */
6064 /*                                            where topic is copied to    */
6065 /*    topic_buffer_size                     Size of the topic buffer.     */
6066 /*    actual_topic_length                   Number of bytes copied into   */
6067 /*                                            topic_buffer                */
6068 /*    message_buffer                        Pointer to the buffer where   */
6069 /*                                            message is copied to        */
6070 /*    message_buffer_size                   Size of the message_buffer    */
6071 /*    actual_message_length                 Number of bytes copied into   */
6072 /*                                            the message buffer.         */
6073 /*                                                                        */
6074 /*  OUTPUT                                                                */
6075 /*                                                                        */
6076 /*    status                                Completion status             */
6077 /*                                                                        */
6078 /*  CALLS                                                                 */
6079 /*                                                                        */
6080 /*    _nxd_mqtt_client_message_get                                        */
6081 /*                                                                        */
6082 /*                                                                        */
6083 /*  CALLED BY                                                             */
6084 /*                                                                        */
6085 /*    Application Code                                                    */
6086 /*                                                                        */
6087 /*  RELEASE HISTORY                                                       */
6088 /*                                                                        */
6089 /*    DATE              NAME                      DESCRIPTION             */
6090 /*                                                                        */
6091 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6092 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6093 /*                                            resulting in version 6.1    */
6094 /*                                                                        */
6095 /**************************************************************************/
UINT _nxde_mqtt_client_message_get(NXD_MQTT_CLIENT *client_ptr, UCHAR *topic_buffer, UINT topic_buffer_size, UINT *actual_topic_length,
                                   UCHAR *message_buffer, UINT message_buffer_size, UINT *actual_message_length)
{

    /* Error-checking front end for _nxd_mqtt_client_message_get().  */

    /* The client control block must be supplied.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* Both destination buffers are mandatory: the request is forwarded only
       when topic_buffer and message_buffer are non-null.  The buffer sizes
       and the actual_topic_length / actual_message_length pointers are not
       examined here and are passed through as given.  */
    if ((message_buffer != NX_NULL) && (topic_buffer != NX_NULL))
    {
        return(_nxd_mqtt_client_message_get(client_ptr, topic_buffer, topic_buffer_size, actual_topic_length,
                                            message_buffer, message_buffer_size, actual_message_length));
    }

    /* At least one of the two buffers is missing.  */
    return(NXD_MQTT_INVALID_PARAMETER);
}
6118 
6119 
6120 /**************************************************************************/
6121 /*                                                                        */
6122 /*  FUNCTION                                               RELEASE        */
6123 /*                                                                        */
6124 /*    _nxde_mqtt_client_receive_notify_set                PORTABLE C      */
6125 /*                                                           6.1          */
6126 /*  AUTHOR                                                                */
6127 /*                                                                        */
6128 /*    Yuxin Zhou, Microsoft Corporation                                   */
6129 /*                                                                        */
6130 /*  DESCRIPTION                                                           */
6131 /*                                                                        */
/*    This function checks for errors in MQTT client receive notify set   */
/*    call.                                                               */
6134 /*                                                                        */
6135 /*  INPUT                                                                 */
6136 /*                                                                        */
6137 /*    client_ptr                            Pointer to MQTT Client        */
6138 /*    receive_notify                        User-supplied callback        */
6139 /*                                            function, which is invoked  */
6140 /*                                            upon receiving a publish    */
6141 /*                                            message.                    */
6142 /*                                                                        */
6143 /*  OUTPUT                                                                */
6144 /*                                                                        */
6145 /*    status                                Completion status             */
6146 /*                                                                        */
6147 /*  CALLS                                                                 */
6148 /*                                                                        */
6149 /*    _nxd_mqtt_client_receive_notify_set                                 */
6150 /*                                                                        */
6151 /*                                                                        */
6152 /*  CALLED BY                                                             */
6153 /*                                                                        */
6154 /*    Application Code                                                    */
6155 /*                                                                        */
6156 /*  RELEASE HISTORY                                                       */
6157 /*                                                                        */
6158 /*    DATE              NAME                      DESCRIPTION             */
6159 /*                                                                        */
6160 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6161 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6162 /*                                            resulting in version 6.1    */
6163 /*                                                                        */
6164 /**************************************************************************/
_nxde_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* receive_notify)(NXD_MQTT_CLIENT * client_ptr,UINT message_count))6165 UINT _nxde_mqtt_client_receive_notify_set(NXD_MQTT_CLIENT *client_ptr,
6166                                           VOID (*receive_notify)(NXD_MQTT_CLIENT *client_ptr, UINT message_count))
6167 {
6168     /* Validate client_ptr */
6169     if (client_ptr == NX_NULL)
6170     {
6171         return(NX_PTR_ERROR);
6172     }
6173 
6174     if (receive_notify == NX_NULL)
6175     {
6176         return(NX_PTR_ERROR);
6177     }
6178 
6179     return(_nxd_mqtt_client_receive_notify_set(client_ptr, receive_notify));
6180 }
6181 
6182 
6183 
6184 /**************************************************************************/
6185 /*                                                                        */
6186 /*  FUNCTION                                               RELEASE        */
6187 /*                                                                        */
6188 /*    _nxd_mqtt_client_disconnect_notify_set              PORTABLE C      */
6189 /*                                                           6.1          */
6190 /*  AUTHOR                                                                */
6191 /*                                                                        */
6192 /*    Yuxin Zhou, Microsoft Corporation                                   */
6193 /*                                                                        */
6194 /*  DESCRIPTION                                                           */
6195 /*                                                                        */
6196 /*    This function sets the notify function for the disconnect event.    */
6197 /*                                                                        */
6198 /*  INPUT                                                                 */
6199 /*                                                                        */
6200 /*    client_ptr                            Pointer to MQTT Client        */
6201 /*    disconnect_notify                     The notify function to be     */
6202 /*                                            used when the client is     */
6203 /*                                            disconnected from the       */
6204 /*                                            server.                     */
6205 /*                                                                        */
6206 /*  OUTPUT                                                                */
6207 /*                                                                        */
6208 /*    status                                Completion status             */
6209 /*                                                                        */
6210 /*  CALLS                                                                 */
6211 /*                                                                        */
6212 /*    None                                                                */
6213 /*                                                                        */
6214 /*  CALLED BY                                                             */
6215 /*                                                                        */
6216 /*    Application Code                                                    */
6217 /*                                                                        */
6218 /*  RELEASE HISTORY                                                       */
6219 /*                                                                        */
6220 /*    DATE              NAME                      DESCRIPTION             */
6221 /*                                                                        */
6222 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6223 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6224 /*                                            resulting in version 6.1    */
6225 /*                                                                        */
6226 /**************************************************************************/
_nxd_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* disconnect_notify)(NXD_MQTT_CLIENT *))6227 UINT _nxd_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT *client_ptr, VOID (*disconnect_notify)(NXD_MQTT_CLIENT *))
6228 {
6229 
6230     client_ptr -> nxd_mqtt_disconnect_notify = disconnect_notify;
6231 
6232     return(NXD_MQTT_SUCCESS);
6233 }
6234 
6235 
6236 /**************************************************************************/
6237 /*                                                                        */
6238 /*  FUNCTION                                               RELEASE        */
6239 /*                                                                        */
6240 /*    _nxde_mqtt_client_disconnect_notify_set             PORTABLE C      */
6241 /*                                                           6.1          */
6242 /*  AUTHOR                                                                */
6243 /*                                                                        */
6244 /*    Yuxin Zhou, Microsoft Corporation                                   */
6245 /*                                                                        */
6246 /*  DESCRIPTION                                                           */
6247 /*                                                                        */
6248 /*    This function checks for errors in setting MQTT client disconnect   */
6249 /*    callback function.                                                  */
6250 /*                                                                        */
6251 /*  INPUT                                                                 */
6252 /*                                                                        */
6253 /*    client_ptr                            Pointer to MQTT Client        */
/*    disconnect_notify                     The callback function to be   */
6255 /*                                            used when an on-going       */
6256 /*                                            connection is disconnected. */
6257 /*                                                                        */
6258 /*  OUTPUT                                                                */
6259 /*                                                                        */
6260 /*    status                                Completion status             */
6261 /*                                                                        */
6262 /*  CALLS                                                                 */
6263 /*                                                                        */
6264 /*    _nxd_mqtt_client_disconnect_notify_set                              */
6265 /*                                                                        */
6266 /*  CALLED BY                                                             */
6267 /*                                                                        */
6268 /*    Application Code                                                    */
6269 /*                                                                        */
6270 /*  RELEASE HISTORY                                                       */
6271 /*                                                                        */
6272 /*    DATE              NAME                      DESCRIPTION             */
6273 /*                                                                        */
6274 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6275 /*  09-30-2020     Yuxin Zhou               Modified comment(s),          */
6276 /*                                            resulting in version 6.1    */
6277 /*                                                                        */
6278 /**************************************************************************/
_nxde_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT * client_ptr,VOID (* disconnect_notify)(NXD_MQTT_CLIENT *))6279 UINT _nxde_mqtt_client_disconnect_notify_set(NXD_MQTT_CLIENT *client_ptr, VOID (*disconnect_notify)(NXD_MQTT_CLIENT *))
6280 {
6281 
6282     /* Validate client_ptr */
6283     if (client_ptr == NX_NULL)
6284     {
6285         return(NX_PTR_ERROR);
6286     }
6287     return(_nxd_mqtt_client_disconnect_notify_set(client_ptr, disconnect_notify));
6288 }
6289 
6290 
6291 #ifdef NXD_MQTT_CLOUD_ENABLE
6292 /**************************************************************************/
6293 /*                                                                        */
6294 /*  FUNCTION                                               RELEASE        */
6295 /*                                                                        */
6296 /*    _nxd_mqtt_client_cloud_create                       PORTABLE C      */
6297 /*                                                           6.1          */
6298 /*  AUTHOR                                                                */
6299 /*                                                                        */
6300 /*    Yuxin Zhou, Microsoft Corporation                                   */
6301 /*                                                                        */
6302 /*  DESCRIPTION                                                           */
6303 /*                                                                        */
6304 /*    This function creates mqtt client running on cloud helper thread.   */
6305 /*                                                                        */
6306 /*                                                                        */
6307 /*  INPUT                                                                 */
6308 /*                                                                        */
6309 /*    client_ptr                            Pointer to MQTT Client        */
/*    client_name                           Name string used by the       */
6311 /*                                            client                      */
6312 /*    client_id                             Client ID used by the client  */
6313 /*    client_id_length                      Length of Client ID, in bytes */
6314 /*    ip_ptr                                Pointer to IP instance        */
6315 /*    pool_ptr                              Pointer to packet pool        */
6316 /*    cloud_ptr                             Pointer to Cloud instance     */
6317 /*                                                                        */
6318 /*  OUTPUT                                                                */
6319 /*                                                                        */
6320 /*    status                                Completion status             */
6321 /*                                                                        */
6322 /*  CALLS                                                                 */
6323 /*                                                                        */
6324 /*    _nxd_mqtt_client_create               Actual client create call     */
6325 /*                                                                        */
6326 /*  CALLED BY                                                             */
6327 /*                                                                        */
6328 /*    Application Code                                                    */
6329 /*                                                                        */
6330 /*  RELEASE HISTORY                                                       */
6331 /*                                                                        */
6332 /*    DATE              NAME                      DESCRIPTION             */
6333 /*                                                                        */
6334 /*  05-19-2020     Yuxin Zhou               Initial Version 6.0           */
6335 /*  09-30-2020     Yuxin Zhou               Modified comment(s), and      */
6336 /*                                            corrected mqtt client state,*/
6337 /*                                            resulting in version 6.1    */
6338 /*                                                                        */
6339 /**************************************************************************/
UINT _nxd_mqtt_client_cloud_create(NXD_MQTT_CLIENT *client_ptr, CHAR *client_name, CHAR *client_id, UINT client_id_length,
                                   NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr, NX_CLOUD *cloud_ptr)
{

UINT    result;


    /* Screen the input pointers one by one, in the order client, IP,
       packet pool, cloud.  Every failure is reported as NX_PTR_ERROR.  */

    /* The client control block is required.  */
    if (client_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The IP instance must be present and carry the IP ID marker.  */
    if ((ip_ptr == NX_NULL) || (ip_ptr -> nx_ip_id != NX_IP_ID))
    {
        return(NX_PTR_ERROR);
    }

    /* The packet pool is required.  */
    if (pool_ptr == NX_NULL)
    {
        return(NX_PTR_ERROR);
    }

    /* The cloud instance must be present and carry the cloud ID marker.  */
    if ((cloud_ptr == NX_NULL) || (cloud_ptr -> nx_cloud_id != NX_CLOUD_ID))
    {
        return(NX_PTR_ERROR);
    }

    /* Build the MQTT client itself.  The three trailing arguments of the
       internal create call are passed as NX_NULL, 0 and 0 here.  */
    result = _nxd_mqtt_client_create_internal(client_ptr, client_name, client_id, client_id_length, ip_ptr,
                                              pool_ptr, NX_NULL, 0, 0);

    /* A failed internal create is reported to the caller as-is.  */
    if (result)
    {
        return(result);
    }

    /* Attach the client to the cloud instance and use the cloud's mutex
       as the client's mutex.  */
    client_ptr -> nxd_mqtt_client_cloud_ptr = cloud_ptr;
    client_ptr -> nxd_mqtt_client_mutex_ptr = &(cloud_ptr -> nx_cloud_mutex);

    /* Register the MQTT module with the cloud helper, with
       _nxd_mqtt_client_event_process as the event handler and the client
       itself as the handler's context.  */
    result = nx_cloud_module_register(client_ptr -> nxd_mqtt_client_cloud_ptr,
                                      &(client_ptr -> nxd_mqtt_client_cloud_module),
                                      client_name, NX_CLOUD_MODULE_MQTT_EVENT,
                                      _nxd_mqtt_client_event_process, client_ptr);

    if (result == NX_SUCCESS)
    {

        /* Registration succeeded, so the client starts out idle.  */
        client_ptr -> nxd_mqtt_client_state = NXD_MQTT_CLIENT_STATE_IDLE;

        return(NXD_MQTT_SUCCESS);
    }

    /* Registration failed.  Release the TCP socket created by
       _nxd_mqtt_client_create_internal() and report an internal error
       instead of the raw registration status.  */
    nx_tcp_socket_delete(&(client_ptr -> nxd_mqtt_client_socket));

    return(NXD_MQTT_INTERNAL_ERROR);
}
6391 #endif /* NXD_MQTT_CLOUD_ENABLE */
6392 
6393 #ifdef NXD_MQTT_OVER_WEBSOCKET
6394 /**************************************************************************/
6395 /*                                                                        */
6396 /*  FUNCTION                                               RELEASE        */
6397 /*                                                                        */
6398 /*    _nxd_mqtt_client_websocket_connection_status_callback               */
6399 /*                                                        PORTABLE C      */
6400 /*                                                           6.2.0        */
6401 /*  AUTHOR                                                                */
6402 /*                                                                        */
6403 /*    Yuxin Zhou, Microsoft Corporation                                   */
6404 /*                                                                        */
6405 /*  DESCRIPTION                                                           */
6406 /*                                                                        */
6407 /*    This function is the websocket connection status callback.          */
6408 /*                                                                        */
6409 /*  INPUT                                                                 */
6410 /*                                                                        */
6411 /*    websocket_client_ptr                  Pointer to websocket client   */
6412 /*    context                               Pointer to MQTT client        */
6413 /*    status                                Websocket connection status   */
6414 /*                                                                        */
6415 /*  OUTPUT                                                                */
6416 /*                                                                        */
6417 /*    None                                                                */
6418 /*                                                                        */
6419 /*  CALLS                                                                 */
6420 /*                                                                        */
6421 /*    _nxd_mqtt_client_connect_packet_send                                */
6422 /*    _nxd_mqtt_client_connection_end                                     */
6423 /*                                                                        */
6424 /*  CALLED BY                                                             */
6425 /*                                                                        */
6426 /*    _nxd_mqtt_client_event_process                                      */
6427 /*                                                                        */
6428 /*  RELEASE HISTORY                                                       */
6429 /*                                                                        */
6430 /*    DATE              NAME                      DESCRIPTION             */
6431 /*                                                                        */
6432 /*  10-31-2022     Yuxin Zhou               Initial Version 6.2.0         */
6433 /*                                                                        */
6434 /**************************************************************************/
VOID _nxd_mqtt_client_websocket_connection_status_callback(NX_WEBSOCKET_CLIENT *websocket_client_ptr, VOID *context, UINT status)
{

NXD_MQTT_CLIENT *mqtt_client = (NXD_MQTT_CLIENT *)context;
UINT             result = status;


    /* The websocket client pointer is not needed; the MQTT client is
       recovered from the callback context instead.  */
    NX_PARAMETER_NOT_USED(websocket_client_ptr);

    /* When the websocket reports success, go on and send the MQTT connect
       packet without waiting.  From here on, result holds the outcome of
       that send rather than the websocket status.  */
    if (result == NX_SUCCESS)
    {
        result = _nxd_mqtt_client_connect_packet_send(mqtt_client, NX_NO_WAIT);
    }

    /* Nothing more to do when everything went fine.  */
    if (!result)
    {
        return;
    }

    /* Either the websocket connection or the connect packet send failed:
       tear the connection down.  */
    _nxd_mqtt_client_connection_end(mqtt_client, NX_NO_WAIT);

    /* Report the failure through the connect notify callback, if one is
       installed.  */
    if (mqtt_client -> nxd_mqtt_connect_notify)
    {
        mqtt_client -> nxd_mqtt_connect_notify(mqtt_client, result, mqtt_client -> nxd_mqtt_connect_context);
    }
}
6463 
6464 /**************************************************************************/
6465 /*                                                                        */
6466 /*  FUNCTION                                               RELEASE        */
6467 /*                                                                        */
6468 /*    _nxd_mqtt_client_websocket_set                      PORTABLE C      */
6469 /*                                                           6.2.0        */
6470 /*  AUTHOR                                                                */
6471 /*                                                                        */
6472 /*    Yuxin Zhou, Microsoft Corporation                                   */
6473 /*                                                                        */
6474 /*  DESCRIPTION                                                           */
6475 /*                                                                        */
6476 /*    This function sets the websocket.                                   */
6477 /*                                                                        */
6478 /*  INPUT                                                                 */
6479 /*                                                                        */
6480 /*    client_ptr                            Pointer to MQTT Client        */
6481 /*    host                                  Host used by the client       */
6482 /*    host_length                           Length of host, in bytes      */
6483 /*    uri_path                              URI path used by the client   */
6484 /*    uri_path_length                       Length of uri path, in bytes  */
6485 /*                                                                        */
6486 /*  OUTPUT                                                                */
6487 /*                                                                        */
6488 /*    status                                Completion status             */
6489 /*                                                                        */
6490 /*  CALLS                                                                 */
6491 /*                                                                        */
6492 /*    None                                                                */
6493 /*                                                                        */
6494 /*  CALLED BY                                                             */
6495 /*                                                                        */
6496 /*    Application Code                                                    */
6497 /*                                                                        */
6498 /*  RELEASE HISTORY                                                       */
6499 /*                                                                        */
6500 /*    DATE              NAME                      DESCRIPTION             */
6501 /*                                                                        */
6502 /*  10-31-2022     Yuxin Zhou               Initial Version 6.2.0         */
6503 /*                                                                        */
6504 /**************************************************************************/
_nxd_mqtt_client_websocket_set(NXD_MQTT_CLIENT * client_ptr,UCHAR * host,UINT host_length,UCHAR * uri_path,UINT uri_path_length)6505 UINT _nxd_mqtt_client_websocket_set(NXD_MQTT_CLIENT *client_ptr, UCHAR *host, UINT host_length, UCHAR *uri_path, UINT uri_path_length)
6506 {
6507 UINT status;
6508 
6509     /* Obtain the mutex. */
6510     status = tx_mutex_get(client_ptr -> nxd_mqtt_client_mutex_ptr, NX_WAIT_FOREVER);
6511 
6512     if (status != TX_SUCCESS)
6513     {
6514         return(NXD_MQTT_MUTEX_FAILURE);
6515     }
6516 
6517     /* Set the host info.  */
6518     client_ptr -> nxd_mqtt_client_websocket_host = host;
6519     client_ptr -> nxd_mqtt_client_websocket_host_length = host_length;
6520     client_ptr -> nxd_mqtt_client_websocket_uri_path = uri_path;
6521     client_ptr -> nxd_mqtt_client_websocket_uri_path_length = uri_path_length;
6522 
6523     /* Create WebSocket.  */
6524     status = nx_websocket_client_create(&client_ptr -> nxd_mqtt_client_websocket, (UCHAR *)"",
6525                                         client_ptr -> nxd_mqtt_client_ip_ptr,
6526                                         client_ptr -> nxd_mqtt_client_packet_pool_ptr);
6527 
6528     /* Check status.  */
6529     if (status)
6530     {
6531         tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
6532         return(status);
6533     }
6534 
6535     client_ptr -> nxd_mqtt_client_use_websocket = NX_TRUE;
6536 
6537     tx_mutex_put(client_ptr -> nxd_mqtt_client_mutex_ptr);
6538     return(NXD_MQTT_SUCCESS);
6539 }
6540 
6541 
6542 /**************************************************************************/
6543 /*                                                                        */
6544 /*  FUNCTION                                               RELEASE        */
6545 /*                                                                        */
6546 /*    _nxde_mqtt_client_websocket_set                     PORTABLE C      */
6547 /*                                                           6.2.0        */
6548 /*  AUTHOR                                                                */
6549 /*                                                                        */
6550 /*    Yuxin Zhou, Microsoft Corporation                                   */
6551 /*                                                                        */
6552 /*  DESCRIPTION                                                           */
6553 /*                                                                        */
6554 /*    This function checks for errors in setting MQTT client websocket.   */
6555 /*                                                                        */
6556 /*  INPUT                                                                 */
6557 /*                                                                        */
6558 /*    client_ptr                            Pointer to MQTT Client        */
6559 /*    host                                  Host used by the client       */
6560 /*    host_length                           Length of host, in bytes      */
6561 /*    uri_path                              URI path used by the client   */
6562 /*    uri_path_length                       Length of uri path, in bytes  */
6563 /*                                                                        */
6564 /*  OUTPUT                                                                */
6565 /*                                                                        */
6566 /*    status                                Completion status             */
6567 /*                                                                        */
6568 /*  CALLS                                                                 */
6569 /*                                                                        */
6570 /*    _nxd_mqtt_client_websocket_set                                      */
6571 /*                                                                        */
6572 /*  CALLED BY                                                             */
6573 /*                                                                        */
6574 /*    Application Code                                                    */
6575 /*                                                                        */
6576 /*  RELEASE HISTORY                                                       */
6577 /*                                                                        */
6578 /*    DATE              NAME                      DESCRIPTION             */
6579 /*                                                                        */
6580 /*  10-31-2022     Yuxin Zhou               Initial Version 6.2.0         */
6581 /*                                                                        */
6582 /**************************************************************************/
_nxde_mqtt_client_websocket_set(NXD_MQTT_CLIENT * client_ptr,UCHAR * host,UINT host_length,UCHAR * uri_path,UINT uri_path_length)6583 UINT _nxde_mqtt_client_websocket_set(NXD_MQTT_CLIENT *client_ptr, UCHAR *host, UINT host_length, UCHAR *uri_path, UINT uri_path_length)
6584 {
6585 
6586     /* Validate the parameters.  */
6587     if ((client_ptr == NX_NULL) || (host == NX_NULL) || (host_length == 0) ||
6588         (uri_path == NX_NULL) || (uri_path_length == 0))
6589     {
6590         return(NX_PTR_ERROR);
6591     }
6592 
6593     return(_nxd_mqtt_client_websocket_set(client_ptr, host, host_length, uri_path, uri_path_length));
6594 }
6595 #endif /* NXD_MQTT_OVER_WEBSOCKET */
6596