1 /**************************************************************************/
2 /*                                                                        */
3 /*       Copyright (c) Microsoft Corporation. All rights reserved.        */
4 /*                                                                        */
5 /*       This software is licensed under the Microsoft Software License   */
6 /*       Terms for Microsoft Azure RTOS. Full text of the license can be  */
7 /*       found in the LICENSE file at https://aka.ms/AzureRTOS_EULA       */
8 /*       and in the root directory of this software.                      */
9 /*                                                                        */
10 /**************************************************************************/
11 
12 /*
13 
   This is a small demonstration of the MQTT client for the high-performance
   NetX Duo TCP/IP stack.
16 
   This demo program establishes a connection to an MQTT broker (for example a
   Mosquitto server) at the IP address given by LOCAL_SERVER_ADDRESS, which is
   192.168.1.1 unless changed below.  It subscribes to a topic, then sends a
   message to the same topic.
   The application shall be able to receive the same message from the broker.
20 
21    This demo assumes that the NetX Duo TCP/IP stack has been properly configured,
22    with TCP module enabled.   The entry function is "thread_mqtt_entry".  Caller
23    needs to pass in the IP instance and an instance of a valid packet pool.
24 */
25 
26 #include "nx_api.h"
27 #include "nxd_mqtt_client.h"
28 
29 /* MQTT Demo defines */
30 
31 /* IP Address of the local server. */
32 #define  LOCAL_SERVER_ADDRESS (IP_ADDRESS(192, 168, 1, 1))
33 
34 
35 /*****************************************************************************************/
36 /* MQTT Local Server IoT Client example.                                                 */
37 /*****************************************************************************************/
38 
39 #define  DEMO_STACK_SIZE            2048
40 #define  CLIENT_ID_STRING           "mytestclient"
41 #define  MQTT_CLIENT_STACK_SIZE     4096
42 
43 #define  STRLEN(p)                  (sizeof(p) - 1)
44 
45 
46 /* Declare the MQTT thread stack space. */
47 static ULONG                        mqtt_client_stack[MQTT_CLIENT_STACK_SIZE / sizeof(ULONG)];
48 
49 /* Declare the MQTT client control block. */
50 static NXD_MQTT_CLIENT              mqtt_client;
51 
/* The symbol for signaling a received message (DEMO_MESSAGE_EVENT) is defined
   further down, next to the event flag group it is used with. */
53 
54 
/* Define the topic name and the message payload used by the demo.  */
56 #define TOPIC_NAME                  "topic"
57 #define MESSAGE_STRING              "This is a message. "
58 
59 /* Define the priority of the MQTT internal thread. */
60 #define MQTT_THREAD_PRIORTY         2
61 
62 /* Define the MQTT keep alive timer for 5 minutes */
63 #define MQTT_KEEP_ALIVE_TIMER       300
64 
65 #define QOS0                        0
66 #define QOS1                        1
67 
68 /* Declare event flag, which is used in this demo. */
69 TX_EVENT_FLAGS_GROUP                mqtt_app_flag;
70 #define DEMO_MESSAGE_EVENT          1
71 #define DEMO_ALL_EVENTS             3
72 
73 /* Declare buffers to hold message and topic. */
74 static UCHAR message_buffer[NXD_MQTT_MAX_MESSAGE_LENGTH];
75 static UCHAR topic_buffer[NXD_MQTT_MAX_TOPIC_NAME_LENGTH];
76 
77 /* Declare the disconnect notify function. */
my_disconnect_func(NXD_MQTT_CLIENT * client_ptr)78 static VOID my_disconnect_func(NXD_MQTT_CLIENT *client_ptr)
79 {
80     NX_PARAMETER_NOT_USED(client_ptr);
81     printf("client disconnected from server\n");
82 }
83 
84 
my_notify_func(NXD_MQTT_CLIENT * client_ptr,UINT number_of_messages)85 static VOID my_notify_func(NXD_MQTT_CLIENT* client_ptr, UINT number_of_messages)
86 {
87     NX_PARAMETER_NOT_USED(client_ptr);
88     NX_PARAMETER_NOT_USED(number_of_messages);
89     tx_event_flags_set(&mqtt_app_flag, DEMO_MESSAGE_EVENT, TX_OR);
90     return;
91 
92 }
93 
94 void thread_mqtt_entry(NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr);
95 
96 static ULONG    error_counter;
thread_mqtt_entry(NX_IP * ip_ptr,NX_PACKET_POOL * pool_ptr)97 void thread_mqtt_entry(NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr)
98 {
99 UINT status;
100 NXD_ADDRESS server_ip;
101 ULONG events;
102 UINT topic_length, message_length;
103 
104     /* Create MQTT client instance. */
105     status = nxd_mqtt_client_create(&mqtt_client, "my_client", CLIENT_ID_STRING, STRLEN(CLIENT_ID_STRING),
106                                     ip_ptr, pool_ptr, (VOID*)mqtt_client_stack, sizeof(mqtt_client_stack),
107                                     MQTT_THREAD_PRIORTY, NX_NULL, 0);
108 
109     if (status)
110     {
111         printf("Error in creating MQTT client: 0x%02x\n", status);
112         error_counter++;
113     }
114 
115 #ifdef NXD_MQTT_OVER_WEBSOCKET
116     status = nxd_mqtt_client_websocket_set(&mqtt_client, (UCHAR *)"test.mosquitto.org", sizeof("test.mosquitto.org") - 1,
117                                            (UCHAR *)"/mqtt", sizeof("/mqtt") - 1);
118     if (status)
119     {
120         printf("Error in setting MQTT over WebSocket: 0x%02x\r\n", status);
121         error_counter++;
122     }
123 #endif /* NXD_MQTT_OVER_WEBSOCKET */
124 
125     /* Register the disconnect notification function. */
126     nxd_mqtt_client_disconnect_notify_set(&mqtt_client, my_disconnect_func);
127 
128     /* Create an event flag for this demo. */
129     status = tx_event_flags_create(&mqtt_app_flag, "my app event");
130     if(status)
131         error_counter++;
132 
133 
134     server_ip.nxd_ip_version = 4;
135     server_ip.nxd_ip_address.v4 = LOCAL_SERVER_ADDRESS;
136 
137 
138     /* Start the connection to the server. */
139     status = nxd_mqtt_client_connect(&mqtt_client, &server_ip, NXD_MQTT_PORT,
140                                      MQTT_KEEP_ALIVE_TIMER, 0, NX_WAIT_FOREVER);
141     if(status)
142         error_counter++;
143 
144     /* Subscribe to the topic with QoS level 0. */
145     status = nxd_mqtt_client_subscribe(&mqtt_client, TOPIC_NAME, STRLEN(TOPIC_NAME), QOS0);
146     if(status)
147         error_counter++;
148 
149     /* Set the receive notify function. */
150     status = nxd_mqtt_client_receive_notify_set(&mqtt_client, my_notify_func);
151     if(status)
152         error_counter++;
153 
154     /* Publish a message with QoS Level 1. */
155     status = nxd_mqtt_client_publish(&mqtt_client, TOPIC_NAME, STRLEN(TOPIC_NAME),
156                                      (CHAR*)MESSAGE_STRING, STRLEN(MESSAGE_STRING), 0, QOS1, NX_WAIT_FOREVER);
157 
158 
159     /* Now wait for the broker to publish the message. */
160 
161     tx_event_flags_get(&mqtt_app_flag, DEMO_ALL_EVENTS, TX_OR_CLEAR, &events, TX_WAIT_FOREVER);
162     if(events & DEMO_MESSAGE_EVENT)
163     {
164         status = nxd_mqtt_client_message_get(&mqtt_client, topic_buffer, sizeof(topic_buffer), &topic_length,
165                                              message_buffer, sizeof(message_buffer), &message_length);
166         if(status == NXD_MQTT_SUCCESS)
167         {
168             topic_buffer[topic_length] = 0;
169             message_buffer[message_length] = 0;
170             printf("topic = %s, message = %s\n", topic_buffer, message_buffer);
171         }
172     }
173 
174     /* Now unsubscribe the topic. */
175     nxd_mqtt_client_unsubscribe(&mqtt_client, TOPIC_NAME, STRLEN(TOPIC_NAME));
176 
177     /* Disconnect from the broker. */
178     nxd_mqtt_client_disconnect(&mqtt_client);
179 
180     /* Delete the client instance, release all the resources. */
181     nxd_mqtt_client_delete(&mqtt_client);
182 
183     return;
184 
185 }
186 
187 
188