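1 /* MQTT transmit queue depth test. This test case validates that, after NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH QoS 1 publishes that the server never acknowledges, one further publish does not succeed. */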
2
3 #include "tx_api.h"
4 #include "nx_api.h"
5 #include "nxd_mqtt_client.h"
6 extern void test_control_return(UINT status);
7
8 #if !defined(NXD_MQTT_REQUIRE_TLS) && defined(NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH)
9
/* Stack size, in bytes, of each test thread and of the MQTT client created by ntest_0_entry. */
10 #define DEMO_STACK_SIZE 2048
11
/* Client identifier handed to nxd_mqtt_client_create(). */
12 #define CLIENT_ID "1234"
/* Number of topic bytes passed to each publish call. */
13 #define TOPIC_LEN 256
/* Number of message bytes passed to each publish call. */
14 #define MESSAGE_LEN 256
/* Size, in bytes, of the memory area given to the MQTT client. */
15 #define CLIENT_MEMORY_SIZE 1024
/* Size, in bytes, of the region that holds the topic followed by the message. */
16 #define PACKET_BUFFER_SIZE 1024
17
18 /* ThreadX and NetX object control blocks used by the test. */
19
/* ntest_0 runs the MQTT client side; it is created suspended and resumed by ntest_1. */
20 static TX_THREAD ntest_0;
/* ntest_1 runs the TCP server side and is started automatically. */
21 static TX_THREAD ntest_1;
22
/* Single packet pool shared by both IP instances and by the MQTT client. */
23 static NX_PACKET_POOL pool_0;
/* ip_0 (1.2.3.4) is used by the MQTT client, ip_1 (1.2.3.5) by the server socket. */
24 static NX_IP ip_0;
25 static NX_IP ip_1;
/* Listening/accepting socket of the server thread. */
26 static NX_TCP_SOCKET server_socket;
27
28
/* Packet pool geometry: packet count, payload size passed to nx_packet_pool_create(),
   and the total pool area size including one NX_PACKET header per packet. */
29 #define NUM_PACKETS 128
30 #define PACKET_SIZE 1536
31 #define PACKET_POOL_SIZE (NUM_PACKETS * (PACKET_SIZE + sizeof(NX_PACKET)))
32
/* Posted by ntest_1 once it has drained the socket; ntest_0 waits on it before disconnecting. */
33 static TX_SEMAPHORE semaphore_server_received;
/* Posted by ntest_0 after its publish calls; ntest_1 waits on it before draining the socket. */
34 static TX_SEMAPHORE semaphore_client_sent;
/* Backing storage for pool_0. */
35 static UCHAR pool_area[PACKET_POOL_SIZE];
36
37 /* Number of failed checks; ntest_0_entry reports ERROR when it is non-zero. */
38
39 static ULONG error_counter;
40
41 /* Thread entry prototypes and the simulated network driver used by both IP instances. */
42
43 static void ntest_0_entry(ULONG thread_input);
44 static void ntest_1_entry(ULONG thread_input);
45 extern void _nx_ram_network_driver_1500(struct NX_IP_DRIVER_STRUCT *driver_req);
46
47
48 /* Regions carved out of first_unused_memory by the application define routine and
      later consumed by ntest_0_entry: MQTT client control block, client memory area,
      MQTT client stack, and the topic/message buffer. */
49 static NXD_MQTT_CLIENT *client_ptr;
50 static UCHAR *client_memory;
51 static CHAR *stack_ptr;
52 static UCHAR *packet_buffer;
53 #ifdef CTEST
test_application_define(void * first_unused_memory)54 VOID test_application_define(void *first_unused_memory)
55 #else
56 void netx_mqtt_client_transmit_queue_depth_application_define(void *first_unused_memory)
57 #endif
58 {
59 CHAR *pointer;
60 UINT status;
61
62 /* Setup the working pointer. */
63 pointer = (CHAR *) first_unused_memory;
64
65 error_counter = 0;
66
67 /* Create the main thread. */
68 tx_thread_create(&ntest_0, "thread 0", ntest_0_entry, 0,
69 pointer, DEMO_STACK_SIZE,
70 4, 4, TX_NO_TIME_SLICE, TX_DONT_START);
71
72 pointer = pointer + DEMO_STACK_SIZE;
73
74 /* Create the main thread. */
75 tx_thread_create(&ntest_1, "thread 1", ntest_1_entry, 0,
76 pointer, DEMO_STACK_SIZE,
77 3, 3, TX_NO_TIME_SLICE, TX_AUTO_START);
78
79 pointer = pointer + DEMO_STACK_SIZE;
80
81 tx_semaphore_create(&semaphore_server_received, "semaphore server received", 0);
82 tx_semaphore_create(&semaphore_client_sent, "semaphore client sent", 0);
83
84 /* Initialize the NetX system. */
85 nx_system_initialize();
86
87 /* Create a packet pool. */
88 status = nx_packet_pool_create(&pool_0, "NetX Main Packet Pool", PACKET_SIZE, pool_area, PACKET_POOL_SIZE);
89
90 if(status)
91 error_counter++;
92
93 /* Create an IP instance. */
94 status = nx_ip_create(&ip_0, "NetX IP Instance 0", IP_ADDRESS(1, 2, 3, 4), 0xFFFFFF00UL, &pool_0, _nx_ram_network_driver_1500,
95 pointer, 2048, 1);
96 pointer = pointer + 2048;
97
98 /* Create another IP instance. */
99 status += nx_ip_create(&ip_1, "NetX IP Instance 1", IP_ADDRESS(1, 2, 3, 5), 0xFFFFFF00UL, &pool_0, _nx_ram_network_driver_1500,
100 pointer, 2048, 1);
101 pointer = pointer + 2048;
102
103 if(status)
104 error_counter++;
105
106 /* Enable ARP and supply ARP cache memory for IP Instance 0. */
107 status = nx_arp_enable(&ip_0, (void *) pointer, 1024);
108 pointer = pointer + 1024;
109
110 /* Enable ARP and supply ARP cache memory for IP Instance 1. */
111 status += nx_arp_enable(&ip_1, (void *) pointer, 1024);
112 pointer = pointer + 1024;
113
114 /* Check ARP enable status. */
115 if(status)
116 error_counter++;
117
118 /* Enable TCP processing for both IP instances. */
119 status = nx_tcp_enable(&ip_0);
120 status += nx_tcp_enable(&ip_1);
121
122 /* Check TCP enable status. */
123 if(status)
124 error_counter++;
125 stack_ptr = pointer;
126 pointer += DEMO_STACK_SIZE;
127
128 packet_buffer = pointer;
129 pointer += PACKET_BUFFER_SIZE;
130
131 client_memory = pointer;
132 pointer += CLIENT_MEMORY_SIZE;
133
134 client_ptr = (NXD_MQTT_CLIENT*)pointer;
135 }
136
/* Thread priority passed to nxd_mqtt_client_create(). */
137 #define MQTT_CLIENT_THREAD_PRIORITY 2
/* Connect parameters; ntest_0_entry sets both to 0 before connecting. */
138 static UINT keepalive_value;
139 static UINT cleansession_value;
/* Publish parameters; ntest_0_entry publishes with QoS 1 and retain 0. */
140 static UINT QoS;
141 static UINT retain;
/* Topic and message buffers; ntest_0_entry points both into packet_buffer. */
142 static UCHAR *topic;
143 static UCHAR *message;
144
145 /* Define the test threads. */
146 /* This thread sets up MQTT client and makes a connect request without username/password. */
ntest_0_entry(ULONG thread_input)147 static void ntest_0_entry(ULONG thread_input)
148 {
149 UINT status;
150 NXD_ADDRESS server_address;
151 UINT i;
152
153 /* Print out test information banner. */
154 printf("NetX Test: MQTT Transmit Queue Depth Test............................");
155
156 /* Check for earlier error. */
157 if(error_counter)
158 {
159 printf("ERROR!\n");
160 test_control_return(1);
161 }
162
163 status = nxd_mqtt_client_create(client_ptr, "my client", CLIENT_ID, strlen(CLIENT_ID), &ip_0, &pool_0,
164 stack_ptr, DEMO_STACK_SIZE, MQTT_CLIENT_THREAD_PRIORITY, client_memory, CLIENT_MEMORY_SIZE);
165 if(status)
166 error_counter++;
167
168 server_address.nxd_ip_version = 4;
169 server_address.nxd_ip_address.v4 = IP_ADDRESS(1, 2, 3, 5);
170 keepalive_value = 0;
171 cleansession_value = 0;
172
173 topic = packet_buffer;
174 message = packet_buffer + TOPIC_LEN;
175
176 for (i = 0; i < TOPIC_LEN; i++)
177 {
178 topic[i] = i & 0xff;
179 message[i] = 0xff - (i & 0xff);
180 }
181
182 status = nxd_mqtt_client_connect(client_ptr, &server_address, NXD_MQTT_PORT,
183 keepalive_value, cleansession_value, NX_IP_PERIODIC_RATE);
184 if (status)
185 error_counter++;
186
187 tx_thread_sleep(NX_IP_PERIODIC_RATE);
188
189 QoS = 1;
190 retain = 0;
191
192 /* Send telemetry to fill the transmit queue. */
193 for (i = 0 ; i < NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH; i++)
194 {
195
196 /* Send telemetry */
197 status = nxd_mqtt_client_publish(client_ptr, topic, TOPIC_LEN, message, MESSAGE_LEN, retain, QoS, 5 * NX_IP_PERIODIC_RATE);
198 if (status)
199 error_counter++;
200 }
201
202 /* Send telemetry */
203 status = nxd_mqtt_client_publish(client_ptr, topic, TOPIC_LEN, message, MESSAGE_LEN, retain, QoS, 5 * NX_IP_PERIODIC_RATE);
204 if (status == NXD_MQTT_SUCCESS)
205 error_counter++;
206
207 tx_semaphore_put(&semaphore_client_sent);
208
209 tx_semaphore_get(&semaphore_server_received, NX_WAIT_FOREVER);
210
211 nxd_mqtt_client_disconnect(client_ptr);
212
213 nxd_mqtt_client_delete(client_ptr);
214
215 if (pool_0.nx_packet_pool_available != pool_0.nx_packet_pool_total)
216 error_counter++;
217
218 /* Determine if the test was successful. */
219 if (error_counter)
220 {
221 printf("ERROR!\n");
222 test_control_return(1);
223 }
224 else
225 {
226 printf("SUCCESS!\n");
227 test_control_return(0);
228 }
229 }
230
231 /* This thread acts as MQTT server, accepting the connection. */
ntest_1_entry(ULONG thread_input)232 static void ntest_1_entry(ULONG thread_input)
233 {
234 UINT status;
235 ULONG actual_status;
236 NX_PACKET *packet_ptr;
237 UCHAR *byte;
238 USHORT packet_id;
239 UINT len, packet_len;
240 UCHAR control_header;
241 UINT i;
242
243 /* Ensure the IP instance has been initialized. */
244 status = nx_ip_status_check(&ip_1, NX_IP_INITIALIZE_DONE, &actual_status, NX_IP_PERIODIC_RATE);
245
246 /* Check for error. */
247 if(status)
248 error_counter++;
249
250 /* Create a socket. */
251 status = nx_tcp_socket_create(&ip_1, &server_socket, "Server Socket",
252 NX_IP_NORMAL, NX_FRAGMENT_OKAY, NX_IP_TIME_TO_LIVE, 65535,
253 NX_NULL, NX_NULL);
254
255 /* Check for error. */
256 if(status)
257 error_counter++;
258
259 /* Setup this thread to listen. */
260 status = nx_tcp_server_socket_listen(&ip_1, NXD_MQTT_PORT, &server_socket, 5, NX_NULL);
261
262 /* Check for error. */
263 if(status)
264 error_counter++;
265
266 tx_thread_resume(&ntest_0);
267
268 /* Accept a connection from client socket. */
269 status = nx_tcp_server_socket_accept(&server_socket, NX_IP_PERIODIC_RATE);
270
271 /* Check for error. */
272 if (status)
273 error_counter++;
274
275 tx_thread_sleep(1);
276
277 status = nx_tcp_socket_receive(&server_socket, &packet_ptr, 5 * NX_IP_PERIODIC_RATE);
278
279 if (status)
280 {
281 error_counter++;
282 }
283 else
284 {
285
286 /* Response with Connect SUCCESS */
287 byte = packet_ptr->nx_packet_prepend_ptr;
288 byte[0] = 0x20;
289 byte[1] = 0x02;
290 byte[2] = 0;
291 byte[3] = 0;
292
293 packet_ptr->nx_packet_append_ptr = packet_ptr->nx_packet_prepend_ptr + 4;
294 packet_ptr->nx_packet_length = 4;
295
296 status = nx_tcp_socket_send(&server_socket, packet_ptr, 1 * NX_IP_PERIODIC_RATE);
297 if (status)
298 error_counter++;
299
300 packet_ptr = NX_NULL;
301 packet_len = 0;
302
303 tx_semaphore_get(&semaphore_client_sent, NX_WAIT_FOREVER);
304
305 while (1)
306 {
307 status = nx_tcp_socket_receive(&server_socket, &packet_ptr, NX_IP_PERIODIC_RATE);
308
309 if (status)
310 break;
311
312 nx_packet_release(packet_ptr);
313 }
314
315 tx_semaphore_put(&semaphore_server_received);
316
317 /* Disconnect. */
318 status = nx_tcp_socket_disconnect(&server_socket, NX_IP_PERIODIC_RATE);
319
320 /* Check for error. */
321 if (status)
322 error_counter++;
323 }
324
325 /* Unaccept the server socket. */
326 status = nx_tcp_server_socket_unaccept(&server_socket);
327
328 /* Check for error. */
329 if (status)
330 error_counter++;
331
332 /* Unlisten on the server port. */
333 status = nx_tcp_server_socket_unlisten(&ip_1, NXD_MQTT_PORT);
334
335 /* Check for error. */
336 if (status)
337 error_counter++;
338
339 /* Delete the socket. */
340 status = nx_tcp_socket_delete(&server_socket);
341
342 /* Check for error. */
343 if(status)
344 error_counter++;
345 }
346 #else
347
/*
 * Stub entry point compiled when the test cannot run in this configuration
 * (NXD_MQTT_REQUIRE_TLS is defined or NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH
 * is not defined).  Prints the test banner marked N/A and returns status 3
 * to the test framework.
 *
 * first_unused_memory: unused.
 */
#ifdef CTEST
VOID test_application_define(void *first_unused_memory)
#else
void netx_mqtt_client_transmit_queue_depth_application_define(void *first_unused_memory)
#endif
{
    (void) first_unused_memory;

    /* Print out test information banner. */
    printf("NetX Test: MQTT Transmit Queue Depth Test............................N/A\n");

    test_control_return(3);
}
360 #endif
361