1 /* MQTT connect test.  This test case validates MQTT client connect without username/password. */
2 
3 #include   "tx_api.h"
4 #include   "nx_api.h"
5 #include   "nxd_mqtt_client.h"
6 extern void    test_control_return(UINT status);
7 
8 #if !defined(NXD_MQTT_REQUIRE_TLS) && defined(NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH)
9 
10 #define     DEMO_STACK_SIZE    2048
11 
12 #define CLIENT_ID "1234"
13 #define TOPIC_LEN 256
14 #define MESSAGE_LEN 256
15 #define CLIENT_MEMORY_SIZE 1024
16 #define PACKET_BUFFER_SIZE 1024
17 
18 /* Define the ThreadX and NetX object control blocks...  */
19 
20 static TX_THREAD               ntest_0;
21 static TX_THREAD               ntest_1;
22 
23 static NX_PACKET_POOL          pool_0;
24 static NX_IP                   ip_0;
25 static NX_IP                   ip_1;
26 static NX_TCP_SOCKET           server_socket;
27 
28 
29 #define NUM_PACKETS                 128
30 #define PACKET_SIZE                 1536
31 #define PACKET_POOL_SIZE            (NUM_PACKETS * (PACKET_SIZE + sizeof(NX_PACKET)))
32 
33 static TX_SEMAPHORE semaphore_server_received;
34 static TX_SEMAPHORE semaphore_client_sent;
35 static UCHAR pool_area[PACKET_POOL_SIZE];
36 
37 /* Define the counters used in the demo application...  */
38 
39 static ULONG                   error_counter;
40 
41 /* Define thread prototypes.  */
42 
43 static void    ntest_0_entry(ULONG thread_input);
44 static void    ntest_1_entry(ULONG thread_input);
45 extern void    _nx_ram_network_driver_1500(struct NX_IP_DRIVER_STRUCT *driver_req);
46 
47 
48 /* Define what the initial system looks like.  */
49 static NXD_MQTT_CLIENT *client_ptr;
50 static UCHAR *client_memory;
51 static CHAR *stack_ptr;
52 static UCHAR *packet_buffer;
53 #ifdef CTEST
test_application_define(void * first_unused_memory)54 VOID test_application_define(void *first_unused_memory)
55 #else
56 void       netx_mqtt_client_transmit_queue_depth_application_define(void *first_unused_memory)
57 #endif
58 {
59 CHAR       *pointer;
60 UINT       status;
61 
62     /* Setup the working pointer.  */
63     pointer = (CHAR *) first_unused_memory;
64 
65     error_counter = 0;
66 
67     /* Create the main thread.  */
68     tx_thread_create(&ntest_0, "thread 0", ntest_0_entry, 0,
69                      pointer, DEMO_STACK_SIZE,
70                      4, 4, TX_NO_TIME_SLICE, TX_DONT_START);
71 
72     pointer = pointer + DEMO_STACK_SIZE;
73 
74     /* Create the main thread.  */
75     tx_thread_create(&ntest_1, "thread 1", ntest_1_entry, 0,
76                      pointer, DEMO_STACK_SIZE,
77                      3, 3, TX_NO_TIME_SLICE, TX_AUTO_START);
78 
79     pointer = pointer + DEMO_STACK_SIZE;
80 
81     tx_semaphore_create(&semaphore_server_received, "semaphore server received", 0);
82     tx_semaphore_create(&semaphore_client_sent, "semaphore client sent", 0);
83 
84     /* Initialize the NetX system.  */
85     nx_system_initialize();
86 
87     /* Create a packet pool.  */
88     status = nx_packet_pool_create(&pool_0, "NetX Main Packet Pool", PACKET_SIZE, pool_area, PACKET_POOL_SIZE);
89 
90     if(status)
91         error_counter++;
92 
93     /* Create an IP instance.  */
94     status = nx_ip_create(&ip_0, "NetX IP Instance 0", IP_ADDRESS(1, 2, 3, 4), 0xFFFFFF00UL, &pool_0, _nx_ram_network_driver_1500,
95                           pointer, 2048, 1);
96     pointer = pointer + 2048;
97 
98     /* Create another IP instance.  */
99     status += nx_ip_create(&ip_1, "NetX IP Instance 1", IP_ADDRESS(1, 2, 3, 5), 0xFFFFFF00UL, &pool_0, _nx_ram_network_driver_1500,
100                            pointer, 2048, 1);
101     pointer = pointer + 2048;
102 
103     if(status)
104         error_counter++;
105 
106     /* Enable ARP and supply ARP cache memory for IP Instance 0.  */
107     status = nx_arp_enable(&ip_0, (void *) pointer, 1024);
108     pointer = pointer + 1024;
109 
110     /* Enable ARP and supply ARP cache memory for IP Instance 1.  */
111     status += nx_arp_enable(&ip_1, (void *) pointer, 1024);
112     pointer = pointer + 1024;
113 
114     /* Check ARP enable status.  */
115     if(status)
116         error_counter++;
117 
118     /* Enable TCP processing for both IP instances.  */
119     status = nx_tcp_enable(&ip_0);
120     status += nx_tcp_enable(&ip_1);
121 
122     /* Check TCP enable status.  */
123     if(status)
124         error_counter++;
125     stack_ptr = pointer;
126     pointer += DEMO_STACK_SIZE;
127 
128     packet_buffer = pointer;
129     pointer += PACKET_BUFFER_SIZE;
130 
131     client_memory = pointer;
132     pointer += CLIENT_MEMORY_SIZE;
133 
134     client_ptr = (NXD_MQTT_CLIENT*)pointer;
135 }
136 
137 #define MQTT_CLIENT_THREAD_PRIORITY  2
138 static UINT keepalive_value;
139 static UINT cleansession_value;
140 static UINT QoS;
141 static UINT retain;
142 static UCHAR *topic;
143 static UCHAR *message;
144 
145 /* Define the test threads.  */
146 /* This thread sets up MQTT client and makes a connect request without username/password. */
ntest_0_entry(ULONG thread_input)147 static void    ntest_0_entry(ULONG thread_input)
148 {
149 UINT       status;
150 NXD_ADDRESS server_address;
151 UINT        i;
152 
153     /* Print out test information banner.  */
154     printf("NetX Test:   MQTT Transmit Queue Depth Test............................");
155 
156     /* Check for earlier error.  */
157     if(error_counter)
158     {
159         printf("ERROR!\n");
160         test_control_return(1);
161     }
162 
163     status = nxd_mqtt_client_create(client_ptr, "my client", CLIENT_ID, strlen(CLIENT_ID), &ip_0, &pool_0,
164                                     stack_ptr, DEMO_STACK_SIZE, MQTT_CLIENT_THREAD_PRIORITY, client_memory, CLIENT_MEMORY_SIZE);
165     if(status)
166         error_counter++;
167 
168     server_address.nxd_ip_version = 4;
169     server_address.nxd_ip_address.v4 = IP_ADDRESS(1, 2, 3, 5);
170     keepalive_value = 0;
171     cleansession_value = 0;
172 
173     topic = packet_buffer;
174     message =  packet_buffer + TOPIC_LEN;
175 
176     for (i = 0; i < TOPIC_LEN; i++)
177     {
178         topic[i] = i & 0xff;
179         message[i] = 0xff - (i & 0xff);
180     }
181 
182     status = nxd_mqtt_client_connect(client_ptr, &server_address, NXD_MQTT_PORT,
183                                      keepalive_value, cleansession_value, NX_IP_PERIODIC_RATE);
184     if (status)
185         error_counter++;
186 
187     tx_thread_sleep(NX_IP_PERIODIC_RATE);
188 
189     QoS = 1;
190     retain = 0;
191 
192     /* Send telemetry to fill the transmit queue. */
193     for (i = 0 ; i < NXD_MQTT_MAXIMUM_TRANSMIT_QUEUE_DEPTH; i++)
194     {
195 
196         /* Send telemetry */
197         status = nxd_mqtt_client_publish(client_ptr, topic, TOPIC_LEN, message, MESSAGE_LEN, retain, QoS, 5 * NX_IP_PERIODIC_RATE);
198         if (status)
199             error_counter++;
200     }
201 
202     /* Send telemetry */
203     status = nxd_mqtt_client_publish(client_ptr, topic, TOPIC_LEN, message, MESSAGE_LEN, retain, QoS, 5 * NX_IP_PERIODIC_RATE);
204     if (status == NXD_MQTT_SUCCESS)
205         error_counter++;
206 
207     tx_semaphore_put(&semaphore_client_sent);
208 
209     tx_semaphore_get(&semaphore_server_received, NX_WAIT_FOREVER);
210 
211     nxd_mqtt_client_disconnect(client_ptr);
212 
213     nxd_mqtt_client_delete(client_ptr);
214 
215     if (pool_0.nx_packet_pool_available != pool_0.nx_packet_pool_total)
216         error_counter++;
217 
218     /* Determine if the test was successful.  */
219     if (error_counter)
220     {
221         printf("ERROR!\n");
222         test_control_return(1);
223     }
224     else
225     {
226         printf("SUCCESS!\n");
227         test_control_return(0);
228     }
229 }
230 
231 /* This thread acts as MQTT server, accepting the connection. */
ntest_1_entry(ULONG thread_input)232 static void    ntest_1_entry(ULONG thread_input)
233 {
234 UINT       status;
235 ULONG      actual_status;
236 NX_PACKET  *packet_ptr;
237 UCHAR      *byte;
238 USHORT      packet_id;
239 UINT        len, packet_len;
240 UCHAR       control_header;
241 UINT        i;
242 
243     /* Ensure the IP instance has been initialized.  */
244     status = nx_ip_status_check(&ip_1, NX_IP_INITIALIZE_DONE, &actual_status, NX_IP_PERIODIC_RATE);
245 
246     /* Check for error.  */
247     if(status)
248         error_counter++;
249 
250     /* Create a socket.  */
251     status = nx_tcp_socket_create(&ip_1, &server_socket, "Server Socket",
252                                   NX_IP_NORMAL, NX_FRAGMENT_OKAY, NX_IP_TIME_TO_LIVE, 65535,
253                                   NX_NULL, NX_NULL);
254 
255     /* Check for error.  */
256     if(status)
257         error_counter++;
258 
259     /* Setup this thread to listen.  */
260     status = nx_tcp_server_socket_listen(&ip_1, NXD_MQTT_PORT, &server_socket, 5, NX_NULL);
261 
262     /* Check for error.  */
263     if(status)
264         error_counter++;
265 
266     tx_thread_resume(&ntest_0);
267 
268     /* Accept a connection from client socket.  */
269     status = nx_tcp_server_socket_accept(&server_socket, NX_IP_PERIODIC_RATE);
270 
271     /* Check for error.  */
272     if (status)
273         error_counter++;
274 
275     tx_thread_sleep(1);
276 
277     status = nx_tcp_socket_receive(&server_socket, &packet_ptr, 5 * NX_IP_PERIODIC_RATE);
278 
279     if (status)
280     {
281         error_counter++;
282     }
283     else
284     {
285 
286         /* Response with Connect SUCCESS */
287         byte = packet_ptr->nx_packet_prepend_ptr;
288         byte[0] = 0x20;
289         byte[1] = 0x02;
290         byte[2] = 0;
291         byte[3] = 0;
292 
293         packet_ptr->nx_packet_append_ptr = packet_ptr->nx_packet_prepend_ptr + 4;
294         packet_ptr->nx_packet_length = 4;
295 
296         status = nx_tcp_socket_send(&server_socket, packet_ptr, 1 * NX_IP_PERIODIC_RATE);
297         if (status)
298             error_counter++;
299 
300         packet_ptr = NX_NULL;
301         packet_len = 0;
302 
303         tx_semaphore_get(&semaphore_client_sent, NX_WAIT_FOREVER);
304 
305         while (1)
306         {
307             status = nx_tcp_socket_receive(&server_socket, &packet_ptr, NX_IP_PERIODIC_RATE);
308 
309             if (status)
310                 break;
311 
312             nx_packet_release(packet_ptr);
313         }
314 
315         tx_semaphore_put(&semaphore_server_received);
316 
317         /* Disconnect.  */
318         status = nx_tcp_socket_disconnect(&server_socket, NX_IP_PERIODIC_RATE);
319 
320         /* Check for error.  */
321         if (status)
322             error_counter++;
323     }
324 
325     /* Unaccept the server socket.  */
326     status = nx_tcp_server_socket_unaccept(&server_socket);
327 
328     /* Check for error.  */
329     if (status)
330         error_counter++;
331 
332     /* Unlisten on the server port.  */
333     status =  nx_tcp_server_socket_unlisten(&ip_1, NXD_MQTT_PORT);
334 
335     /* Check for error.  */
336     if (status)
337         error_counter++;
338 
339     /* Delete the socket.  */
340     status = nx_tcp_socket_delete(&server_socket);
341 
342     /* Check for error.  */
343     if(status)
344         error_counter++;
345 }
346 #else
347 
348 #ifdef CTEST
test_application_define(void * first_unused_memory)349 VOID test_application_define(void *first_unused_memory)
350 #else
351 void    netx_mqtt_client_transmit_queue_depth_application_define(void *first_unused_memory)
352 #endif
353 {
354 
355     /* Print out test information banner.  */
356     printf("NetX Test:   MQTT Transmit Queue Depth Test............................N/A\n");
357 
358     test_control_return(3);
359 }
360 #endif
361