/* MQTT connect test over IPv6. This test case validates MQTT client connect without username/password, first over plain TCP and, when NX_SECURE_ENABLE is defined, a second time over TLS. */
2
3 #include "tx_api.h"
4 #include "nx_api.h"
5 #include "nxd_mqtt_client.h"
6 extern void test_control_return(UINT status);
7 extern void SET_ERROR_COUNTER(ULONG *error_counter, CHAR *filename, int line_number);
8 #define DEMO_STACK_SIZE 2048
9
10
11 #define NUM_PACKETS 30
12 #define PACKET_SIZE 1536
13 #define PACKET_POOL_SIZE (NUM_PACKETS * (PACKET_SIZE + sizeof(NX_PACKET)))
14
15 #define CLIENT_ID "1234"
16
17 /* Define the ThreadX and NetX object control blocks... */
18
19 static TX_THREAD ntest_0;
20 static TX_THREAD ntest_1;
21
22 static NX_PACKET_POOL pool_0;
23 static NX_IP ip_0;
24 static NX_IP ip_1;
25 static NX_TCP_SOCKET server_socket;
26 static UCHAR pool_area[PACKET_POOL_SIZE];
27
28 #ifdef NX_SECURE_ENABLE
29
30 #include "../web_test/test_device_cert.c"
31 #include "../web_test/test_ca_cert.c"
32
33 /* Declare external cryptosuites. */
34 extern const NX_SECURE_TLS_CRYPTO nx_crypto_tls_ciphers;
35
36 static NX_SECURE_TLS_SESSION tls_server_session;
37 static NX_SECURE_X509_CERT server_local_certificate;
38
39 /* Define crypto metadata buffer. */
40 static UCHAR client_metadata[5*4096];
41 static UCHAR server_metadata[5*4096];
42
43 /* For remote certificate. */
44 static NX_SECURE_X509_CERT remote_certificate, remote_issuer, ca_certificate;
45 static UCHAR remote_cert_buffer[2000];
46 static UCHAR remote_issuer_buffer[2000];
47 static UCHAR tls_packet_buffer[2][4096];
48
49 #define TEST_LOOP 2
50 #else
51 #define TEST_LOOP 1
52 #endif
53
54 static TX_SEMAPHORE semaphore_server_start;
55 static TX_SEMAPHORE semaphore_client_stop;
56
57 /* Define the counters used in the demo application... */
58
59 static ULONG error_counter;
60
61 /* Define thread prototypes. */
62
63 static void ntest_0_entry(ULONG thread_input);
64 static void ntest_1_entry(ULONG thread_input);
65 extern void _nx_ram_network_driver(struct NX_IP_DRIVER_STRUCT *driver_req);
66
67 #define PRIMARY_INTERFACE 0
68
69 /* Define what the initial system looks like. */
70 static NXD_MQTT_CLIENT *client_ptr;
71 static UCHAR *stack_ptr;
72 #ifdef CTEST
test_application_define(void * first_unused_memory)73 VOID test_application_define(void *first_unused_memory)
74 #else
75 void netx_mqtt_client_connect_v6_application_define(void *first_unused_memory)
76 #endif
77 {
78 CHAR *pointer;
79 UINT status;
80
81 UINT ip0_primary_linklocal_address_index;
82 UINT ip0_primary_global_address_index;
83
84
85 UINT ip1_primary_linklocal_address_index;
86 UINT ip1_primary_global_address_index;
87
88
89 NXD_ADDRESS ip_address;
90 //NXD_ADDRESS ip1_primary_global_address;
91
92
93 /* Setup the working pointer. */
94 pointer = (CHAR *) first_unused_memory;
95
96 error_counter = 0;
97
98 /* Create the main thread. */
99 tx_thread_create(&ntest_0, "thread 0", ntest_0_entry, 0,
100 pointer, DEMO_STACK_SIZE,
101 4, 4, TX_NO_TIME_SLICE, TX_DONT_START);
102
103 pointer = pointer + DEMO_STACK_SIZE;
104
105 /* Create the main thread. */
106 tx_thread_create(&ntest_1, "thread 1", ntest_1_entry, 0,
107 pointer, DEMO_STACK_SIZE,
108 3, 3, TX_NO_TIME_SLICE, TX_AUTO_START);
109
110 pointer = pointer + DEMO_STACK_SIZE;
111
112 tx_semaphore_create(&semaphore_server_start, "semaphore server start", 0);
113 tx_semaphore_create(&semaphore_client_stop, "semaphore client stop", 0);
114
115 /* Initialize the NetX system. */
116 nx_system_initialize();
117
118 /* Create a packet pool. */
119 status = nx_packet_pool_create(&pool_0, "NetX Main Packet Pool", PACKET_SIZE, pool_area, PACKET_POOL_SIZE);
120
121 if(status)
122 error_counter++;
123
124 /* Create an IP instance. */
125 status = nx_ip_create(&ip_0, "NetX IP Instance 0", IP_ADDRESS(1, 2, 3, 4), 0xFFFFFF00UL, &pool_0, _nx_ram_network_driver,
126 pointer, 2048, 1);
127 pointer = pointer + 2048;
128
129 /* Enable IPv6 */
130 nxd_ipv6_enable(&ip_0);
131
132 /* Create another IP instance. */
133 status += nx_ip_create(&ip_1, "NetX IP Instance 1", IP_ADDRESS(1, 2, 3, 5), 0xFFFFFF00UL, &pool_0, _nx_ram_network_driver,
134 pointer, 2048, 1);
135 pointer = pointer + 2048;
136
137 if(status)
138 error_counter++;
139
140 /* Enable IPv6 */
141 nxd_ipv6_enable(&ip_1);
142
143
144 /* Set IPv6 address on ip_1. */
145
146 /* Set ip_0 primary link local address. */
147 status = nxd_ipv6_address_set(&ip_0, PRIMARY_INTERFACE, NX_NULL, 10, &ip0_primary_linklocal_address_index);
148 if (status)
149 error_counter++;
150
151 /* Set ip_1 primary link local address. */
152 status = nxd_ipv6_address_set(&ip_1, PRIMARY_INTERFACE, NX_NULL, 10, &ip1_primary_linklocal_address_index);
153 if (status)
154 error_counter++;
155
156
157
158 /* Set ip_0 primary interface global address. */
159 ip_address.nxd_ip_version = NX_IP_VERSION_V6;
160 ip_address.nxd_ip_address.v6[0] = 0x20010000;
161 ip_address.nxd_ip_address.v6[1] = 0;
162 ip_address.nxd_ip_address.v6[2] = 0;
163 ip_address.nxd_ip_address.v6[3] = 4;
164
165 status = nxd_ipv6_address_set(&ip_0, PRIMARY_INTERFACE, &ip_address, 64, &ip0_primary_global_address_index);
166
167 if (status)
168 error_counter++;
169
170 /* Set ip_1 primary interface global address. */
171 ip_address.nxd_ip_version = NX_IP_VERSION_V6;
172 ip_address.nxd_ip_address.v6[0] = 0x20010000;
173 ip_address.nxd_ip_address.v6[1] = 0;
174 ip_address.nxd_ip_address.v6[2] = 0;
175 ip_address.nxd_ip_address.v6[3] = 5;
176
177 status = nxd_ipv6_address_set(&ip_1, PRIMARY_INTERFACE, &ip_address, 64, &ip1_primary_global_address_index);
178
179 if (status)
180 error_counter++;
181
182
183 /* Enable ARP and supply ARP cache memory for IP Instance 0. */
184 status = nx_arp_enable(&ip_0, (void *) pointer, 1024);
185 pointer = pointer + 1024;
186
187 /* Enable ARP and supply ARP cache memory for IP Instance 1. */
188 status += nx_arp_enable(&ip_1, (void *) pointer, 1024);
189 pointer = pointer + 1024;
190
191 /* Check ARP enable status. */
192 if(status)
193 error_counter++;
194
195 /* Enable TCP processing for both IP instances. */
196 status = nx_tcp_enable(&ip_0);
197 status += nx_tcp_enable(&ip_1);
198 status += nxd_icmp_enable(&ip_0);
199 status += nxd_icmp_enable(&ip_1);
200
201 /* Check TCP enable status. */
202 if(status)
203 error_counter++;
204
205 stack_ptr = pointer;
206 pointer += DEMO_STACK_SIZE;
207 client_ptr = (NXD_MQTT_CLIENT*)pointer;
208
209 }
210
211 #ifdef NX_SECURE_ENABLE
212
213 /* Define the callback function for tls connection. */
client_tls_setup(NXD_MQTT_CLIENT * client_ptr,NX_SECURE_TLS_SESSION * tls_session,NX_SECURE_X509_CERT * certificate,NX_SECURE_X509_CERT * trusted_certificate)214 static UINT client_tls_setup(NXD_MQTT_CLIENT* client_ptr, NX_SECURE_TLS_SESSION* tls_session,
215 NX_SECURE_X509_CERT* certificate, NX_SECURE_X509_CERT* trusted_certificate)
216 {
217 UINT status;
218
219 /* Create a tls session. */
220 status = nx_secure_tls_session_create(tls_session,
221 &nx_crypto_tls_ciphers,
222 client_metadata,
223 sizeof(client_metadata));
224
225 if (status)
226 {
227 return status;
228 }
229
230 nx_secure_tls_session_packet_buffer_set(tls_session, tls_packet_buffer[0], sizeof(tls_packet_buffer[0]));
231 nx_secure_tls_remote_certificate_allocate(tls_session, &remote_certificate, remote_cert_buffer, sizeof(remote_cert_buffer));
232 nx_secure_tls_remote_certificate_allocate(tls_session, &remote_issuer, remote_issuer_buffer, sizeof(remote_issuer_buffer));
233
234 nx_secure_x509_certificate_initialize(&ca_certificate, test_ca_cert_der, test_ca_cert_der_len,
235 NX_NULL, 0, NX_NULL, 0, NX_SECURE_X509_KEY_TYPE_NONE);
236 nx_secure_tls_trusted_certificate_add(tls_session, &ca_certificate);
237
238 return(NX_SUCCESS);
239 }
240
server_tls_setup(NX_SECURE_TLS_SESSION * tls_session)241 static UINT server_tls_setup(NX_SECURE_TLS_SESSION *tls_session)
242 {
243 UINT status;
244
245 status = nx_secure_tls_session_create(tls_session,
246 &nx_crypto_tls_ciphers,
247 server_metadata,
248 sizeof(server_metadata));
249 if (status)
250 {
251 return status;
252 }
253
254 memset(&server_local_certificate, 0, sizeof(server_local_certificate));
255 nx_secure_x509_certificate_initialize(&server_local_certificate,
256 test_device_cert_der, test_device_cert_der_len,
257 NX_NULL, 0, test_device_cert_key_der,
258 test_device_cert_key_der_len, NX_SECURE_X509_KEY_TYPE_RSA_PKCS1_DER);
259
260 nx_secure_tls_local_certificate_add(tls_session, &server_local_certificate);
261
262 nx_secure_tls_session_packet_buffer_set(tls_session, tls_packet_buffer[1], sizeof(tls_packet_buffer[1]));
263
264 return(NX_SUCCESS);
265 }
266 #endif
267
268 #define MQTT_CLIENT_THREAD_PRIORITY 2
269 static UINT keepalive_value;
270 static UINT cleansession_value;
271 #ifdef CTEST
272 static
273 #else /* CTEST */
274 extern
275 #endif /* CTEST */
276 UCHAR mqtt_memory[8192];
277 /* Define the test threads. */
278 /* This thread sets up MQTT client and makes a connect request without username/password. */
ntest_0_entry(ULONG thread_input)279 static void ntest_0_entry(ULONG thread_input)
280 {
281 UINT status;
282 NXD_ADDRESS server_address;
283 UINT i;
284
285 /* Print out test information banner. */
286 printf("NetX Test: MQTT Connect V6 Test .....................................");
287
288 /* Check for earlier error. */
289 if(error_counter)
290 {
291 printf("ERROR!\n");
292 test_control_return(1);
293 }
294
295 /* Wait 5 seconds for the IP thread to finish its initilization and
296 for the IPv6 stack to finish DAD process. */
297 tx_thread_sleep(5 * NX_IP_PERIODIC_RATE);
298
299 status = nxd_mqtt_client_create(client_ptr, "my client", CLIENT_ID, strlen(CLIENT_ID), &ip_0, &pool_0,
300 stack_ptr, DEMO_STACK_SIZE, MQTT_CLIENT_THREAD_PRIORITY,
301 mqtt_memory, sizeof(mqtt_memory));
302 if(status)
303 error_counter++;
304 tx_thread_sleep(1);
305
306 server_address.nxd_ip_version = 6;
307 server_address.nxd_ip_address.v6[0] = 0x20010000;
308 server_address.nxd_ip_address.v6[1] = 0;
309 server_address.nxd_ip_address.v6[2] = 0;
310 server_address.nxd_ip_address.v6[3] = 5;
311
312 keepalive_value = 0;
313 cleansession_value = 0;
314
315 for (i = 0; i < TEST_LOOP; i++)
316 {
317 tx_semaphore_get(&semaphore_server_start, NX_WAIT_FOREVER);
318
319 if (i == 0)
320 {
321 status = nxd_mqtt_client_connect(client_ptr, &server_address, NXD_MQTT_PORT,
322 keepalive_value, cleansession_value, NX_IP_PERIODIC_RATE);
323 #ifndef NXD_MQTT_REQUIRE_TLS
324 if (status)
325 error_counter++;
326 #else
327 if (status != NXD_MQTT_CONNECT_FAILURE)
328 error_counter++;
329 #endif
330 }
331 #ifdef NX_SECURE_ENABLE
332 else
333 {
334 status = nxd_mqtt_client_secure_connect(client_ptr, &server_address, NXD_MQTT_PORT,
335 client_tls_setup,
336 keepalive_value, cleansession_value, NX_IP_PERIODIC_RATE);
337
338 if (status)
339 error_counter++;
340 }
341 #endif
342
343 nxd_mqtt_client_disconnect(client_ptr);
344
345 tx_semaphore_put(&semaphore_client_stop);
346 }
347 nxd_mqtt_client_delete(client_ptr);
348
349
350 /* Determine if the test was successful. */
351 if(error_counter)
352 {
353 printf("ERROR!\n");
354 test_control_return(1);
355 }
356 else
357 {
358 printf("SUCCESS!\n");
359 test_control_return(0);
360 }
361 }
362
363
364 static UCHAR content[100];
365
366 static UCHAR fixed_header[] = {0x10, 0x00, 0x00, 0x04, 'M', 'Q', 'T', 'T', 0x4, 0x0, 0x0, 0x0};
367
368 /* This thread acts as MQTT server, accepting the connection. */
ntest_1_entry(ULONG thread_input)369 static void ntest_1_entry(ULONG thread_input)
370 {
371 UINT status;
372 ULONG actual_status;
373 NX_PACKET *packet_ptr;
374 UCHAR *byte;
375 UINT i;
376
377 /* Ensure the IP instance has been initialized. */
378 status = nx_ip_status_check(&ip_1, NX_IP_INITIALIZE_DONE, &actual_status, NX_IP_PERIODIC_RATE);
379
380 /* Check for error. */
381 if(status)
382 error_counter++;
383
384 /* Create a socket. */
385 status = nx_tcp_socket_create(&ip_1, &server_socket, "Server Socket",
386 NX_IP_NORMAL, NX_FRAGMENT_OKAY, NX_IP_TIME_TO_LIVE, 1000,
387 NX_NULL, NX_NULL);
388
389 /* Check for error. */
390 if(status)
391 error_counter++;
392
393 /* Setup this thread to listen. */
394 status = nx_tcp_server_socket_listen(&ip_1, NXD_MQTT_PORT, &server_socket, 5, NX_NULL);
395
396 /* Check for error. */
397 if(status)
398 error_counter++;
399
400 #ifdef NX_SECURE_ENABLE
401 /* Session setup. */
402 server_tls_setup(&tls_server_session);
403 #endif
404
405 tx_thread_resume(&ntest_0);
406
407 /* Accept a connection from client socket. */
408 tx_thread_sleep(5 * NX_IP_PERIODIC_RATE);
409
410 for (i = 0; i < TEST_LOOP; i++)
411 {
412
413 tx_semaphore_put(&semaphore_server_start);
414
415 status = nx_tcp_server_socket_accept(&server_socket, NX_IP_PERIODIC_RATE * 2);
416
417 /* Check for error. */
418 if (status)
419 {
420 #ifdef NXD_MQTT_REQUIRE_TLS
421 if (i == 1)
422 #endif
423 SET_ERROR_COUNTER(&error_counter, __FILE__, __LINE__);
424 }
425
426 #ifdef NX_SECURE_ENABLE
427 if (i == 1)
428 {
429 status = nx_secure_tls_session_start(&tls_server_session, &server_socket, NX_WAIT_FOREVER);
430 if (status)
431 error_counter++;
432 }
433 #endif
434
435 tx_thread_sleep(1);
436 if (i == 0)
437 {
438 status = nx_tcp_socket_receive(&server_socket, &packet_ptr, NX_IP_PERIODIC_RATE);
439 }
440 #ifdef NX_SECURE_ENABLE
441 else
442 {
443 status = nx_secure_tls_session_receive(&tls_server_session, &packet_ptr, NX_WAIT_FOREVER);
444 }
445 #endif
446
447 if (status)
448 {
449 #ifdef NXD_MQTT_REQUIRE_TLS
450 if (i == 1)
451 #endif
452 error_counter++;
453 }
454 else
455 {
456
457 /* construct the connect message. */
458 memcpy(content, fixed_header, sizeof(fixed_header));
459 /* Append client ID */
460 content[sizeof(fixed_header)] = (strlen(CLIENT_ID) >> 8) & 0xf;
461 content[sizeof(fixed_header) + 1] = (strlen(CLIENT_ID) & 0xf);
462 memcpy(content + sizeof(fixed_header) + 2, CLIENT_ID, strlen(CLIENT_ID));
463
464 content[1] = (UCHAR)(sizeof(fixed_header) + strlen(CLIENT_ID));
465
466 /* Fill in the connection_flag, keepalive, and cleansession flags. */
467 content[10] = keepalive_value >> 8;
468 content[11] = keepalive_value & 0xFF;
469 if (cleansession_value)
470 content[9] = content[9] | 2;
471
472 /* Validate the MQTT connect request. */
473 if (memcmp(packet_ptr->nx_packet_prepend_ptr, content, sizeof(fixed_header) + strlen(CLIENT_ID)))
474 error_counter++;
475
476 #ifdef NX_SECURE_ENABLE
477 if (i == 1)
478 {
479 nx_packet_release(packet_ptr);
480 status = nx_secure_tls_packet_allocate(&tls_server_session, &pool_0, &packet_ptr, NX_NO_WAIT);
481 if (status)
482 error_counter++;
483 }
484 #endif
485
486 /* Response with SUCCESS */
487 byte = packet_ptr->nx_packet_prepend_ptr;
488 byte[0] = 0x20;
489 byte[1] = 0x02;
490 byte[2] = 0;
491 byte[3] = 0;
492
493 packet_ptr->nx_packet_append_ptr = packet_ptr->nx_packet_prepend_ptr + 4;
494 packet_ptr->nx_packet_length = 4;
495
496 if (i == 0)
497 {
498 status = nx_tcp_socket_send(&server_socket, packet_ptr, 1);
499 }
500 #ifdef NX_SECURE_ENABLE
501 else
502 {
503 status = nx_secure_tls_session_send(&tls_server_session, packet_ptr, NX_WAIT_FOREVER);
504 }
505 #endif
506 if (status)
507 error_counter++;
508
509 tx_thread_sleep(1);
510
511 /* Disconnect. */
512 status = nx_tcp_socket_disconnect(&server_socket, NX_IP_PERIODIC_RATE);
513
514 /* Check for error. */
515 if (status)
516 error_counter++;
517 }
518
519 tx_semaphore_get(&semaphore_client_stop, NX_WAIT_FOREVER);
520
521 /* Unaccept the server socket. */
522 status = nx_tcp_server_socket_unaccept(&server_socket);
523
524 /* Check for error. */
525 if (status)
526 error_counter++;
527
528 /* Prepare to accept another connection. */
529 status = nx_tcp_server_socket_relisten(&ip_1, NXD_MQTT_PORT, &server_socket);
530
531 /* Check for error. */
532 if (status)
533 error_counter++;
534 }
535
536 #ifdef NX_SECURE_ENABLE
537 /* Delete the session. */
538 nx_secure_tls_session_delete(&tls_server_session);
539 #endif
540
541 /* Unlisten on the server port. */
542 status = nx_tcp_server_socket_unlisten(&ip_1, NXD_MQTT_PORT);
543
544 /* Check for error. */
545 if (status)
546 error_counter++;
547
548 /* Delete the socket. */
549 status = nx_tcp_socket_delete(&server_socket);
550
551 /* Check for error. */
552 if(status)
553 error_counter++;
554 }
555
556