1 /***************************************************************************
2 * Copyright (c) 2024 Microsoft Corporation
3 *
4 * This program and the accompanying materials are made available under the
5 * terms of the MIT License which is available at
6 * https://opensource.org/licenses/MIT.
7 *
8 * SPDX-License-Identifier: MIT
9 **************************************************************************/
10
11 /*
12
13 This is a small demonstration of the high-performance NetX TCP/IP stack
14 MQTT client.
15
16 This demo program establishes a connection to the Mosquitto server at IP address
17 10.0.10.1. It subscribes to a topic, send a message to the same topic.
18 The application shall be able to receive the same message from the broker.
19
20 This demo assumes that the NetX Duo TCP/IP stack has been properly configured,
21 with TCP module enabled. The entry function is "thread_mqtt_entry". Caller
22 needs to pass in the IP instance and an instance of a valid packet pool.
23 */
24
25 #include "nx_api.h"
26 #include "nxd_mqtt_client.h"
27
28 /* MQTT Demo defines */
29
30 /* IP Address of the local server. */
31 #define LOCAL_SERVER_ADDRESS (IP_ADDRESS(192, 168, 1, 1))
32
33
34 /*****************************************************************************************/
35 /* MQTT Local Server IoT Client example. */
36 /*****************************************************************************************/
37
38 #define DEMO_STACK_SIZE 2048
39 #define CLIENT_ID_STRING "mytestclient"
40 #define MQTT_CLIENT_STACK_SIZE 4096
41
42 #define STRLEN(p) (sizeof(p) - 1)
43
44
45 /* Declare the MQTT thread stack space. */
46 static ULONG mqtt_client_stack[MQTT_CLIENT_STACK_SIZE / sizeof(ULONG)];
47
48 /* Declare the MQTT client control block. */
49 static NXD_MQTT_CLIENT mqtt_client;
50
51 /* Define the symbol for signaling a received message. */
52
53
54 /* Define the test threads. */
55 #define TOPIC_NAME "topic"
56 #define MESSAGE_STRING "This is a message. "
57
58 /* Define the priority of the MQTT internal thread. */
59 #define MQTT_THREAD_PRIORTY 2
60
61 /* Define the MQTT keep alive timer for 5 minutes */
62 #define MQTT_KEEP_ALIVE_TIMER 300
63
64 #define QOS0 0
65 #define QOS1 1
66
67 /* Declare event flag, which is used in this demo. */
68 TX_EVENT_FLAGS_GROUP mqtt_app_flag;
69 #define DEMO_MESSAGE_EVENT 1
70 #define DEMO_ALL_EVENTS 3
71
72 /* Declare buffers to hold message and topic. */
73 static UCHAR message_buffer[NXD_MQTT_MAX_MESSAGE_LENGTH];
74 static UCHAR topic_buffer[NXD_MQTT_MAX_TOPIC_NAME_LENGTH];
75
76 /* Declare the disconnect notify function. */
my_disconnect_func(NXD_MQTT_CLIENT * client_ptr)77 static VOID my_disconnect_func(NXD_MQTT_CLIENT *client_ptr)
78 {
79 NX_PARAMETER_NOT_USED(client_ptr);
80 printf("client disconnected from server\n");
81 }
82
83
my_notify_func(NXD_MQTT_CLIENT * client_ptr,UINT number_of_messages)84 static VOID my_notify_func(NXD_MQTT_CLIENT* client_ptr, UINT number_of_messages)
85 {
86 NX_PARAMETER_NOT_USED(client_ptr);
87 NX_PARAMETER_NOT_USED(number_of_messages);
88 tx_event_flags_set(&mqtt_app_flag, DEMO_MESSAGE_EVENT, TX_OR);
89 return;
90
91 }
92
93 void thread_mqtt_entry(NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr);
94
95 static ULONG error_counter;
thread_mqtt_entry(NX_IP * ip_ptr,NX_PACKET_POOL * pool_ptr)96 void thread_mqtt_entry(NX_IP *ip_ptr, NX_PACKET_POOL *pool_ptr)
97 {
98 UINT status;
99 NXD_ADDRESS server_ip;
100 ULONG events;
101 UINT topic_length, message_length;
102
103 /* Create MQTT client instance. */
104 status = nxd_mqtt_client_create(&mqtt_client, "my_client", CLIENT_ID_STRING, STRLEN(CLIENT_ID_STRING),
105 ip_ptr, pool_ptr, (VOID*)mqtt_client_stack, sizeof(mqtt_client_stack),
106 MQTT_THREAD_PRIORTY, NX_NULL, 0);
107
108 if (status)
109 {
110 printf("Error in creating MQTT client: 0x%02x\n", status);
111 error_counter++;
112 }
113
114 #ifdef NXD_MQTT_OVER_WEBSOCKET
115 status = nxd_mqtt_client_websocket_set(&mqtt_client, (UCHAR *)"test.mosquitto.org", sizeof("test.mosquitto.org") - 1,
116 (UCHAR *)"/mqtt", sizeof("/mqtt") - 1);
117 if (status)
118 {
119 printf("Error in setting MQTT over WebSocket: 0x%02x\r\n", status);
120 error_counter++;
121 }
122 #endif /* NXD_MQTT_OVER_WEBSOCKET */
123
124 /* Register the disconnect notification function. */
125 nxd_mqtt_client_disconnect_notify_set(&mqtt_client, my_disconnect_func);
126
127 /* Create an event flag for this demo. */
128 status = tx_event_flags_create(&mqtt_app_flag, "my app event");
129 if(status)
130 error_counter++;
131
132
133 server_ip.nxd_ip_version = 4;
134 server_ip.nxd_ip_address.v4 = LOCAL_SERVER_ADDRESS;
135
136
137 /* Start the connection to the server. */
138 status = nxd_mqtt_client_connect(&mqtt_client, &server_ip, NXD_MQTT_PORT,
139 MQTT_KEEP_ALIVE_TIMER, 0, NX_WAIT_FOREVER);
140 if(status)
141 error_counter++;
142
143 /* Subscribe to the topic with QoS level 0. */
144 status = nxd_mqtt_client_subscribe(&mqtt_client, TOPIC_NAME, STRLEN(TOPIC_NAME), QOS0);
145 if(status)
146 error_counter++;
147
148 /* Set the receive notify function. */
149 status = nxd_mqtt_client_receive_notify_set(&mqtt_client, my_notify_func);
150 if(status)
151 error_counter++;
152
153 /* Publish a message with QoS Level 1. */
154 status = nxd_mqtt_client_publish(&mqtt_client, TOPIC_NAME, STRLEN(TOPIC_NAME),
155 (CHAR*)MESSAGE_STRING, STRLEN(MESSAGE_STRING), 0, QOS1, NX_WAIT_FOREVER);
156
157
158 /* Now wait for the broker to publish the message. */
159
160 tx_event_flags_get(&mqtt_app_flag, DEMO_ALL_EVENTS, TX_OR_CLEAR, &events, TX_WAIT_FOREVER);
161 if(events & DEMO_MESSAGE_EVENT)
162 {
163 status = nxd_mqtt_client_message_get(&mqtt_client, topic_buffer, sizeof(topic_buffer), &topic_length,
164 message_buffer, sizeof(message_buffer), &message_length);
165 if(status == NXD_MQTT_SUCCESS)
166 {
167 topic_buffer[topic_length] = 0;
168 message_buffer[message_length] = 0;
169 printf("topic = %s, message = %s\n", topic_buffer, message_buffer);
170 }
171 }
172
173 /* Now unsubscribe the topic. */
174 nxd_mqtt_client_unsubscribe(&mqtt_client, TOPIC_NAME, STRLEN(TOPIC_NAME));
175
176 /* Disconnect from the broker. */
177 nxd_mqtt_client_disconnect(&mqtt_client);
178
179 /* Delete the client instance, release all the resources. */
180 nxd_mqtt_client_delete(&mqtt_client);
181
182 return;
183
184 }
185
186
187