1 /*
2  * Copyright (c) 2017 Intel Corporation
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <zephyr/logging/log.h>
8 LOG_MODULE_REGISTER(net_mqtt_publisher_sample, LOG_LEVEL_DBG);
9 
10 #include <zephyr/kernel.h>
11 #include <zephyr/net/socket.h>
12 #include <zephyr/net/mqtt.h>
13 #include <zephyr/random/random.h>
14 
15 #include <string.h>
16 #include <errno.h>
17 
18 #include "config.h"
19 #include "net_sample_common.h"
20 
/*
 * Placement helpers for the sample's globals.
 *
 * With CONFIG_USERSPACE the application defines its own memory partition and
 * memory domain; APP_BMEM / APP_DMEM tag variables for placement in that
 * partition. Without user mode both macros expand to nothing and the
 * variables are ordinary statics.
 */
#ifdef CONFIG_USERSPACE
#include <zephyr/app_memory/app_memdomain.h>
K_APPMEM_PARTITION_DEFINE(app_partition);
struct k_mem_domain app_domain;
#define APP_BMEM K_APP_BMEM(app_partition)
#define APP_DMEM K_APP_DMEM(app_partition)
#else /* !CONFIG_USERSPACE */
#define APP_BMEM
#define APP_DMEM
#endif /* CONFIG_USERSPACE */
31 
32 /* Buffers for MQTT client. */
33 static APP_BMEM uint8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
34 static APP_BMEM uint8_t tx_buffer[APP_MQTT_BUFFER_SIZE];
35 
36 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
37 /* Making RX buffer large enough that the full IPv6 packet can fit into it */
38 #define MQTT_LIB_WEBSOCKET_RECV_BUF_LEN 1280
39 
40 /* Websocket needs temporary buffer to store partial packets */
41 static APP_BMEM uint8_t temp_ws_rx_buf[MQTT_LIB_WEBSOCKET_RECV_BUF_LEN];
42 #endif
43 
44 /* The mqtt client struct */
45 static APP_BMEM struct mqtt_client client_ctx;
46 
47 /* MQTT Broker details. */
48 static APP_BMEM struct sockaddr_storage broker;
49 
50 #if defined(CONFIG_SOCKS)
51 static APP_BMEM struct sockaddr socks5_proxy;
52 #endif
53 
54 static APP_BMEM struct pollfd fds[1];
55 static APP_BMEM int nfds;
56 
57 static APP_BMEM bool connected;
58 
59 #if defined(CONFIG_MQTT_LIB_TLS)
60 
61 #include "test_certs.h"
62 
63 #define TLS_SNI_HOSTNAME "localhost"
64 #define APP_CA_CERT_TAG 1
65 #define APP_PSK_TAG 2
66 
67 static APP_DMEM sec_tag_t m_sec_tags[] = {
68 #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
69 		APP_CA_CERT_TAG,
70 #endif
71 #if defined(MBEDTLS_KEY_EXCHANGE_SOME_PSK_ENABLED)
72 		APP_PSK_TAG,
73 #endif
74 };
75 
/*
 * Register the TLS credentials the sample needs.
 *
 * Depending on the build configuration this adds the CA certificate and/or
 * the PSK together with its identity.
 *
 * Returns the result of the last tls_credential_add() call, the negative
 * error of the first registration that failed, or -EINVAL when no credential
 * type is compiled in.
 */
static int tls_init(void)
{
	int ret = -EINVAL;

#if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
	ret = tls_credential_add(APP_CA_CERT_TAG,
				 TLS_CREDENTIAL_CA_CERTIFICATE,
				 ca_certificate, sizeof(ca_certificate));
	if (ret < 0) {
		LOG_ERR("Failed to register public certificate: %d", ret);
		return ret;
	}
#endif

#if defined(MBEDTLS_KEY_EXCHANGE_SOME_PSK_ENABLED)
	ret = tls_credential_add(APP_PSK_TAG, TLS_CREDENTIAL_PSK,
				 client_psk, sizeof(client_psk));
	if (ret < 0) {
		LOG_ERR("Failed to register PSK: %d", ret);
		return ret;
	}

	/* The identity is registered without its terminating NUL byte. */
	ret = tls_credential_add(APP_PSK_TAG, TLS_CREDENTIAL_PSK_ID,
				 client_psk_id, sizeof(client_psk_id) - 1);
	if (ret < 0) {
		LOG_ERR("Failed to register PSK ID: %d", ret);
		return ret;
	}
#endif

	return ret;
}
106 
107 #endif /* CONFIG_MQTT_LIB_TLS */
108 
prepare_fds(struct mqtt_client * client)109 static void prepare_fds(struct mqtt_client *client)
110 {
111 	if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) {
112 		fds[0].fd = client->transport.tcp.sock;
113 	}
114 #if defined(CONFIG_MQTT_LIB_TLS)
115 	else if (client->transport.type == MQTT_TRANSPORT_SECURE) {
116 		fds[0].fd = client->transport.tls.sock;
117 	}
118 #endif
119 
120 	fds[0].events = POLLIN;
121 	nfds = 1;
122 }
123 
clear_fds(void)124 static void clear_fds(void)
125 {
126 	nfds = 0;
127 }
128 
wait(int timeout)129 static int wait(int timeout)
130 {
131 	int ret = 0;
132 
133 	if (nfds > 0) {
134 		ret = poll(fds, nfds, timeout);
135 		if (ret < 0) {
136 			LOG_ERR("poll error: %d", errno);
137 		}
138 	}
139 
140 	return ret;
141 }
142 
mqtt_evt_handler(struct mqtt_client * const client,const struct mqtt_evt * evt)143 void mqtt_evt_handler(struct mqtt_client *const client,
144 		      const struct mqtt_evt *evt)
145 {
146 	int err;
147 
148 	switch (evt->type) {
149 	case MQTT_EVT_CONNACK:
150 		if (evt->result != 0) {
151 			LOG_ERR("MQTT connect failed %d", evt->result);
152 			break;
153 		}
154 
155 		connected = true;
156 		LOG_INF("MQTT client connected!");
157 
158 		break;
159 
160 	case MQTT_EVT_DISCONNECT:
161 		LOG_INF("MQTT client disconnected %d", evt->result);
162 
163 		connected = false;
164 		clear_fds();
165 
166 		break;
167 
168 	case MQTT_EVT_PUBACK:
169 		if (evt->result != 0) {
170 			LOG_ERR("MQTT PUBACK error %d", evt->result);
171 			break;
172 		}
173 
174 		LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id);
175 
176 		break;
177 
178 	case MQTT_EVT_PUBREC:
179 		if (evt->result != 0) {
180 			LOG_ERR("MQTT PUBREC error %d", evt->result);
181 			break;
182 		}
183 
184 		LOG_INF("PUBREC packet id: %u", evt->param.pubrec.message_id);
185 
186 		const struct mqtt_pubrel_param rel_param = {
187 			.message_id = evt->param.pubrec.message_id
188 		};
189 
190 		err = mqtt_publish_qos2_release(client, &rel_param);
191 		if (err != 0) {
192 			LOG_ERR("Failed to send MQTT PUBREL: %d", err);
193 		}
194 
195 		break;
196 
197 	case MQTT_EVT_PUBCOMP:
198 		if (evt->result != 0) {
199 			LOG_ERR("MQTT PUBCOMP error %d", evt->result);
200 			break;
201 		}
202 
203 		LOG_INF("PUBCOMP packet id: %u",
204 			evt->param.pubcomp.message_id);
205 
206 		break;
207 
208 	case MQTT_EVT_PINGRESP:
209 		LOG_INF("PINGRESP packet");
210 		break;
211 
212 	default:
213 		break;
214 	}
215 }
216 
get_mqtt_payload(enum mqtt_qos qos)217 static char *get_mqtt_payload(enum mqtt_qos qos)
218 {
219 #if APP_BLUEMIX_TOPIC
220 	static APP_BMEM char payload[30];
221 
222 	snprintk(payload, sizeof(payload), "{d:{temperature:%d}}",
223 		 sys_rand8_get());
224 #else
225 	static APP_DMEM char payload[] = "DOORS:OPEN_QoSx";
226 
227 	payload[strlen(payload) - 1] = '0' + qos;
228 #endif
229 
230 	return payload;
231 }
232 
/*
 * Return the topic to publish on: the Bluemix event topic assembled from the
 * BLUEMIX_* configuration strings when APP_BLUEMIX_TOPIC is set, otherwise
 * the plain "sensors" topic. The returned string is a literal and must not
 * be modified.
 */
static char *get_mqtt_topic(void)
{
	char *topic;

#if APP_BLUEMIX_TOPIC
	topic = "iot-2/type/"BLUEMIX_DEVTYPE"/id/"BLUEMIX_DEVID
		"/evt/"BLUEMIX_EVENT"/fmt/"BLUEMIX_FORMAT;
#else
	topic = "sensors";
#endif

	return topic;
}
242 
publish(struct mqtt_client * client,enum mqtt_qos qos)243 static int publish(struct mqtt_client *client, enum mqtt_qos qos)
244 {
245 	struct mqtt_publish_param param;
246 
247 	param.message.topic.qos = qos;
248 	param.message.topic.topic.utf8 = (uint8_t *)get_mqtt_topic();
249 	param.message.topic.topic.size =
250 			strlen(param.message.topic.topic.utf8);
251 	param.message.payload.data = get_mqtt_payload(qos);
252 	param.message.payload.len =
253 			strlen(param.message.payload.data);
254 	param.message_id = sys_rand16_get();
255 	param.dup_flag = 0U;
256 	param.retain_flag = 0U;
257 
258 	return mqtt_publish(client, &param);
259 }
260 
/* Map a return code to a short status word: 0 is "OK", anything else "ERROR". */
#define RC_STR(rc) (((rc) == 0) ? "OK" : "ERROR")

/* Log "<func>: <rc> <OK|ERROR>" at info level. */
#define PRINT_RESULT(func, rc) \
	LOG_INF("%s: %d <%s>", (func), (rc), RC_STR(rc))
265 
broker_init(void)266 static void broker_init(void)
267 {
268 #if defined(CONFIG_NET_IPV6)
269 	struct sockaddr_in6 *broker6 = (struct sockaddr_in6 *)&broker;
270 
271 	broker6->sin6_family = AF_INET6;
272 	broker6->sin6_port = htons(SERVER_PORT);
273 	inet_pton(AF_INET6, SERVER_ADDR, &broker6->sin6_addr);
274 
275 #if defined(CONFIG_SOCKS)
276 	struct sockaddr_in6 *proxy6 = (struct sockaddr_in6 *)&socks5_proxy;
277 
278 	proxy6->sin6_family = AF_INET6;
279 	proxy6->sin6_port = htons(SOCKS5_PROXY_PORT);
280 	inet_pton(AF_INET6, SOCKS5_PROXY_ADDR, &proxy6->sin6_addr);
281 #endif
282 #else
283 	struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker;
284 
285 	broker4->sin_family = AF_INET;
286 	broker4->sin_port = htons(SERVER_PORT);
287 	inet_pton(AF_INET, SERVER_ADDR, &broker4->sin_addr);
288 #if defined(CONFIG_SOCKS)
289 	struct sockaddr_in *proxy4 = (struct sockaddr_in *)&socks5_proxy;
290 
291 	proxy4->sin_family = AF_INET;
292 	proxy4->sin_port = htons(SOCKS5_PROXY_PORT);
293 	inet_pton(AF_INET, SOCKS5_PROXY_ADDR, &proxy4->sin_addr);
294 #endif
295 #endif
296 }
297 
client_init(struct mqtt_client * client)298 static void client_init(struct mqtt_client *client)
299 {
300 	mqtt_client_init(client);
301 
302 	broker_init();
303 
304 	/* MQTT client configuration */
305 	client->broker = &broker;
306 	client->evt_cb = mqtt_evt_handler;
307 	client->client_id.utf8 = (uint8_t *)MQTT_CLIENTID;
308 	client->client_id.size = strlen(MQTT_CLIENTID);
309 	client->password = NULL;
310 	client->user_name = NULL;
311 	client->protocol_version = MQTT_VERSION_3_1_1;
312 
313 	/* MQTT buffers configuration */
314 	client->rx_buf = rx_buffer;
315 	client->rx_buf_size = sizeof(rx_buffer);
316 	client->tx_buf = tx_buffer;
317 	client->tx_buf_size = sizeof(tx_buffer);
318 
319 	/* MQTT transport configuration */
320 #if defined(CONFIG_MQTT_LIB_TLS)
321 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
322 	client->transport.type = MQTT_TRANSPORT_SECURE_WEBSOCKET;
323 #else
324 	client->transport.type = MQTT_TRANSPORT_SECURE;
325 #endif
326 
327 	struct mqtt_sec_config *tls_config = &client->transport.tls.config;
328 
329 	tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
330 	tls_config->cipher_list = NULL;
331 	tls_config->sec_tag_list = m_sec_tags;
332 	tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags);
333 #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
334 	tls_config->hostname = TLS_SNI_HOSTNAME;
335 #else
336 	tls_config->hostname = NULL;
337 #endif
338 
339 #else
340 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
341 	client->transport.type = MQTT_TRANSPORT_NON_SECURE_WEBSOCKET;
342 #else
343 	client->transport.type = MQTT_TRANSPORT_NON_SECURE;
344 #endif
345 #endif
346 
347 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
348 	client->transport.websocket.config.host = SERVER_ADDR;
349 	client->transport.websocket.config.url = "/mqtt";
350 	client->transport.websocket.config.tmp_buf = temp_ws_rx_buf;
351 	client->transport.websocket.config.tmp_buf_len =
352 						sizeof(temp_ws_rx_buf);
353 	client->transport.websocket.timeout = 5 * MSEC_PER_SEC;
354 #endif
355 
356 #if defined(CONFIG_SOCKS)
357 	mqtt_client_set_proxy(client, &socks5_proxy,
358 			      socks5_proxy.sa_family == AF_INET ?
359 			      sizeof(struct sockaddr_in) :
360 			      sizeof(struct sockaddr_in6));
361 #endif
362 }
363 
364 /* In this routine we block until the connected variable is 1 */
try_to_connect(struct mqtt_client * client)365 static int try_to_connect(struct mqtt_client *client)
366 {
367 	int rc, i = 0;
368 
369 	while (i++ < APP_CONNECT_TRIES && !connected) {
370 
371 		client_init(client);
372 
373 		rc = mqtt_connect(client);
374 		if (rc != 0) {
375 			PRINT_RESULT("mqtt_connect", rc);
376 			k_sleep(K_MSEC(APP_SLEEP_MSECS));
377 			continue;
378 		}
379 
380 		prepare_fds(client);
381 
382 		if (wait(APP_CONNECT_TIMEOUT_MS)) {
383 			mqtt_input(client);
384 		}
385 
386 		if (!connected) {
387 			mqtt_abort(client);
388 		}
389 	}
390 
391 	if (connected) {
392 		return 0;
393 	}
394 
395 	return -EINVAL;
396 }
397 
process_mqtt_and_sleep(struct mqtt_client * client,int timeout)398 static int process_mqtt_and_sleep(struct mqtt_client *client, int timeout)
399 {
400 	int64_t remaining = timeout;
401 	int64_t start_time = k_uptime_get();
402 	int rc;
403 
404 	while (remaining > 0 && connected) {
405 		if (wait(remaining)) {
406 			rc = mqtt_input(client);
407 			if (rc != 0) {
408 				PRINT_RESULT("mqtt_input", rc);
409 				return rc;
410 			}
411 		}
412 
413 		rc = mqtt_live(client);
414 		if (rc != 0 && rc != -EAGAIN) {
415 			PRINT_RESULT("mqtt_live", rc);
416 			return rc;
417 		} else if (rc == 0) {
418 			rc = mqtt_input(client);
419 			if (rc != 0) {
420 				PRINT_RESULT("mqtt_input", rc);
421 				return rc;
422 			}
423 		}
424 
425 		remaining = timeout + start_time - k_uptime_get();
426 	}
427 
428 	return 0;
429 }
430 
/*
 * Early-out helpers for sequences of calls that return 0 on success.
 *
 * SUCCESS_OR_EXIT returns 1 from the enclosing function and SUCCESS_OR_BREAK
 * leaves the enclosing loop when `rc` is non-zero. They are deliberately
 * plain blocks rather than do { } while (0): inside a do/while the `break`
 * would only leave the macro's own loop. The argument is parenthesised so
 * that expressions such as an assignment are evaluated as written.
 */
#define SUCCESS_OR_EXIT(rc) { if ((rc) != 0) { return 1; } }
#define SUCCESS_OR_BREAK(rc) { if ((rc) != 0) { break; } }
433 
publisher(void)434 static int publisher(void)
435 {
436 	int i, rc, r = 0;
437 
438 	LOG_INF("attempting to connect: ");
439 	rc = try_to_connect(&client_ctx);
440 	PRINT_RESULT("try_to_connect", rc);
441 	SUCCESS_OR_EXIT(rc);
442 
443 	i = 0;
444 	while (i++ < CONFIG_NET_SAMPLE_APP_MAX_ITERATIONS && connected) {
445 		r = -1;
446 
447 		rc = mqtt_ping(&client_ctx);
448 		PRINT_RESULT("mqtt_ping", rc);
449 		SUCCESS_OR_BREAK(rc);
450 
451 		rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
452 		SUCCESS_OR_BREAK(rc);
453 
454 		rc = publish(&client_ctx, MQTT_QOS_0_AT_MOST_ONCE);
455 		PRINT_RESULT("mqtt_publish", rc);
456 		SUCCESS_OR_BREAK(rc);
457 
458 		rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
459 		SUCCESS_OR_BREAK(rc);
460 
461 		rc = publish(&client_ctx, MQTT_QOS_1_AT_LEAST_ONCE);
462 		PRINT_RESULT("mqtt_publish", rc);
463 		SUCCESS_OR_BREAK(rc);
464 
465 		rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
466 		SUCCESS_OR_BREAK(rc);
467 
468 		rc = publish(&client_ctx, MQTT_QOS_2_EXACTLY_ONCE);
469 		PRINT_RESULT("mqtt_publish", rc);
470 		SUCCESS_OR_BREAK(rc);
471 
472 		rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
473 		SUCCESS_OR_BREAK(rc);
474 
475 		r = 0;
476 	}
477 
478 	rc = mqtt_disconnect(&client_ctx);
479 	PRINT_RESULT("mqtt_disconnect", rc);
480 
481 	LOG_INF("Bye!");
482 
483 	return r;
484 }
485 
start_app(void)486 static int start_app(void)
487 {
488 	int r = 0, i = 0;
489 
490 	while (!CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS ||
491 	       i++ < CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS) {
492 		r = publisher();
493 
494 		if (!CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS) {
495 			k_sleep(K_MSEC(5000));
496 		}
497 	}
498 
499 	return r;
500 }
501 
#ifdef CONFIG_USERSPACE
/* Stack size, in bytes, of the user-mode application thread. */
#define STACK_SIZE 2048

/* Priority of the application thread. */
#ifdef CONFIG_NET_TC_THREAD_COOPERATIVE
#define THREAD_PRIORITY K_PRIO_COOP(CONFIG_NUM_COOP_PRIORITIES - 1)
#else
#define THREAD_PRIORITY K_PRIO_PREEMPT(8)
#endif

/*
 * User-mode thread that runs start_app(). main() starts it explicitly with
 * k_thread_start() after the memory domain and heap have been set up.
 */
K_THREAD_DEFINE(app_thread, STACK_SIZE,
		start_app, NULL, NULL, NULL,
		THREAD_PRIORITY, K_USER, -1);

/* Heap assigned to the application thread by main(). */
static K_HEAP_DEFINE(app_mem_pool, 1024 * 2);
#endif /* CONFIG_USERSPACE */
517 
/*
 * Sample entry point.
 *
 * Waits for the network, registers TLS credentials when TLS is enabled and
 * then runs the application: in a user-mode thread with its own memory
 * domain and heap when CONFIG_USERSPACE is set, otherwise directly, exiting
 * with the result of start_app().
 */
int main(void)
{
	wait_for_network();

#if defined(CONFIG_MQTT_LIB_TLS)
	int tls_rc = tls_init();

	PRINT_RESULT("tls_init", tls_rc);
#endif

#if defined(CONFIG_USERSPACE)
	/* Partitions the application thread is allowed to access. */
	struct k_mem_partition *partitions[] = {
#if Z_LIBC_PARTITION_EXISTS
		&z_libc_partition,
#endif
		&app_partition
	};
	int domain_rc = k_mem_domain_init(&app_domain, ARRAY_SIZE(partitions),
					  partitions);

	__ASSERT(domain_rc == 0, "k_mem_domain_init() failed %d", domain_rc);
	ARG_UNUSED(domain_rc);

	k_mem_domain_add_thread(&app_domain, app_thread);
	k_thread_heap_assign(app_thread, &app_mem_pool);

	/* Launch the application thread and wait until it finishes. */
	k_thread_start(app_thread);
	k_thread_join(app_thread, K_FOREVER);
#else
	exit(start_app());
#endif
	return 0;
}
553