1 /*
2 * Copyright (c) 2017 Intel Corporation
3 * Copyright (c) 2024 Nordic Semiconductor ASA
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 */
7
8 #include <zephyr/ztest.h>
9 #include <zephyr/misc/lorem_ipsum.h>
10 #include <zephyr/net/socket.h>
11 #include <zephyr/net/mqtt.h>
12 #include <zephyr/random/random.h>
13
14 #include "mqtt_internal.h"
15
16 #define SERVER_ADDR "::1"
17 #define SERVER_PORT 1883
18 #define APP_SLEEP_MSECS 500
19 #define MQTT_CLIENTID "zephyr_publisher"
20 #define BUFFER_SIZE 128
21 #define BROKER_BUFFER_SIZE 1500
22 #define TIMEOUT 100
23
24 static uint8_t broker_buf[BROKER_BUFFER_SIZE];
25 static size_t broker_offset;
26 static uint8_t broker_topic[32];
27 static uint8_t rx_buffer[BUFFER_SIZE];
28 static uint8_t tx_buffer[BUFFER_SIZE];
29 static struct mqtt_client client_ctx;
30 static struct sockaddr broker;
31 int s_sock = -1, c_sock = -1;
32 static struct zsock_pollfd client_fds[1];
33 static int client_nfds;
34
35 static struct mqtt_test_ctx {
36 bool connected;
37 bool ping_resp_handled;
38 bool publish_handled;
39 bool puback_handled;
40 bool pubcomp_handled;
41 bool suback_handled;
42 bool unsuback_handled;
43 uint16_t msg_id;
44 int payload_left;
45 const uint8_t *payload;
46 } test_ctx;
47
48 static const uint8_t payload_short[] = "Short payload";
49 static const uint8_t payload_long[] = LOREM_IPSUM;
50
51 static const uint8_t connect_ack_reply[] = {
52 MQTT_PKT_TYPE_CONNACK, 0x02, 0, 0,
53 };
54
55 static const uint8_t ping_resp_reply[] = {
56 MQTT_PKT_TYPE_PINGRSP, 0,
57 };
58
59 static const uint8_t puback_reply_template[] = {
60 MQTT_PKT_TYPE_PUBACK, 0x02, 0, 0,
61 };
62
63 static const uint8_t pubrec_reply_template[] = {
64 MQTT_PKT_TYPE_PUBREC, 0x02, 0, 0,
65 };
66
67 static const uint8_t pubcomp_reply_template[] = {
68 MQTT_PKT_TYPE_PUBCOMP, 0x02, 0, 0,
69 };
70
71 static const uint8_t suback_reply_template[] = {
72 MQTT_PKT_TYPE_SUBACK, 0x03, 0, 0, 0x02,
73 };
74
75 static const uint8_t unsuback_reply_template[] = {
76 MQTT_PKT_TYPE_UNSUBACK, 0x02, 0, 0,
77 };
78
get_mqtt_topic(void)79 static const char *get_mqtt_topic(void)
80 {
81 return "sensors";
82 }
83
prepare_client_fds(struct mqtt_client * client)84 static void prepare_client_fds(struct mqtt_client *client)
85 {
86 client_fds[0].fd = client->transport.tcp.sock;
87 client_fds[0].events = ZSOCK_POLLIN;
88 client_nfds = 1;
89 }
90
clear_client_fds(void)91 static void clear_client_fds(void)
92 {
93 client_nfds = 0;
94 }
95
client_wait(bool timeout_allowed)96 static void client_wait(bool timeout_allowed)
97 {
98 int ret;
99
100 zassert_true(client_nfds > 0, "Client FDS should be set at this point");
101 ret = zsock_poll(client_fds, client_nfds, TIMEOUT);
102 if (timeout_allowed) {
103 zassert_true(ret >= 0, "poll() error, (%d)", ret);
104 } else {
105 zassert_true(ret > 0, "poll() error, (%d)", ret);
106 }
107 }
108
broker_init(void)109 static void broker_init(void)
110 {
111 struct sockaddr_in6 *broker6 = net_sin6(&broker);
112 struct sockaddr_in6 bind_addr = {
113 .sin6_family = AF_INET6,
114 .sin6_port = htons(SERVER_PORT),
115 .sin6_addr = IN6ADDR_ANY_INIT,
116 };
117 int reuseaddr = 1;
118 int ret;
119
120 broker6->sin6_family = AF_INET6;
121 broker6->sin6_port = htons(SERVER_PORT);
122 zsock_inet_pton(AF_INET6, SERVER_ADDR, &broker6->sin6_addr);
123
124 memset(broker_topic, 0, sizeof(broker_topic));
125 broker_offset = 0;
126
127 s_sock = zsock_socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
128 if (s_sock < 0) {
129 printk("Failed to create server socket\n");
130 }
131
132 ret = zsock_setsockopt(s_sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr,
133 sizeof(reuseaddr));
134 if (ret < 0) {
135 printk("Failed to set SO_REUSEADDR on server socket, %d\n", errno);
136 }
137
138 ret = zsock_bind(s_sock, (struct sockaddr *)&bind_addr, sizeof(bind_addr));
139 if (ret < 0) {
140 printk("Failed to bind server socket, %d\n", errno);
141 }
142
143 ret = zsock_listen(s_sock, 1);
144 if (ret < 0) {
145 printk("Failed to listen on server socket, %d\n", errno);
146 }
147 }
148
broker_destroy(void)149 static void broker_destroy(void)
150 {
151 if (s_sock >= 0) {
152 zsock_close(s_sock);
153 s_sock = -1;
154 }
155
156 if (c_sock >= 0) {
157 zsock_close(c_sock);
158 c_sock = -1;
159 }
160 }
161
162 #define LISTEN_SOCK_ID 0
163 #define CLIENT_SOCK_ID 1
164
test_send_reply(const uint8_t * reply,size_t len)165 static void test_send_reply(const uint8_t *reply, size_t len)
166 {
167 while (len) {
168 ssize_t out_len = zsock_send(c_sock, reply, len, 0);
169
170 zassert_true(out_len > 0, "Broker send failed (%d)", -errno);
171
172 reply = reply + out_len;
173 len -= out_len;
174 }
175 }
176
encode_fixed_hdr(uint8_t * buf,uint8_t type_flags,uint32_t length)177 static uint8_t encode_fixed_hdr(uint8_t *buf, uint8_t type_flags, uint32_t length)
178 {
179 uint8_t bytes = 0U;
180
181 bytes++;
182 *buf = type_flags;
183 buf++;
184
185 do {
186 bytes++;
187 *buf = length & MQTT_LENGTH_VALUE_MASK;
188 length >>= MQTT_LENGTH_SHIFT;
189 if (length > 0) {
190 *buf |= MQTT_LENGTH_CONTINUATION_BIT;
191 }
192 buf++;
193 } while (length > 0);
194
195 return bytes;
196 }
197
broker_validate_packet(uint8_t * buf,size_t length,uint8_t type,uint8_t flags)198 static void broker_validate_packet(uint8_t *buf, size_t length, uint8_t type,
199 uint8_t flags)
200 {
201 switch (type) {
202 case MQTT_PKT_TYPE_CONNECT: {
203 test_send_reply(connect_ack_reply, sizeof(connect_ack_reply));
204 break;
205 }
206 case MQTT_PKT_TYPE_PUBLISH: {
207 uint8_t qos = (flags & MQTT_HEADER_QOS_MASK) >> 1;
208 uint8_t reply_ack[sizeof(puback_reply_template)];
209 uint16_t topic_len, var_len = 0;
210 bool ack = false;
211
212 topic_len = sys_get_be16(buf);
213
214 if (qos == MQTT_QOS_0_AT_MOST_ONCE) {
215 var_len = topic_len + 2;
216 } else if (qos == MQTT_QOS_1_AT_LEAST_ONCE) {
217 ack = true;
218 var_len = topic_len + 4;
219 memcpy(reply_ack, puback_reply_template, sizeof(reply_ack));
220 } else if (qos == MQTT_QOS_2_EXACTLY_ONCE) {
221 ack = true;
222 var_len = topic_len + 4;
223 memcpy(reply_ack, pubrec_reply_template, sizeof(reply_ack));
224 } else {
225 zassert_unreachable("Invalid qos received");
226 }
227
228 zassert_equal(topic_len, strlen(get_mqtt_topic()), "Invalid topic length");
229 zassert_mem_equal(buf + 2, get_mqtt_topic(), topic_len, "Invalid topic");
230 zassert_equal(length - var_len, strlen(test_ctx.payload),
231 "Invalid payload length");
232 zassert_mem_equal(buf + var_len, test_ctx.payload,
233 strlen(test_ctx.payload), "Invalid payload");
234
235 if (ack) {
236 /* Copy packet ID. */
237 memcpy(reply_ack + 2, buf + topic_len + 2, 2);
238 test_send_reply(reply_ack, sizeof(reply_ack));
239 }
240
241 if (topic_len == strlen(broker_topic) &&
242 memcmp(buf + 2, broker_topic, topic_len) == 0) {
243 uint8_t fixed_hdr[MQTT_FIXED_HEADER_MAX_SIZE];
244 uint8_t hdr_len = encode_fixed_hdr(
245 fixed_hdr, MQTT_PKT_TYPE_PUBLISH | flags, length);
246
247 /* Send publish back */
248 test_send_reply(fixed_hdr, hdr_len);
249 test_send_reply(buf, length);
250 }
251
252 break;
253 }
254 case MQTT_PKT_TYPE_PUBACK: {
255 uint16_t message_id;
256
257 zassert_equal(length, 2, "Invalid PUBACK length");
258
259 message_id = sys_get_be16(buf);
260 zassert_equal(message_id, test_ctx.msg_id,
261 "Invalid packet ID received.");
262 break;
263 }
264 case MQTT_PKT_TYPE_PUBREL: {
265 uint8_t reply[sizeof(pubcomp_reply_template)];
266
267 memcpy(reply, pubcomp_reply_template, sizeof(reply));
268 memcpy(reply + 2, buf, 2);
269 test_send_reply(reply, sizeof(reply));
270
271 break;
272 }
273 case MQTT_PKT_TYPE_SUBSCRIBE: {
274 uint16_t topic_len;
275 uint8_t reply[sizeof(suback_reply_template)];
276
277 topic_len = sys_get_be16(buf + 2);
278 zassert_true(topic_len <= length - 5, "Invalid topic length");
279 zassert_true(topic_len < sizeof(broker_topic),
280 "Topic length too long to handle");
281 memcpy(broker_topic, buf + 4, topic_len);
282 broker_topic[topic_len] = '\0';
283
284 memcpy(reply, suback_reply_template, sizeof(suback_reply_template));
285 /* Copy packet ID. */
286 memcpy(reply + 2, buf, 2);
287 test_send_reply(reply, sizeof(reply));
288
289 break;
290 }
291 case MQTT_PKT_TYPE_UNSUBSCRIBE: {
292 uint16_t topic_len;
293 uint8_t reply[sizeof(unsuback_reply_template)];
294
295 topic_len = sys_get_be16(buf + 2);
296 zassert_true(topic_len <= length - 4, "Invalid topic length");
297 zassert_mem_equal(broker_topic, buf + 4, topic_len,
298 "Invalid topic received");
299 memset(broker_topic, 0, sizeof(broker_topic));
300
301 memcpy(reply, unsuback_reply_template, sizeof(unsuback_reply_template));
302 /* Copy packet ID. */
303 memcpy(reply + 2, buf, 2);
304 test_send_reply(reply, sizeof(reply));
305
306 break;
307 }
308 case MQTT_PKT_TYPE_PINGREQ: {
309 test_send_reply(ping_resp_reply, sizeof(ping_resp_reply));
310 break;
311 }
312 case MQTT_PKT_TYPE_DISCONNECT: {
313 zsock_close(c_sock);
314 c_sock = -1;
315 break;
316 }
317 default:
318 zassert_true(false, "Not yet supported (%02x)", type);
319 }
320 }
321
broker_receive(uint8_t expected_packet)322 static int broker_receive(uint8_t expected_packet)
323 {
324 struct buf_ctx buf;
325 size_t bytes_consumed = 0;
326 uint8_t type_and_flags, type, flags;
327 uint32_t length;
328 int ret;
329
330 zassert_false(broker_offset == sizeof(broker_buf), "Cannot fit full payload!");
331
332 ret = zsock_recv(c_sock, broker_buf + broker_offset,
333 sizeof(broker_buf) - broker_offset,
334 ZSOCK_MSG_DONTWAIT);
335
336 if (ret == -1 && errno == EAGAIN) {
337 /* EAGAIN expected only if there already was data in the buffer. */
338 zassert_true(broker_offset > 0, "Unexpected EAGAIN in broker");
339 } else {
340 zassert_true(ret > 0, "Broker receive failed (%d)", -errno);
341 broker_offset += ret;
342 }
343
344 if (broker_offset < MQTT_FIXED_HEADER_MIN_SIZE) {
345 return -EAGAIN;
346 }
347
348 buf.cur = broker_buf;
349 buf.end = broker_buf + broker_offset;
350
351 ret = fixed_header_decode(&buf, &type_and_flags, &length);
352 if (ret == -EAGAIN) {
353 return ret;
354 }
355
356 zassert_ok(ret, "Failed to decode fixed header (%d)", ret);
357
358 if (length > buf.end - buf.cur) {
359 return -EAGAIN;
360 }
361
362 bytes_consumed += buf.cur - broker_buf;
363 bytes_consumed += length;
364
365 type = type_and_flags & 0xF0;
366 flags = type_and_flags & 0x0F;
367 zassert_equal(type, expected_packet,
368 "Unexpected packet type received at the broker, (%02x)",
369 type);
370
371 broker_validate_packet(buf.cur, length, type, flags);
372
373 broker_offset -= bytes_consumed;
374 if (broker_offset > 0) {
375 memmove(broker_buf, broker_buf + bytes_consumed,
376 broker_offset);
377 }
378
379 return 0;
380 }
381
broker_process(uint8_t expected_packet)382 static void broker_process(uint8_t expected_packet)
383 {
384 struct zsock_pollfd fds[2] = {
385 { s_sock, ZSOCK_POLLIN, 0},
386 { c_sock, ZSOCK_POLLIN, 0},
387 };
388 int ret;
389
390 if (c_sock >= 0 && broker_offset > 0) {
391 ret = broker_receive(expected_packet);
392 if (ret == 0) {
393 goto out;
394 }
395 }
396
397 while (true) {
398 ret = zsock_poll(fds, ARRAY_SIZE(fds), TIMEOUT);
399 zassert_true(ret > 0, "Unexpected timeout on poll");
400
401 for (int i = 0; i < ARRAY_SIZE(fds); i++) {
402 if (fds[i].fd < 0) {
403 continue;
404 }
405
406 zassert_false((fds[i].revents & ZSOCK_POLLERR) ||
407 (fds[i].revents & ZSOCK_POLLHUP) ||
408 (fds[i].revents & ZSOCK_POLLNVAL),
409 "Unexpected poll event, (%02x)",
410 fds[i].revents);
411
412 if (!(fds[i].revents & ZSOCK_POLLIN)) {
413 continue;
414 }
415
416 if (i == LISTEN_SOCK_ID) {
417 zassert_equal(c_sock, -1, "Client already connected");
418 ret = zsock_accept(s_sock, NULL, NULL);
419 zassert_true(ret >= 0, "Accept failed (%d)", -errno);
420
421 c_sock = ret;
422 fds[CLIENT_SOCK_ID].fd = c_sock;
423 } else {
424 ret = broker_receive(expected_packet);
425 if (ret == 0) {
426 goto out;
427 }
428 }
429 }
430 }
431
432 out:
433 return;
434 }
435
publish_handler(struct mqtt_client * const client,const struct mqtt_evt * evt)436 static void publish_handler(struct mqtt_client *const client,
437 const struct mqtt_evt *evt)
438 {
439 int ret;
440 static uint8_t buf[sizeof(payload_long)];
441
442 zassert_equal(evt->result, 0, "MQTT PUBLISH error: %d", evt->result);
443 zassert_equal(test_ctx.payload_left,
444 evt->param.publish.message.payload.len,
445 "Invalid payload length: %d",
446 evt->param.publish.message.payload.len);
447
448 ret = mqtt_readall_publish_payload(client, buf, test_ctx.payload_left);
449 zassert_ok(ret, "Error while reading publish payload (%d)", ret);
450 zassert_mem_equal(test_ctx.payload, buf,
451 evt->param.publish.message.payload.len,
452 "Invalid payload content");
453
454 test_ctx.payload_left = 0;
455 test_ctx.publish_handled = true;
456 }
457
mqtt_evt_handler(struct mqtt_client * const client,const struct mqtt_evt * evt)458 static void mqtt_evt_handler(struct mqtt_client *const client,
459 const struct mqtt_evt *evt)
460 {
461 int ret;
462
463 switch (evt->type) {
464 case MQTT_EVT_CONNACK:
465 zassert_ok(evt->result, "MQTT connect failed %d", evt->result);
466 test_ctx.connected = true;
467 break;
468
469 case MQTT_EVT_DISCONNECT:
470 test_ctx.connected = false;
471 test_ctx.payload_left = -1;
472
473 break;
474
475 case MQTT_EVT_PUBLISH:
476 publish_handler(client, evt);
477
478 if (evt->param.publish.message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
479 const struct mqtt_puback_param ack = {
480 .message_id = evt->param.publish.message_id,
481 };
482
483 ret = mqtt_publish_qos1_ack(client, &ack);
484 zassert_ok(ret, "Failed to send MQTT PUBACK (%d)", ret);
485 }
486
487 if (evt->param.publish.message.topic.qos == MQTT_QOS_2_EXACTLY_ONCE) {
488 const struct mqtt_pubrec_param ack = {
489 .message_id = evt->param.publish.message_id,
490 };
491
492 ret = mqtt_publish_qos2_receive(client, &ack);
493 zassert_ok(ret, "Failed to send MQTT PUBREC (%d)", ret);
494 }
495
496 break;
497
498 case MQTT_EVT_PUBACK:
499 zassert_ok(evt->result, "MQTT PUBACK error %d", evt->result);
500 zassert_equal(evt->param.puback.message_id, test_ctx.msg_id,
501 "Invalid packet ID received.");
502 test_ctx.puback_handled = true;
503
504 break;
505
506 case MQTT_EVT_PUBREC: {
507 const struct mqtt_pubrel_param rel_param = {
508 .message_id = evt->param.pubrec.message_id
509 };
510
511 zassert_ok(evt->result, "MQTT PUBREC error %d", evt->result);
512 zassert_equal(evt->param.pubrec.message_id, test_ctx.msg_id,
513 "Invalid packet ID received.");
514
515 ret = mqtt_publish_qos2_release(client, &rel_param);
516 zassert_ok(ret, "Failed to send MQTT PUBREL: %d", ret);
517
518 break;
519 }
520
521 case MQTT_EVT_PUBCOMP:
522 zassert_ok(evt->result, "MQTT PUBCOMP error %d", evt->result);
523 zassert_equal(evt->param.pubcomp.message_id, test_ctx.msg_id,
524 "Invalid packet ID received.");
525 test_ctx.pubcomp_handled = true;
526
527 break;
528
529 case MQTT_EVT_SUBACK:
530 zassert_ok(evt->result, "MQTT SUBACK error %d", evt->result);
531 zassert_equal(evt->param.suback.message_id, test_ctx.msg_id,
532 "Invalid packet ID received.");
533 test_ctx.suback_handled = true;
534
535 break;
536
537 case MQTT_EVT_UNSUBACK:
538 zassert_ok(evt->result, "MQTT UNSUBACK error %d", evt->result);
539 zassert_equal(evt->param.unsuback.message_id, test_ctx.msg_id,
540 "Invalid packet ID received.");
541
542 test_ctx.unsuback_handled = true;
543
544 break;
545
546 case MQTT_EVT_PINGRESP:
547 test_ctx.ping_resp_handled = true;
548 break;
549
550 default:
551 zassert_unreachable("Invalid MQTT packet");
552 break;
553 }
554 }
555
client_init(struct mqtt_client * client)556 static void client_init(struct mqtt_client *client)
557 {
558 mqtt_client_init(client);
559
560 /* MQTT client configuration */
561 client->broker = &broker;
562 client->evt_cb = mqtt_evt_handler;
563 client->client_id.utf8 = (uint8_t *)MQTT_CLIENTID;
564 client->client_id.size = strlen(MQTT_CLIENTID);
565 client->password = NULL;
566 client->user_name = NULL;
567 client->protocol_version = MQTT_VERSION_3_1_1;
568 client->transport.type = MQTT_TRANSPORT_NON_SECURE;
569 client->clean_session = true;
570
571 client->rx_buf = rx_buffer;
572 client->rx_buf_size = sizeof(rx_buffer);
573 client->tx_buf = tx_buffer;
574 client->tx_buf_size = sizeof(tx_buffer);
575 }
576
test_connect(void)577 static void test_connect(void)
578 {
579 int ret;
580
581 ret = mqtt_connect(&client_ctx);
582 zassert_ok(ret, "MQTT client failed to connect (%d)", ret);
583 broker_process(MQTT_PKT_TYPE_CONNECT);
584 prepare_client_fds(&client_ctx);
585
586 client_wait(false);
587 ret = mqtt_input(&client_ctx);
588 zassert_ok(ret, "MQTT client input processing failed (%d)", ret);
589 }
590
test_pingreq(void)591 static void test_pingreq(void)
592 {
593 int ret;
594
595 ret = mqtt_ping(&client_ctx);
596 zassert_ok(ret, "MQTT client failed to send ping (%d)", ret);
597 broker_process(MQTT_PKT_TYPE_PINGREQ);
598
599 client_wait(false);
600 ret = mqtt_input(&client_ctx);
601 zassert_ok(ret, "MQTT client input processing failed (%d)", ret);
602 }
603
test_publish(enum mqtt_qos qos)604 static void test_publish(enum mqtt_qos qos)
605 {
606 int ret;
607 struct mqtt_publish_param param;
608
609 test_ctx.payload_left = strlen(test_ctx.payload);
610 while (test_ctx.msg_id == 0) {
611 test_ctx.msg_id = sys_rand16_get();
612 }
613
614 param.message.topic.qos = qos;
615 param.message.topic.topic.utf8 = (uint8_t *)get_mqtt_topic();
616 param.message.topic.topic.size =
617 strlen(param.message.topic.topic.utf8);
618 param.message.payload.data = (uint8_t *)test_ctx.payload;
619 param.message.payload.len = test_ctx.payload_left;
620 param.message_id = test_ctx.msg_id;
621 param.dup_flag = 0U;
622 param.retain_flag = 0U;
623
624 ret = mqtt_publish(&client_ctx, ¶m);
625 zassert_ok(ret, "MQTT client failed to publish (%d)", ret);
626 broker_process(MQTT_PKT_TYPE_PUBLISH);
627
628 client_wait(true);
629 ret = mqtt_input(&client_ctx);
630 zassert_ok(ret, "MQTT client input processing failed (%d)", ret);
631
632 /* Second input handle for expected Publish Complete response. */
633 if (qos == MQTT_QOS_2_EXACTLY_ONCE) {
634 broker_process(MQTT_PKT_TYPE_PUBREL);
635 client_wait(false);
636 ret = mqtt_input(&client_ctx);
637 zassert_ok(ret, "MQTT client input processing failed (%d)", ret);
638 }
639 }
640
test_subscribe(void)641 static void test_subscribe(void)
642 {
643 int ret;
644 struct mqtt_topic topic;
645 struct mqtt_subscription_list sub;
646
647 while (test_ctx.msg_id == 0) {
648 test_ctx.msg_id = sys_rand16_get();
649 }
650
651 topic.topic.utf8 = get_mqtt_topic();
652 topic.topic.size = strlen(topic.topic.utf8);
653 topic.qos = MQTT_QOS_2_EXACTLY_ONCE;
654 sub.list = &topic;
655 sub.list_count = 1U;
656 sub.message_id = test_ctx.msg_id;
657
658 ret = mqtt_subscribe(&client_ctx, &sub);
659 zassert_ok(ret, "MQTT client failed to subscribe (%d)", ret);
660 broker_process(MQTT_PKT_TYPE_SUBSCRIBE);
661
662 client_wait(false);
663 ret = mqtt_input(&client_ctx);
664 zassert_ok(ret, "MQTT client input processing failed (%d)", ret);
665 }
666
test_unsubscribe(void)667 static void test_unsubscribe(void)
668 {
669 int ret;
670 struct mqtt_topic topic;
671 struct mqtt_subscription_list unsub;
672
673 while (test_ctx.msg_id == 0) {
674 test_ctx.msg_id = sys_rand16_get();
675 }
676
677 topic.topic.utf8 = get_mqtt_topic();
678 topic.topic.size = strlen(topic.topic.utf8);
679 unsub.list = &topic;
680 unsub.list_count = 1U;
681 unsub.message_id = test_ctx.msg_id;
682
683 ret = mqtt_unsubscribe(&client_ctx, &unsub);
684 zassert_ok(ret, "MQTT client failed to unsubscribe (%d)", ret);
685 broker_process(MQTT_PKT_TYPE_UNSUBSCRIBE);
686
687 client_wait(false);
688 ret = mqtt_input(&client_ctx);
689 zassert_ok(ret, "MQTT client input processing failed (%d)", ret);
690 }
691
test_disconnect(void)692 static void test_disconnect(void)
693 {
694 int ret;
695
696 ret = mqtt_disconnect(&client_ctx);
697 zassert_ok(ret, "MQTT client failed to disconnect (%d)", ret);
698 broker_process(MQTT_PKT_TYPE_DISCONNECT);
699
700 client_wait(false);
701 ret = mqtt_input(&client_ctx);
702 zassert_equal(ret, -ENOTCONN, "Client should no longer be connected");
703 }
704
ZTEST(mqtt_client,test_mqtt_connect)705 ZTEST(mqtt_client, test_mqtt_connect)
706 {
707 test_connect();
708 zassert_true(test_ctx.connected, "MQTT client should be connected");
709 test_disconnect();
710 zassert_false(test_ctx.connected, "MQTT client should be disconnected");
711 }
712
ZTEST(mqtt_client,test_mqtt_ping)713 ZTEST(mqtt_client, test_mqtt_ping)
714 {
715 test_connect();
716 test_pingreq();
717 zassert_true(test_ctx.ping_resp_handled, "MQTT client should handle ping response");
718 test_disconnect();
719 }
720
ZTEST(mqtt_client,test_mqtt_publish_qos0)721 ZTEST(mqtt_client, test_mqtt_publish_qos0)
722 {
723 test_ctx.payload = payload_short;
724
725 test_connect();
726 test_publish(MQTT_QOS_0_AT_MOST_ONCE);
727 zassert_false(test_ctx.puback_handled, "MQTT client should not receive puback");
728 zassert_false(test_ctx.pubcomp_handled, "MQTT client should not receive pubcomp");
729 test_disconnect();
730 }
731
ZTEST(mqtt_client,test_mqtt_publish_qos1)732 ZTEST(mqtt_client, test_mqtt_publish_qos1)
733 {
734 test_ctx.payload = payload_short;
735
736 test_connect();
737 test_publish(MQTT_QOS_1_AT_LEAST_ONCE);
738 zassert_true(test_ctx.puback_handled, "MQTT client should receive puback");
739 zassert_false(test_ctx.pubcomp_handled, "MQTT client should not receive pubcomp");
740 test_disconnect();
741 }
742
ZTEST(mqtt_client,test_mqtt_publish_qos2)743 ZTEST(mqtt_client, test_mqtt_publish_qos2)
744 {
745 test_ctx.payload = payload_short;
746
747 test_connect();
748 test_publish(MQTT_QOS_2_EXACTLY_ONCE);
749 zassert_false(test_ctx.puback_handled, "MQTT client should not receive puback");
750 zassert_true(test_ctx.pubcomp_handled, "MQTT client should receive pubcomp");
751 test_disconnect();
752 }
753
ZTEST(mqtt_client,test_mqtt_subscribe)754 ZTEST(mqtt_client, test_mqtt_subscribe)
755 {
756 test_connect();
757 test_subscribe();
758 zassert_true(test_ctx.suback_handled, "MQTT client should receive suback");
759 zassert_str_equal(broker_topic, get_mqtt_topic(), "Invalid topic");
760 test_unsubscribe();
761 zassert_true(test_ctx.unsuback_handled, "MQTT client should receive unsuback");
762 zassert_str_equal(broker_topic, "", "Topic should be cleared now");
763 test_disconnect();
764 }
765
test_pubsub(const uint8_t * payload,enum mqtt_qos qos)766 static void test_pubsub(const uint8_t *payload, enum mqtt_qos qos)
767 {
768 int ret;
769
770 test_ctx.payload = payload;
771
772 test_connect();
773 test_subscribe();
774 test_publish(qos);
775
776 while (test_ctx.payload_left > 0) {
777 client_wait(false);
778 ret = mqtt_input(&client_ctx);
779 zassert_ok(ret, "MQTT client input processing failed (%d)", ret);
780 }
781
782 zassert_true(test_ctx.publish_handled, "MQTT client should receive publish");
783
784 if (qos == MQTT_QOS_1_AT_LEAST_ONCE) {
785 broker_process(MQTT_PKT_TYPE_PUBACK);
786 }
787
788 test_unsubscribe();
789 test_disconnect();
790 }
791
ZTEST(mqtt_client,test_mqtt_pubsub_short)792 ZTEST(mqtt_client, test_mqtt_pubsub_short)
793 {
794 test_pubsub(payload_short, MQTT_QOS_0_AT_MOST_ONCE);
795 zassert_false(test_ctx.puback_handled, "MQTT client should not receive puback");
796 }
797
ZTEST(mqtt_client,test_mqtt_pubsub_long)798 ZTEST(mqtt_client, test_mqtt_pubsub_long)
799 {
800 test_pubsub(payload_long, MQTT_QOS_1_AT_LEAST_ONCE);
801 zassert_true(test_ctx.puback_handled, "MQTT client should receive puback");
802 }
803
mqtt_tests_before(void * fixture)804 static void mqtt_tests_before(void *fixture)
805 {
806 ARG_UNUSED(fixture);
807
808 memset(&test_ctx, 0, sizeof(test_ctx));
809 broker_init();
810 client_init(&client_ctx);
811 }
812
mqtt_tests_after(void * fixture)813 static void mqtt_tests_after(void *fixture)
814 {
815 ARG_UNUSED(fixture);
816
817 broker_destroy();
818 mqtt_abort(&client_ctx);
819 clear_client_fds();
820 /* Let the TCP workqueue release TCP contexts. */
821 k_msleep(10);
822 }
823
824 ZTEST_SUITE(mqtt_client, NULL, NULL, mqtt_tests_before, mqtt_tests_after, NULL);
825