1 /**************************************************************************/
2 /* */
3 /* Copyright (c) Microsoft Corporation. All rights reserved. */
4 /* */
5 /* This software is licensed under the Microsoft Software License */
6 /* Terms for Microsoft Azure RTOS. Full text of the license can be */
7 /* found in the LICENSE file at https://aka.ms/AzureRTOS_EULA */
8 /* and in the root directory of this software. */
9 /* */
10 /**************************************************************************/
11
12 /*
13
14 This is a small demonstration of the high-performance NetX TCP/IP stack
15 MQTT client.
16
17 This demo program establishes a connection to the Mosquitto server at IP address
18 10.0.10.1. It subscribes to a topic, send a message to the same topic.
19 The application shall be able to receive the same message from the broker.
20
21 This demo assumes that the NetX Duo TCP/IP stack has been properly configured,
22 with TCP module enabled. The entry function is "thread_mqtt_entry". Caller
23 needs to pass in the IP instance and an instance of a valid packet pool.
24 */
25
26 #include "nx_api.h"
27 #include "nxd_mqtt_client.h"
28
29 /* MQTT Demo defines */
30
31 /* IP Address of the local server. */
32 #define LOCAL_SERVER_ADDRESS (IP_ADDRESS(192, 168, 1, 1))
33
34
35 /*****************************************************************************************/
36 /* MQTT Local Server IoT Client example. */
37 /*****************************************************************************************/
38
39 #define DEMO_STACK_SIZE 2048
40 #define CLIENT_ID_STRING "mytestclient"
41 #define MQTT_CLIENT_STACK_SIZE 4096
42
43 #define STRLEN(p) (sizeof(p) - 1)
44
45
46 /* Declare the MQTT thread stack space. */
47 static ULONG mqtt_client_stack[MQTT_CLIENT_STACK_SIZE / sizeof(ULONG)];
48
49 /* Declare the MQTT client control block. */
50 static NXD_MQTT_CLIENT mqtt_client;
51
52 /* Define the symbol for signaling a received message. */
53
54
55 /* Define the test threads. */
56 #define TOPIC_NAME "topic"
57 #define MESSAGE_STRING "This is a message. "
58
59 /* Define the priority of the MQTT internal thread. */
60 #define MQTT_THREAD_PRIORTY 2
61
62 /* Define the MQTT keep alive timer for 5 minutes */
63 #define MQTT_KEEP_ALIVE_TIMER 300
64
65 #define QOS0 0
66 #define QOS1 1
67
68 /* Declare event flag, which is used in this demo. */
69 TX_EVENT_FLAGS_GROUP mqtt_app_flag;
70 #define DEMO_MESSAGE_EVENT 1
71 #define DEMO_ALL_EVENTS 3
72
73 /* Declare buffers to hold message and topic. */
74 static UCHAR message_buffer[NXD_MQTT_MAX_MESSAGE_LENGTH];
75 static UCHAR topic_buffer[NXD_MQTT_MAX_TOPIC_NAME_LENGTH];
76
77 /* Declare the disconnect notify function. */
my_disconnect_func(NXD_MQTT_CLIENT * client_ptr)78 static VOID my_disconnect_func(NXD_MQTT_CLIENT *client_ptr)
79 {
80 NX_PARAMETER_NOT_USED(client_ptr);
81 printf("client disconnected from server\n");
82 }
83
84
my_notify_func(NXD_MQTT_CLIENT * client_ptr,UINT number_of_messages)85 static VOID my_notify_func(NXD_MQTT_CLIENT* client_ptr, UINT number_of_messages)
86 {
87 NX_PARAMETER_NOT_USED(client_ptr);
88 NX_PARAMETER_NOT_USED(number_of_messages);
89 tx_event_flags_set(&mqtt_app_flag, DEMO_MESSAGE_EVENT, TX_OR);
90 return;
91
92 }
93
94 void thread_mqtt_entry(NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr);
95
96 static ULONG error_counter;
thread_mqtt_entry(NX_IP * ip_ptr,NX_PACKET_POOL * pool_ptr)97 void thread_mqtt_entry(NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr)
98 {
99 UINT status;
100 NXD_ADDRESS server_ip;
101 ULONG events;
102 UINT topic_length, message_length;
103
104 /* Create MQTT client instance. */
105 status = nxd_mqtt_client_create(&mqtt_client, "my_client", CLIENT_ID_STRING, STRLEN(CLIENT_ID_STRING),
106 ip_ptr, pool_ptr, (VOID*)mqtt_client_stack, sizeof(mqtt_client_stack),
107 MQTT_THREAD_PRIORTY, NX_NULL, 0);
108
109 if (status)
110 {
111 printf("Error in creating MQTT client: 0x%02x\n", status);
112 error_counter++;
113 }
114
115 #ifdef NXD_MQTT_OVER_WEBSOCKET
116 status = nxd_mqtt_client_websocket_set(&mqtt_client, (UCHAR *)"test.mosquitto.org", sizeof("test.mosquitto.org") - 1,
117 (UCHAR *)"/mqtt", sizeof("/mqtt") - 1);
118 if (status)
119 {
120 printf("Error in setting MQTT over WebSocket: 0x%02x\r\n", status);
121 error_counter++;
122 }
123 #endif /* NXD_MQTT_OVER_WEBSOCKET */
124
125 /* Register the disconnect notification function. */
126 nxd_mqtt_client_disconnect_notify_set(&mqtt_client, my_disconnect_func);
127
128 /* Create an event flag for this demo. */
129 status = tx_event_flags_create(&mqtt_app_flag, "my app event");
130 if(status)
131 error_counter++;
132
133
134 server_ip.nxd_ip_version = 4;
135 server_ip.nxd_ip_address.v4 = LOCAL_SERVER_ADDRESS;
136
137
138 /* Start the connection to the server. */
139 status = nxd_mqtt_client_connect(&mqtt_client, &server_ip, NXD_MQTT_PORT,
140 MQTT_KEEP_ALIVE_TIMER, 0, NX_WAIT_FOREVER);
141 if(status)
142 error_counter++;
143
144 /* Subscribe to the topic with QoS level 0. */
145 status = nxd_mqtt_client_subscribe(&mqtt_client, TOPIC_NAME, STRLEN(TOPIC_NAME), QOS0);
146 if(status)
147 error_counter++;
148
149 /* Set the receive notify function. */
150 status = nxd_mqtt_client_receive_notify_set(&mqtt_client, my_notify_func);
151 if(status)
152 error_counter++;
153
154 /* Publish a message with QoS Level 1. */
155 status = nxd_mqtt_client_publish(&mqtt_client, TOPIC_NAME, STRLEN(TOPIC_NAME),
156 (CHAR*)MESSAGE_STRING, STRLEN(MESSAGE_STRING), 0, QOS1, NX_WAIT_FOREVER);
157
158
159 /* Now wait for the broker to publish the message. */
160
161 tx_event_flags_get(&mqtt_app_flag, DEMO_ALL_EVENTS, TX_OR_CLEAR, &events, TX_WAIT_FOREVER);
162 if(events & DEMO_MESSAGE_EVENT)
163 {
164 status = nxd_mqtt_client_message_get(&mqtt_client, topic_buffer, sizeof(topic_buffer), &topic_length,
165 message_buffer, sizeof(message_buffer), &message_length);
166 if(status == NXD_MQTT_SUCCESS)
167 {
168 topic_buffer[topic_length] = 0;
169 message_buffer[message_length] = 0;
170 printf("topic = %s, message = %s\n", topic_buffer, message_buffer);
171 }
172 }
173
174 /* Now unsubscribe the topic. */
175 nxd_mqtt_client_unsubscribe(&mqtt_client, TOPIC_NAME, STRLEN(TOPIC_NAME));
176
177 /* Disconnect from the broker. */
178 nxd_mqtt_client_disconnect(&mqtt_client);
179
180 /* Delete the client instance, release all the resources. */
181 nxd_mqtt_client_delete(&mqtt_client);
182
183 return;
184
185 }
186
187
188