1 /***************************************************************************
2  * Copyright (c) 2024 Microsoft Corporation
3  *
4  * This program and the accompanying materials are made available under the
5  * terms of the MIT License which is available at
6  * https://opensource.org/licenses/MIT.
7  *
8  * SPDX-License-Identifier: MIT
9  **************************************************************************/
10 
/*

   This is a small demonstration of the high-performance NetX Duo TCP/IP stack
   MQTT client.

   This demo program establishes a connection to an MQTT broker (for example a
   Mosquitto server) at the IP address defined by LOCAL_SERVER_ADDRESS.  It
   subscribes to a topic and then sends a message to the same topic.  The
   application should then receive the same message back from the broker.

   This demo assumes that the NetX Duo TCP/IP stack has been properly configured,
   with the TCP module enabled.  The entry function is "thread_mqtt_entry".  The
   caller needs to pass in the IP instance and a valid packet pool instance.
*/
24 
25 #include "nx_api.h"
26 #include "nxd_mqtt_client.h"
27 
28 /* MQTT Demo defines */
29 
30 /* IP Address of the local server. */
31 #define  LOCAL_SERVER_ADDRESS (IP_ADDRESS(192, 168, 1, 1))
32 
33 
34 /*****************************************************************************************/
35 /* MQTT Local Server IoT Client example.                                                 */
36 /*****************************************************************************************/
37 
38 #define  DEMO_STACK_SIZE            2048
39 #define  CLIENT_ID_STRING           "mytestclient"
40 #define  MQTT_CLIENT_STACK_SIZE     4096
41 
42 #define  STRLEN(p)                  (sizeof(p) - 1)
43 
44 
45 /* Declare the MQTT thread stack space. */
46 static ULONG                        mqtt_client_stack[MQTT_CLIENT_STACK_SIZE / sizeof(ULONG)];
47 
48 /* Declare the MQTT client control block. */
49 static NXD_MQTT_CLIENT              mqtt_client;
50 
51 /* Define the symbol for signaling a received message. */
52 
53 
54 /* Define the test threads.  */
55 #define TOPIC_NAME                  "topic"
56 #define MESSAGE_STRING              "This is a message. "
57 
58 /* Define the priority of the MQTT internal thread. */
59 #define MQTT_THREAD_PRIORTY         2
60 
61 /* Define the MQTT keep alive timer for 5 minutes */
62 #define MQTT_KEEP_ALIVE_TIMER       300
63 
64 #define QOS0                        0
65 #define QOS1                        1
66 
67 /* Declare event flag, which is used in this demo. */
68 TX_EVENT_FLAGS_GROUP                mqtt_app_flag;
69 #define DEMO_MESSAGE_EVENT          1
70 #define DEMO_ALL_EVENTS             3
71 
72 /* Declare buffers to hold message and topic. */
73 static UCHAR message_buffer[NXD_MQTT_MAX_MESSAGE_LENGTH];
74 static UCHAR topic_buffer[NXD_MQTT_MAX_TOPIC_NAME_LENGTH];
75 
76 /* Declare the disconnect notify function. */
my_disconnect_func(NXD_MQTT_CLIENT * client_ptr)77 static VOID my_disconnect_func(NXD_MQTT_CLIENT *client_ptr)
78 {
79     NX_PARAMETER_NOT_USED(client_ptr);
80     printf("client disconnected from server\n");
81 }
82 
83 
my_notify_func(NXD_MQTT_CLIENT * client_ptr,UINT number_of_messages)84 static VOID my_notify_func(NXD_MQTT_CLIENT* client_ptr, UINT number_of_messages)
85 {
86     NX_PARAMETER_NOT_USED(client_ptr);
87     NX_PARAMETER_NOT_USED(number_of_messages);
88     tx_event_flags_set(&mqtt_app_flag, DEMO_MESSAGE_EVENT, TX_OR);
89     return;
90 
91 }
92 
93 void thread_mqtt_entry(NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr);
94 
95 static ULONG    error_counter;
thread_mqtt_entry(NX_IP * ip_ptr,NX_PACKET_POOL * pool_ptr)96 void thread_mqtt_entry(NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr)
97 {
98 UINT status;
99 NXD_ADDRESS server_ip;
100 ULONG events;
101 UINT topic_length, message_length;
102 
103     /* Create MQTT client instance. */
104     status = nxd_mqtt_client_create(&mqtt_client, "my_client", CLIENT_ID_STRING, STRLEN(CLIENT_ID_STRING),
105                                     ip_ptr, pool_ptr, (VOID*)mqtt_client_stack, sizeof(mqtt_client_stack),
106                                     MQTT_THREAD_PRIORTY, NX_NULL, 0);
107 
108     if (status)
109     {
110         printf("Error in creating MQTT client: 0x%02x\n", status);
111         error_counter++;
112     }
113 
114 #ifdef NXD_MQTT_OVER_WEBSOCKET
115     status = nxd_mqtt_client_websocket_set(&mqtt_client, (UCHAR *)"test.mosquitto.org", sizeof("test.mosquitto.org") - 1,
116                                            (UCHAR *)"/mqtt", sizeof("/mqtt") - 1);
117     if (status)
118     {
119         printf("Error in setting MQTT over WebSocket: 0x%02x\r\n", status);
120         error_counter++;
121     }
122 #endif /* NXD_MQTT_OVER_WEBSOCKET */
123 
124     /* Register the disconnect notification function. */
125     nxd_mqtt_client_disconnect_notify_set(&mqtt_client, my_disconnect_func);
126 
127     /* Create an event flag for this demo. */
128     status = tx_event_flags_create(&mqtt_app_flag, "my app event");
129     if(status)
130         error_counter++;
131 
132 
133     server_ip.nxd_ip_version = 4;
134     server_ip.nxd_ip_address.v4 = LOCAL_SERVER_ADDRESS;
135 
136 
137     /* Start the connection to the server. */
138     status = nxd_mqtt_client_connect(&mqtt_client, &server_ip, NXD_MQTT_PORT,
139                                      MQTT_KEEP_ALIVE_TIMER, 0, NX_WAIT_FOREVER);
140     if(status)
141         error_counter++;
142 
143     /* Subscribe to the topic with QoS level 0. */
144     status = nxd_mqtt_client_subscribe(&mqtt_client, TOPIC_NAME, STRLEN(TOPIC_NAME), QOS0);
145     if(status)
146         error_counter++;
147 
148     /* Set the receive notify function. */
149     status = nxd_mqtt_client_receive_notify_set(&mqtt_client, my_notify_func);
150     if(status)
151         error_counter++;
152 
153     /* Publish a message with QoS Level 1. */
154     status = nxd_mqtt_client_publish(&mqtt_client, TOPIC_NAME, STRLEN(TOPIC_NAME),
155                                      (CHAR*)MESSAGE_STRING, STRLEN(MESSAGE_STRING), 0, QOS1, NX_WAIT_FOREVER);
156 
157 
158     /* Now wait for the broker to publish the message. */
159 
160     tx_event_flags_get(&mqtt_app_flag, DEMO_ALL_EVENTS, TX_OR_CLEAR, &events, TX_WAIT_FOREVER);
161     if(events & DEMO_MESSAGE_EVENT)
162     {
163         status = nxd_mqtt_client_message_get(&mqtt_client, topic_buffer, sizeof(topic_buffer), &topic_length,
164                                              message_buffer, sizeof(message_buffer), &message_length);
165         if(status == NXD_MQTT_SUCCESS)
166         {
167             topic_buffer[topic_length] = 0;
168             message_buffer[message_length] = 0;
169             printf("topic = %s, message = %s\n", topic_buffer, message_buffer);
170         }
171     }
172 
173     /* Now unsubscribe the topic. */
174     nxd_mqtt_client_unsubscribe(&mqtt_client, TOPIC_NAME, STRLEN(TOPIC_NAME));
175 
176     /* Disconnect from the broker. */
177     nxd_mqtt_client_disconnect(&mqtt_client);
178 
179     /* Delete the client instance, release all the resources. */
180     nxd_mqtt_client_delete(&mqtt_client);
181 
182     return;
183 
184 }
185 
186 
187