1 /*
2  * Copyright (c) 2017 Intel Corporation
3  * Copyright (c) 2024 Nordic Semiconductor ASA
4  *
5  * SPDX-License-Identifier: Apache-2.0
6  */
7 
8 #include <zephyr/ztest.h>
9 #include <zephyr/misc/lorem_ipsum.h>
10 #include <zephyr/net/socket.h>
11 #include <zephyr/net/mqtt.h>
12 #include <zephyr/random/random.h>
13 
14 #include "mqtt_internal.h"
15 
16 #define SERVER_ADDR        "::1"
17 #define SERVER_PORT        1883
18 #define APP_SLEEP_MSECS    500
19 #define MQTT_CLIENTID      "zephyr_publisher"
20 #define BUFFER_SIZE        128
21 #define BROKER_BUFFER_SIZE 1500
22 #define TIMEOUT            100
23 
24 static uint8_t broker_buf[BROKER_BUFFER_SIZE];
25 static size_t broker_offset;
26 static uint8_t broker_topic[32];
27 static uint8_t rx_buffer[BUFFER_SIZE];
28 static uint8_t tx_buffer[BUFFER_SIZE];
29 static struct mqtt_client client_ctx;
30 static struct sockaddr broker;
31 int s_sock = -1, c_sock = -1;
32 static struct zsock_pollfd client_fds[1];
33 static int client_nfds;
34 
35 static struct mqtt_test_ctx {
36 	bool connected;
37 	bool ping_resp_handled;
38 	bool publish_handled;
39 	bool puback_handled;
40 	bool pubcomp_handled;
41 	bool suback_handled;
42 	bool unsuback_handled;
43 	uint16_t msg_id;
44 	int payload_left;
45 	const uint8_t *payload;
46 } test_ctx;
47 
48 static const uint8_t payload_short[] = "Short payload";
49 static const uint8_t payload_long[] = LOREM_IPSUM;
50 
51 static const uint8_t connect_ack_reply[] = {
52 	MQTT_PKT_TYPE_CONNACK, 0x02, 0, 0,
53 };
54 
55 static const uint8_t ping_resp_reply[] = {
56 	MQTT_PKT_TYPE_PINGRSP, 0,
57 };
58 
59 static const uint8_t puback_reply_template[] = {
60 	MQTT_PKT_TYPE_PUBACK, 0x02, 0, 0,
61 };
62 
63 static const uint8_t pubrec_reply_template[] = {
64 	MQTT_PKT_TYPE_PUBREC, 0x02, 0, 0,
65 };
66 
67 static const uint8_t pubcomp_reply_template[] = {
68 	MQTT_PKT_TYPE_PUBCOMP, 0x02, 0, 0,
69 };
70 
71 static const uint8_t suback_reply_template[] = {
72 	MQTT_PKT_TYPE_SUBACK, 0x03, 0, 0, 0x02,
73 };
74 
75 static const uint8_t unsuback_reply_template[] = {
76 	MQTT_PKT_TYPE_UNSUBACK, 0x02, 0, 0,
77 };
78 
get_mqtt_topic(void)79 static const char *get_mqtt_topic(void)
80 {
81 	return "sensors";
82 }
83 
prepare_client_fds(struct mqtt_client * client)84 static void prepare_client_fds(struct mqtt_client *client)
85 {
86 	client_fds[0].fd = client->transport.tcp.sock;
87 	client_fds[0].events = ZSOCK_POLLIN;
88 	client_nfds = 1;
89 }
90 
clear_client_fds(void)91 static void clear_client_fds(void)
92 {
93 	client_nfds = 0;
94 }
95 
client_wait(bool timeout_allowed)96 static void client_wait(bool timeout_allowed)
97 {
98 	int ret;
99 
100 	zassert_true(client_nfds > 0, "Client FDS should be set at this point");
101 	ret = zsock_poll(client_fds, client_nfds, TIMEOUT);
102 	if (timeout_allowed) {
103 		zassert_true(ret >= 0, "poll() error, (%d)", ret);
104 	} else {
105 		zassert_true(ret > 0, "poll() error, (%d)", ret);
106 	}
107 }
108 
broker_init(void)109 static void broker_init(void)
110 {
111 	struct sockaddr_in6 *broker6 = net_sin6(&broker);
112 	struct sockaddr_in6 bind_addr = {
113 		.sin6_family = AF_INET6,
114 		.sin6_port = htons(SERVER_PORT),
115 		.sin6_addr = IN6ADDR_ANY_INIT,
116 	};
117 	int reuseaddr = 1;
118 	int ret;
119 
120 	broker6->sin6_family = AF_INET6;
121 	broker6->sin6_port = htons(SERVER_PORT);
122 	zsock_inet_pton(AF_INET6, SERVER_ADDR, &broker6->sin6_addr);
123 
124 	memset(broker_topic, 0, sizeof(broker_topic));
125 	broker_offset = 0;
126 
127 	s_sock = zsock_socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
128 	if (s_sock < 0) {
129 		printk("Failed to create server socket\n");
130 	}
131 
132 	ret = zsock_setsockopt(s_sock, SOL_SOCKET, SO_REUSEADDR, &reuseaddr,
133 			       sizeof(reuseaddr));
134 	if (ret < 0) {
135 		printk("Failed to set SO_REUSEADDR on server socket, %d\n", errno);
136 	}
137 
138 	ret = zsock_bind(s_sock, (struct sockaddr *)&bind_addr, sizeof(bind_addr));
139 	if (ret < 0) {
140 		printk("Failed to bind server socket, %d\n", errno);
141 	}
142 
143 	ret = zsock_listen(s_sock, 1);
144 	if (ret < 0) {
145 		printk("Failed to listen on server socket, %d\n", errno);
146 	}
147 }
148 
broker_destroy(void)149 static void broker_destroy(void)
150 {
151 	if (s_sock >= 0) {
152 		zsock_close(s_sock);
153 		s_sock = -1;
154 	}
155 
156 	if (c_sock >= 0) {
157 		zsock_close(c_sock);
158 		c_sock = -1;
159 	}
160 }
161 
162 #define LISTEN_SOCK_ID 0
163 #define CLIENT_SOCK_ID 1
164 
test_send_reply(const uint8_t * reply,size_t len)165 static void test_send_reply(const uint8_t *reply, size_t len)
166 {
167 	while (len) {
168 		ssize_t out_len = zsock_send(c_sock, reply, len, 0);
169 
170 		zassert_true(out_len > 0, "Broker send failed (%d)", -errno);
171 
172 		reply = reply + out_len;
173 		len -= out_len;
174 	}
175 }
176 
encode_fixed_hdr(uint8_t * buf,uint8_t type_flags,uint32_t length)177 static uint8_t encode_fixed_hdr(uint8_t *buf, uint8_t type_flags, uint32_t length)
178 {
179 	uint8_t bytes = 0U;
180 
181 	bytes++;
182 	*buf = type_flags;
183 	buf++;
184 
185 	do {
186 		bytes++;
187 		*buf = length & MQTT_LENGTH_VALUE_MASK;
188 		length >>= MQTT_LENGTH_SHIFT;
189 		if (length > 0) {
190 			*buf |= MQTT_LENGTH_CONTINUATION_BIT;
191 		}
192 		buf++;
193 	} while (length > 0);
194 
195 	return bytes;
196 }
197 
broker_validate_packet(uint8_t * buf,size_t length,uint8_t type,uint8_t flags)198 static void broker_validate_packet(uint8_t *buf, size_t length, uint8_t type,
199 				   uint8_t flags)
200 {
201 	switch (type) {
202 	case MQTT_PKT_TYPE_CONNECT: {
203 		test_send_reply(connect_ack_reply, sizeof(connect_ack_reply));
204 		break;
205 	}
206 	case MQTT_PKT_TYPE_PUBLISH: {
207 		uint8_t qos = (flags & MQTT_HEADER_QOS_MASK) >> 1;
208 		uint8_t reply_ack[sizeof(puback_reply_template)];
209 		uint16_t topic_len, var_len = 0;
210 		bool ack = false;
211 
212 		topic_len = sys_get_be16(buf);
213 
214 		if (qos == MQTT_QOS_0_AT_MOST_ONCE) {
215 			var_len = topic_len + 2;
216 		} else if (qos == MQTT_QOS_1_AT_LEAST_ONCE) {
217 			ack = true;
218 			var_len = topic_len + 4;
219 			memcpy(reply_ack, puback_reply_template, sizeof(reply_ack));
220 		} else if (qos == MQTT_QOS_2_EXACTLY_ONCE) {
221 			ack = true;
222 			var_len = topic_len + 4;
223 			memcpy(reply_ack, pubrec_reply_template, sizeof(reply_ack));
224 		} else {
225 			zassert_unreachable("Invalid qos received");
226 		}
227 
228 		zassert_equal(topic_len, strlen(get_mqtt_topic()), "Invalid topic length");
229 		zassert_mem_equal(buf + 2, get_mqtt_topic(), topic_len, "Invalid topic");
230 		zassert_equal(length - var_len, strlen(test_ctx.payload),
231 			      "Invalid payload length");
232 		zassert_mem_equal(buf + var_len, test_ctx.payload,
233 				  strlen(test_ctx.payload), "Invalid payload");
234 
235 		if (ack) {
236 			/* Copy packet ID. */
237 			memcpy(reply_ack + 2, buf + topic_len + 2, 2);
238 			test_send_reply(reply_ack, sizeof(reply_ack));
239 		}
240 
241 		if (topic_len == strlen(broker_topic) &&
242 		    memcmp(buf + 2, broker_topic, topic_len) == 0) {
243 			uint8_t fixed_hdr[MQTT_FIXED_HEADER_MAX_SIZE];
244 			uint8_t hdr_len = encode_fixed_hdr(
245 				fixed_hdr, MQTT_PKT_TYPE_PUBLISH | flags, length);
246 
247 			/* Send publish back */
248 			test_send_reply(fixed_hdr, hdr_len);
249 			test_send_reply(buf, length);
250 		}
251 
252 		break;
253 	}
254 	case MQTT_PKT_TYPE_PUBACK: {
255 		uint16_t message_id;
256 
257 		zassert_equal(length, 2, "Invalid PUBACK length");
258 
259 		message_id = sys_get_be16(buf);
260 		zassert_equal(message_id, test_ctx.msg_id,
261 			      "Invalid packet ID received.");
262 		break;
263 	}
264 	case MQTT_PKT_TYPE_PUBREL: {
265 		uint8_t reply[sizeof(pubcomp_reply_template)];
266 
267 		memcpy(reply, pubcomp_reply_template, sizeof(reply));
268 		memcpy(reply + 2, buf, 2);
269 		test_send_reply(reply, sizeof(reply));
270 
271 		break;
272 	}
273 	case MQTT_PKT_TYPE_SUBSCRIBE: {
274 		uint16_t topic_len;
275 		uint8_t reply[sizeof(suback_reply_template)];
276 
277 		topic_len = sys_get_be16(buf + 2);
278 		zassert_true(topic_len <= length - 5, "Invalid topic length");
279 		zassert_true(topic_len < sizeof(broker_topic),
280 			     "Topic length too long to handle");
281 		memcpy(broker_topic, buf + 4, topic_len);
282 		broker_topic[topic_len] = '\0';
283 
284 		memcpy(reply, suback_reply_template, sizeof(suback_reply_template));
285 		/* Copy packet ID. */
286 		memcpy(reply + 2, buf, 2);
287 		test_send_reply(reply, sizeof(reply));
288 
289 		break;
290 	}
291 	case MQTT_PKT_TYPE_UNSUBSCRIBE: {
292 		uint16_t topic_len;
293 		uint8_t reply[sizeof(unsuback_reply_template)];
294 
295 		topic_len = sys_get_be16(buf + 2);
296 		zassert_true(topic_len <= length - 4, "Invalid topic length");
297 		zassert_mem_equal(broker_topic, buf + 4, topic_len,
298 			     "Invalid topic received");
299 		memset(broker_topic, 0, sizeof(broker_topic));
300 
301 		memcpy(reply, unsuback_reply_template, sizeof(unsuback_reply_template));
302 		/* Copy packet ID. */
303 		memcpy(reply + 2, buf, 2);
304 		test_send_reply(reply, sizeof(reply));
305 
306 		break;
307 	}
308 	case MQTT_PKT_TYPE_PINGREQ: {
309 		test_send_reply(ping_resp_reply, sizeof(ping_resp_reply));
310 		break;
311 	}
312 	case MQTT_PKT_TYPE_DISCONNECT: {
313 		zsock_close(c_sock);
314 		c_sock = -1;
315 		break;
316 	}
317 	default:
318 		zassert_true(false, "Not yet supported (%02x)", type);
319 	}
320 }
321 
broker_receive(uint8_t expected_packet)322 static int broker_receive(uint8_t expected_packet)
323 {
324 	struct buf_ctx buf;
325 	size_t bytes_consumed = 0;
326 	uint8_t type_and_flags, type, flags;
327 	uint32_t length;
328 	int ret;
329 
330 	zassert_false(broker_offset == sizeof(broker_buf), "Cannot fit full payload!");
331 
332 	ret = zsock_recv(c_sock, broker_buf + broker_offset,
333 			 sizeof(broker_buf) - broker_offset,
334 			 ZSOCK_MSG_DONTWAIT);
335 
336 	if (ret == -1 && errno == EAGAIN) {
337 		/* EAGAIN expected only if there already was data in the buffer. */
338 		zassert_true(broker_offset > 0, "Unexpected EAGAIN in broker");
339 	} else {
340 		zassert_true(ret > 0, "Broker receive failed (%d)", -errno);
341 		broker_offset += ret;
342 	}
343 
344 	if (broker_offset < MQTT_FIXED_HEADER_MIN_SIZE) {
345 		return -EAGAIN;
346 	}
347 
348 	buf.cur = broker_buf;
349 	buf.end = broker_buf + broker_offset;
350 
351 	ret = fixed_header_decode(&buf, &type_and_flags, &length);
352 	if (ret == -EAGAIN) {
353 		return ret;
354 	}
355 
356 	zassert_ok(ret, "Failed to decode fixed header (%d)", ret);
357 
358 	if (length > buf.end - buf.cur) {
359 		return -EAGAIN;
360 	}
361 
362 	bytes_consumed += buf.cur - broker_buf;
363 	bytes_consumed += length;
364 
365 	type = type_and_flags & 0xF0;
366 	flags = type_and_flags & 0x0F;
367 	zassert_equal(type, expected_packet,
368 		      "Unexpected packet type received at the broker, (%02x)",
369 		      type);
370 
371 	broker_validate_packet(buf.cur, length, type, flags);
372 
373 	broker_offset -= bytes_consumed;
374 	if (broker_offset > 0) {
375 		memmove(broker_buf, broker_buf + bytes_consumed,
376 			broker_offset);
377 	}
378 
379 	return 0;
380 }
381 
broker_process(uint8_t expected_packet)382 static void broker_process(uint8_t expected_packet)
383 {
384 	struct zsock_pollfd fds[2] = {
385 		{ s_sock, ZSOCK_POLLIN, 0},
386 		{ c_sock, ZSOCK_POLLIN, 0},
387 	};
388 	int ret;
389 
390 	if (c_sock >= 0 && broker_offset > 0) {
391 		ret = broker_receive(expected_packet);
392 		if (ret == 0) {
393 			goto out;
394 		}
395 	}
396 
397 	while (true) {
398 		ret = zsock_poll(fds, ARRAY_SIZE(fds), TIMEOUT);
399 		zassert_true(ret > 0, "Unexpected timeout on poll");
400 
401 		for (int i = 0; i < ARRAY_SIZE(fds); i++) {
402 			if (fds[i].fd < 0) {
403 				continue;
404 			}
405 
406 			zassert_false((fds[i].revents & ZSOCK_POLLERR) ||
407 				      (fds[i].revents & ZSOCK_POLLHUP) ||
408 				      (fds[i].revents & ZSOCK_POLLNVAL),
409 				      "Unexpected poll event, (%02x)",
410 				      fds[i].revents);
411 
412 			if (!(fds[i].revents & ZSOCK_POLLIN)) {
413 				continue;
414 			}
415 
416 			if (i == LISTEN_SOCK_ID) {
417 				zassert_equal(c_sock, -1, "Client already connected");
418 				ret = zsock_accept(s_sock, NULL, NULL);
419 				zassert_true(ret >= 0, "Accept failed (%d)", -errno);
420 
421 				c_sock = ret;
422 				fds[CLIENT_SOCK_ID].fd = c_sock;
423 			} else {
424 				ret = broker_receive(expected_packet);
425 				if (ret == 0) {
426 					goto out;
427 				}
428 			}
429 		}
430 	}
431 
432 out:
433 	return;
434 }
435 
publish_handler(struct mqtt_client * const client,const struct mqtt_evt * evt)436 static void publish_handler(struct mqtt_client *const client,
437 			    const struct mqtt_evt *evt)
438 {
439 	int ret;
440 	static uint8_t buf[sizeof(payload_long)];
441 
442 	zassert_equal(evt->result, 0, "MQTT PUBLISH error: %d", evt->result);
443 	zassert_equal(test_ctx.payload_left,
444 		      evt->param.publish.message.payload.len,
445 		      "Invalid payload length: %d",
446 		      evt->param.publish.message.payload.len);
447 
448 	ret = mqtt_readall_publish_payload(client, buf, test_ctx.payload_left);
449 	zassert_ok(ret, "Error while reading publish payload (%d)", ret);
450 	zassert_mem_equal(test_ctx.payload, buf,
451 			  evt->param.publish.message.payload.len,
452 			  "Invalid payload content");
453 
454 	test_ctx.payload_left = 0;
455 	test_ctx.publish_handled = true;
456 }
457 
mqtt_evt_handler(struct mqtt_client * const client,const struct mqtt_evt * evt)458 static void mqtt_evt_handler(struct mqtt_client *const client,
459 			     const struct mqtt_evt *evt)
460 {
461 	int ret;
462 
463 	switch (evt->type) {
464 	case MQTT_EVT_CONNACK:
465 		zassert_ok(evt->result, "MQTT connect failed %d", evt->result);
466 		test_ctx.connected = true;
467 		break;
468 
469 	case MQTT_EVT_DISCONNECT:
470 		test_ctx.connected = false;
471 		test_ctx.payload_left = -1;
472 
473 		break;
474 
475 	case MQTT_EVT_PUBLISH:
476 		publish_handler(client, evt);
477 
478 		if (evt->param.publish.message.topic.qos == MQTT_QOS_1_AT_LEAST_ONCE) {
479 			const struct mqtt_puback_param ack = {
480 				.message_id = evt->param.publish.message_id,
481 			};
482 
483 			ret = mqtt_publish_qos1_ack(client, &ack);
484 			zassert_ok(ret, "Failed to send MQTT PUBACK (%d)", ret);
485 		}
486 
487 		if (evt->param.publish.message.topic.qos == MQTT_QOS_2_EXACTLY_ONCE) {
488 			const struct mqtt_pubrec_param ack = {
489 				.message_id = evt->param.publish.message_id,
490 			};
491 
492 			ret = mqtt_publish_qos2_receive(client, &ack);
493 			zassert_ok(ret, "Failed to send MQTT PUBREC (%d)", ret);
494 		}
495 
496 		break;
497 
498 	case MQTT_EVT_PUBACK:
499 		zassert_ok(evt->result, "MQTT PUBACK error %d", evt->result);
500 		zassert_equal(evt->param.puback.message_id, test_ctx.msg_id,
501 			      "Invalid packet ID received.");
502 		test_ctx.puback_handled = true;
503 
504 		break;
505 
506 	case MQTT_EVT_PUBREC: {
507 		const struct mqtt_pubrel_param rel_param = {
508 			.message_id = evt->param.pubrec.message_id
509 		};
510 
511 		zassert_ok(evt->result, "MQTT PUBREC error %d", evt->result);
512 		zassert_equal(evt->param.pubrec.message_id, test_ctx.msg_id,
513 			      "Invalid packet ID received.");
514 
515 		ret = mqtt_publish_qos2_release(client, &rel_param);
516 		zassert_ok(ret, "Failed to send MQTT PUBREL: %d", ret);
517 
518 		break;
519 	}
520 
521 	case MQTT_EVT_PUBCOMP:
522 		zassert_ok(evt->result, "MQTT PUBCOMP error %d", evt->result);
523 		zassert_equal(evt->param.pubcomp.message_id, test_ctx.msg_id,
524 			      "Invalid packet ID received.");
525 		test_ctx.pubcomp_handled = true;
526 
527 		break;
528 
529 	case MQTT_EVT_SUBACK:
530 		zassert_ok(evt->result, "MQTT SUBACK error %d", evt->result);
531 		zassert_equal(evt->param.suback.message_id, test_ctx.msg_id,
532 			      "Invalid packet ID received.");
533 		test_ctx.suback_handled = true;
534 
535 		break;
536 
537 	case MQTT_EVT_UNSUBACK:
538 		zassert_ok(evt->result, "MQTT UNSUBACK error %d", evt->result);
539 		zassert_equal(evt->param.unsuback.message_id, test_ctx.msg_id,
540 			      "Invalid packet ID received.");
541 
542 		test_ctx.unsuback_handled = true;
543 
544 		break;
545 
546 	case MQTT_EVT_PINGRESP:
547 		test_ctx.ping_resp_handled = true;
548 		break;
549 
550 	default:
551 		zassert_unreachable("Invalid MQTT packet");
552 		break;
553 	}
554 }
555 
client_init(struct mqtt_client * client)556 static void client_init(struct mqtt_client *client)
557 {
558 	mqtt_client_init(client);
559 
560 	/* MQTT client configuration */
561 	client->broker = &broker;
562 	client->evt_cb = mqtt_evt_handler;
563 	client->client_id.utf8 = (uint8_t *)MQTT_CLIENTID;
564 	client->client_id.size = strlen(MQTT_CLIENTID);
565 	client->password = NULL;
566 	client->user_name = NULL;
567 	client->protocol_version = MQTT_VERSION_3_1_1;
568 	client->transport.type = MQTT_TRANSPORT_NON_SECURE;
569 	client->clean_session = true;
570 
571 	client->rx_buf = rx_buffer;
572 	client->rx_buf_size = sizeof(rx_buffer);
573 	client->tx_buf = tx_buffer;
574 	client->tx_buf_size = sizeof(tx_buffer);
575 }
576 
test_connect(void)577 static void test_connect(void)
578 {
579 	int ret;
580 
581 	ret = mqtt_connect(&client_ctx);
582 	zassert_ok(ret, "MQTT client failed to connect (%d)", ret);
583 	broker_process(MQTT_PKT_TYPE_CONNECT);
584 	prepare_client_fds(&client_ctx);
585 
586 	client_wait(false);
587 	ret = mqtt_input(&client_ctx);
588 	zassert_ok(ret, "MQTT client input processing failed (%d)", ret);
589 }
590 
test_pingreq(void)591 static void test_pingreq(void)
592 {
593 	int ret;
594 
595 	ret = mqtt_ping(&client_ctx);
596 	zassert_ok(ret, "MQTT client failed to send ping (%d)", ret);
597 	broker_process(MQTT_PKT_TYPE_PINGREQ);
598 
599 	client_wait(false);
600 	ret = mqtt_input(&client_ctx);
601 	zassert_ok(ret, "MQTT client input processing failed (%d)", ret);
602 }
603 
test_publish(enum mqtt_qos qos)604 static void test_publish(enum mqtt_qos qos)
605 {
606 	int ret;
607 	struct mqtt_publish_param param;
608 
609 	test_ctx.payload_left = strlen(test_ctx.payload);
610 	while (test_ctx.msg_id == 0) {
611 		test_ctx.msg_id = sys_rand16_get();
612 	}
613 
614 	param.message.topic.qos = qos;
615 	param.message.topic.topic.utf8 = (uint8_t *)get_mqtt_topic();
616 	param.message.topic.topic.size =
617 			strlen(param.message.topic.topic.utf8);
618 	param.message.payload.data = (uint8_t *)test_ctx.payload;
619 	param.message.payload.len = test_ctx.payload_left;
620 	param.message_id = test_ctx.msg_id;
621 	param.dup_flag = 0U;
622 	param.retain_flag = 0U;
623 
624 	ret = mqtt_publish(&client_ctx, &param);
625 	zassert_ok(ret, "MQTT client failed to publish (%d)", ret);
626 	broker_process(MQTT_PKT_TYPE_PUBLISH);
627 
628 	client_wait(true);
629 	ret = mqtt_input(&client_ctx);
630 	zassert_ok(ret, "MQTT client input processing failed (%d)", ret);
631 
632 	/* Second input handle for expected Publish Complete response. */
633 	if (qos == MQTT_QOS_2_EXACTLY_ONCE) {
634 		broker_process(MQTT_PKT_TYPE_PUBREL);
635 		client_wait(false);
636 		ret = mqtt_input(&client_ctx);
637 		zassert_ok(ret, "MQTT client input processing failed (%d)", ret);
638 	}
639 }
640 
test_subscribe(void)641 static void test_subscribe(void)
642 {
643 	int ret;
644 	struct mqtt_topic topic;
645 	struct mqtt_subscription_list sub;
646 
647 	while (test_ctx.msg_id == 0) {
648 		test_ctx.msg_id = sys_rand16_get();
649 	}
650 
651 	topic.topic.utf8 = get_mqtt_topic();
652 	topic.topic.size = strlen(topic.topic.utf8);
653 	topic.qos = MQTT_QOS_2_EXACTLY_ONCE;
654 	sub.list = &topic;
655 	sub.list_count = 1U;
656 	sub.message_id = test_ctx.msg_id;
657 
658 	ret = mqtt_subscribe(&client_ctx, &sub);
659 	zassert_ok(ret, "MQTT client failed to subscribe (%d)", ret);
660 	broker_process(MQTT_PKT_TYPE_SUBSCRIBE);
661 
662 	client_wait(false);
663 	ret = mqtt_input(&client_ctx);
664 	zassert_ok(ret, "MQTT client input processing failed (%d)", ret);
665 }
666 
test_unsubscribe(void)667 static void test_unsubscribe(void)
668 {
669 	int ret;
670 	struct mqtt_topic topic;
671 	struct mqtt_subscription_list unsub;
672 
673 	while (test_ctx.msg_id == 0) {
674 		test_ctx.msg_id = sys_rand16_get();
675 	}
676 
677 	topic.topic.utf8 = get_mqtt_topic();
678 	topic.topic.size = strlen(topic.topic.utf8);
679 	unsub.list = &topic;
680 	unsub.list_count = 1U;
681 	unsub.message_id = test_ctx.msg_id;
682 
683 	ret = mqtt_unsubscribe(&client_ctx, &unsub);
684 	zassert_ok(ret, "MQTT client failed to unsubscribe (%d)", ret);
685 	broker_process(MQTT_PKT_TYPE_UNSUBSCRIBE);
686 
687 	client_wait(false);
688 	ret = mqtt_input(&client_ctx);
689 	zassert_ok(ret, "MQTT client input processing failed (%d)", ret);
690 }
691 
test_disconnect(void)692 static void test_disconnect(void)
693 {
694 	int ret;
695 
696 	ret = mqtt_disconnect(&client_ctx);
697 	zassert_ok(ret, "MQTT client failed to disconnect (%d)", ret);
698 	broker_process(MQTT_PKT_TYPE_DISCONNECT);
699 
700 	client_wait(false);
701 	ret = mqtt_input(&client_ctx);
702 	zassert_equal(ret, -ENOTCONN, "Client should no longer be connected");
703 }
704 
ZTEST(mqtt_client,test_mqtt_connect)705 ZTEST(mqtt_client, test_mqtt_connect)
706 {
707 	test_connect();
708 	zassert_true(test_ctx.connected, "MQTT client should be connected");
709 	test_disconnect();
710 	zassert_false(test_ctx.connected, "MQTT client should be disconnected");
711 }
712 
ZTEST(mqtt_client,test_mqtt_ping)713 ZTEST(mqtt_client, test_mqtt_ping)
714 {
715 	test_connect();
716 	test_pingreq();
717 	zassert_true(test_ctx.ping_resp_handled, "MQTT client should handle ping response");
718 	test_disconnect();
719 }
720 
ZTEST(mqtt_client,test_mqtt_publish_qos0)721 ZTEST(mqtt_client, test_mqtt_publish_qos0)
722 {
723 	test_ctx.payload = payload_short;
724 
725 	test_connect();
726 	test_publish(MQTT_QOS_0_AT_MOST_ONCE);
727 	zassert_false(test_ctx.puback_handled, "MQTT client should not receive puback");
728 	zassert_false(test_ctx.pubcomp_handled, "MQTT client should not receive pubcomp");
729 	test_disconnect();
730 }
731 
ZTEST(mqtt_client,test_mqtt_publish_qos1)732 ZTEST(mqtt_client, test_mqtt_publish_qos1)
733 {
734 	test_ctx.payload = payload_short;
735 
736 	test_connect();
737 	test_publish(MQTT_QOS_1_AT_LEAST_ONCE);
738 	zassert_true(test_ctx.puback_handled, "MQTT client should receive puback");
739 	zassert_false(test_ctx.pubcomp_handled, "MQTT client should not receive pubcomp");
740 	test_disconnect();
741 }
742 
ZTEST(mqtt_client,test_mqtt_publish_qos2)743 ZTEST(mqtt_client, test_mqtt_publish_qos2)
744 {
745 	test_ctx.payload = payload_short;
746 
747 	test_connect();
748 	test_publish(MQTT_QOS_2_EXACTLY_ONCE);
749 	zassert_false(test_ctx.puback_handled, "MQTT client should not receive puback");
750 	zassert_true(test_ctx.pubcomp_handled, "MQTT client should receive pubcomp");
751 	test_disconnect();
752 }
753 
ZTEST(mqtt_client,test_mqtt_subscribe)754 ZTEST(mqtt_client, test_mqtt_subscribe)
755 {
756 	test_connect();
757 	test_subscribe();
758 	zassert_true(test_ctx.suback_handled, "MQTT client should receive suback");
759 	zassert_str_equal(broker_topic, get_mqtt_topic(), "Invalid topic");
760 	test_unsubscribe();
761 	zassert_true(test_ctx.unsuback_handled, "MQTT client should receive unsuback");
762 	zassert_str_equal(broker_topic, "", "Topic should be cleared now");
763 	test_disconnect();
764 }
765 
test_pubsub(const uint8_t * payload,enum mqtt_qos qos)766 static void test_pubsub(const uint8_t *payload, enum mqtt_qos qos)
767 {
768 	int ret;
769 
770 	test_ctx.payload = payload;
771 
772 	test_connect();
773 	test_subscribe();
774 	test_publish(qos);
775 
776 	while (test_ctx.payload_left > 0) {
777 		client_wait(false);
778 		ret = mqtt_input(&client_ctx);
779 		zassert_ok(ret, "MQTT client input processing failed (%d)", ret);
780 	}
781 
782 	zassert_true(test_ctx.publish_handled, "MQTT client should receive publish");
783 
784 	if (qos == MQTT_QOS_1_AT_LEAST_ONCE) {
785 		broker_process(MQTT_PKT_TYPE_PUBACK);
786 	}
787 
788 	test_unsubscribe();
789 	test_disconnect();
790 }
791 
ZTEST(mqtt_client,test_mqtt_pubsub_short)792 ZTEST(mqtt_client, test_mqtt_pubsub_short)
793 {
794 	test_pubsub(payload_short, MQTT_QOS_0_AT_MOST_ONCE);
795 	zassert_false(test_ctx.puback_handled, "MQTT client should not receive puback");
796 }
797 
ZTEST(mqtt_client,test_mqtt_pubsub_long)798 ZTEST(mqtt_client, test_mqtt_pubsub_long)
799 {
800 	test_pubsub(payload_long, MQTT_QOS_1_AT_LEAST_ONCE);
801 	zassert_true(test_ctx.puback_handled, "MQTT client should receive puback");
802 }
803 
mqtt_tests_before(void * fixture)804 static void mqtt_tests_before(void *fixture)
805 {
806 	ARG_UNUSED(fixture);
807 
808 	memset(&test_ctx, 0, sizeof(test_ctx));
809 	broker_init();
810 	client_init(&client_ctx);
811 }
812 
mqtt_tests_after(void * fixture)813 static void mqtt_tests_after(void *fixture)
814 {
815 	ARG_UNUSED(fixture);
816 
817 	broker_destroy();
818 	mqtt_abort(&client_ctx);
819 	clear_client_fds();
820 	/* Let the TCP workqueue release TCP contexts. */
821 	k_msleep(10);
822 }
823 
824 ZTEST_SUITE(mqtt_client, NULL, NULL, mqtt_tests_before, mqtt_tests_after, NULL);
825