1 /*
2 * Copyright (c) 2017 Intel Corporation
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 #include <zephyr/logging/log.h>
8 LOG_MODULE_REGISTER(net_mqtt_publisher_sample, LOG_LEVEL_DBG);
9
10 #include <zephyr/kernel.h>
11 #include <zephyr/net/socket.h>
12 #include <zephyr/net/mqtt.h>
13 #include <zephyr/random/random.h>
14
15 #include <string.h>
16 #include <errno.h>
17
18 #include "config.h"
19 #include "net_sample_common.h"
20
21 #if defined(CONFIG_USERSPACE)
22 #include <zephyr/app_memory/app_memdomain.h>
23 K_APPMEM_PARTITION_DEFINE(app_partition);
24 struct k_mem_domain app_domain;
25 #define APP_BMEM K_APP_BMEM(app_partition)
26 #define APP_DMEM K_APP_DMEM(app_partition)
27 #else
28 #define APP_BMEM
29 #define APP_DMEM
30 #endif
31
32 /* Buffers for MQTT client. */
33 static APP_BMEM uint8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
34 static APP_BMEM uint8_t tx_buffer[APP_MQTT_BUFFER_SIZE];
35
36 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
37 /* Making RX buffer large enough that the full IPv6 packet can fit into it */
38 #define MQTT_LIB_WEBSOCKET_RECV_BUF_LEN 1280
39
40 /* Websocket needs temporary buffer to store partial packets */
41 static APP_BMEM uint8_t temp_ws_rx_buf[MQTT_LIB_WEBSOCKET_RECV_BUF_LEN];
42 #endif
43
44 /* The mqtt client struct */
45 static APP_BMEM struct mqtt_client client_ctx;
46
47 /* MQTT Broker details. */
48 static APP_BMEM struct sockaddr_storage broker;
49
50 #if defined(CONFIG_SOCKS)
51 static APP_BMEM struct sockaddr socks5_proxy;
52 #endif
53
54 static APP_BMEM struct pollfd fds[1];
55 static APP_BMEM int nfds;
56
57 static APP_BMEM bool connected;
58
59 #if defined(CONFIG_MQTT_LIB_TLS)
60
61 #include "test_certs.h"
62
63 #define TLS_SNI_HOSTNAME "localhost"
64 #define APP_CA_CERT_TAG 1
65 #define APP_PSK_TAG 2
66
67 static APP_DMEM sec_tag_t m_sec_tags[] = {
68 #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
69 APP_CA_CERT_TAG,
70 #endif
71 #if defined(MBEDTLS_KEY_EXCHANGE_SOME_PSK_ENABLED)
72 APP_PSK_TAG,
73 #endif
74 };
75
tls_init(void)76 static int tls_init(void)
77 {
78 int err = -EINVAL;
79
80 #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
81 err = tls_credential_add(APP_CA_CERT_TAG, TLS_CREDENTIAL_CA_CERTIFICATE,
82 ca_certificate, sizeof(ca_certificate));
83 if (err < 0) {
84 LOG_ERR("Failed to register public certificate: %d", err);
85 return err;
86 }
87 #endif
88
89 #if defined(MBEDTLS_KEY_EXCHANGE_SOME_PSK_ENABLED)
90 err = tls_credential_add(APP_PSK_TAG, TLS_CREDENTIAL_PSK,
91 client_psk, sizeof(client_psk));
92 if (err < 0) {
93 LOG_ERR("Failed to register PSK: %d", err);
94 return err;
95 }
96
97 err = tls_credential_add(APP_PSK_TAG, TLS_CREDENTIAL_PSK_ID,
98 client_psk_id, sizeof(client_psk_id) - 1);
99 if (err < 0) {
100 LOG_ERR("Failed to register PSK ID: %d", err);
101 }
102 #endif
103
104 return err;
105 }
106
107 #endif /* CONFIG_MQTT_LIB_TLS */
108
prepare_fds(struct mqtt_client * client)109 static void prepare_fds(struct mqtt_client *client)
110 {
111 if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) {
112 fds[0].fd = client->transport.tcp.sock;
113 }
114 #if defined(CONFIG_MQTT_LIB_TLS)
115 else if (client->transport.type == MQTT_TRANSPORT_SECURE) {
116 fds[0].fd = client->transport.tls.sock;
117 }
118 #endif
119
120 fds[0].events = POLLIN;
121 nfds = 1;
122 }
123
clear_fds(void)124 static void clear_fds(void)
125 {
126 nfds = 0;
127 }
128
wait(int timeout)129 static int wait(int timeout)
130 {
131 int ret = 0;
132
133 if (nfds > 0) {
134 ret = poll(fds, nfds, timeout);
135 if (ret < 0) {
136 LOG_ERR("poll error: %d", errno);
137 }
138 }
139
140 return ret;
141 }
142
mqtt_evt_handler(struct mqtt_client * const client,const struct mqtt_evt * evt)143 void mqtt_evt_handler(struct mqtt_client *const client,
144 const struct mqtt_evt *evt)
145 {
146 int err;
147
148 switch (evt->type) {
149 case MQTT_EVT_CONNACK:
150 if (evt->result != 0) {
151 LOG_ERR("MQTT connect failed %d", evt->result);
152 break;
153 }
154
155 connected = true;
156 LOG_INF("MQTT client connected!");
157
158 break;
159
160 case MQTT_EVT_DISCONNECT:
161 LOG_INF("MQTT client disconnected %d", evt->result);
162
163 connected = false;
164 clear_fds();
165
166 break;
167
168 case MQTT_EVT_PUBACK:
169 if (evt->result != 0) {
170 LOG_ERR("MQTT PUBACK error %d", evt->result);
171 break;
172 }
173
174 LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id);
175
176 break;
177
178 case MQTT_EVT_PUBREC:
179 if (evt->result != 0) {
180 LOG_ERR("MQTT PUBREC error %d", evt->result);
181 break;
182 }
183
184 LOG_INF("PUBREC packet id: %u", evt->param.pubrec.message_id);
185
186 const struct mqtt_pubrel_param rel_param = {
187 .message_id = evt->param.pubrec.message_id
188 };
189
190 err = mqtt_publish_qos2_release(client, &rel_param);
191 if (err != 0) {
192 LOG_ERR("Failed to send MQTT PUBREL: %d", err);
193 }
194
195 break;
196
197 case MQTT_EVT_PUBCOMP:
198 if (evt->result != 0) {
199 LOG_ERR("MQTT PUBCOMP error %d", evt->result);
200 break;
201 }
202
203 LOG_INF("PUBCOMP packet id: %u",
204 evt->param.pubcomp.message_id);
205
206 break;
207
208 case MQTT_EVT_PINGRESP:
209 LOG_INF("PINGRESP packet");
210 break;
211
212 default:
213 break;
214 }
215 }
216
get_mqtt_payload(enum mqtt_qos qos)217 static char *get_mqtt_payload(enum mqtt_qos qos)
218 {
219 #if APP_BLUEMIX_TOPIC
220 static APP_BMEM char payload[30];
221
222 snprintk(payload, sizeof(payload), "{d:{temperature:%d}}",
223 sys_rand8_get());
224 #else
225 static APP_DMEM char payload[] = "DOORS:OPEN_QoSx";
226
227 payload[strlen(payload) - 1] = '0' + qos;
228 #endif
229
230 return payload;
231 }
232
get_mqtt_topic(void)233 static char *get_mqtt_topic(void)
234 {
235 #if APP_BLUEMIX_TOPIC
236 return "iot-2/type/"BLUEMIX_DEVTYPE"/id/"BLUEMIX_DEVID
237 "/evt/"BLUEMIX_EVENT"/fmt/"BLUEMIX_FORMAT;
238 #else
239 return "sensors";
240 #endif
241 }
242
publish(struct mqtt_client * client,enum mqtt_qos qos)243 static int publish(struct mqtt_client *client, enum mqtt_qos qos)
244 {
245 struct mqtt_publish_param param;
246
247 param.message.topic.qos = qos;
248 param.message.topic.topic.utf8 = (uint8_t *)get_mqtt_topic();
249 param.message.topic.topic.size =
250 strlen(param.message.topic.topic.utf8);
251 param.message.payload.data = get_mqtt_payload(qos);
252 param.message.payload.len =
253 strlen(param.message.payload.data);
254 param.message_id = sys_rand16_get();
255 param.dup_flag = 0U;
256 param.retain_flag = 0U;
257
258 return mqtt_publish(client, ¶m);
259 }
260
261 #define RC_STR(rc) ((rc) == 0 ? "OK" : "ERROR")
262
263 #define PRINT_RESULT(func, rc) \
264 LOG_INF("%s: %d <%s>", (func), rc, RC_STR(rc))
265
broker_init(void)266 static void broker_init(void)
267 {
268 #if defined(CONFIG_NET_IPV6)
269 struct sockaddr_in6 *broker6 = (struct sockaddr_in6 *)&broker;
270
271 broker6->sin6_family = AF_INET6;
272 broker6->sin6_port = htons(SERVER_PORT);
273 inet_pton(AF_INET6, SERVER_ADDR, &broker6->sin6_addr);
274
275 #if defined(CONFIG_SOCKS)
276 struct sockaddr_in6 *proxy6 = (struct sockaddr_in6 *)&socks5_proxy;
277
278 proxy6->sin6_family = AF_INET6;
279 proxy6->sin6_port = htons(SOCKS5_PROXY_PORT);
280 inet_pton(AF_INET6, SOCKS5_PROXY_ADDR, &proxy6->sin6_addr);
281 #endif
282 #else
283 struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker;
284
285 broker4->sin_family = AF_INET;
286 broker4->sin_port = htons(SERVER_PORT);
287 inet_pton(AF_INET, SERVER_ADDR, &broker4->sin_addr);
288 #if defined(CONFIG_SOCKS)
289 struct sockaddr_in *proxy4 = (struct sockaddr_in *)&socks5_proxy;
290
291 proxy4->sin_family = AF_INET;
292 proxy4->sin_port = htons(SOCKS5_PROXY_PORT);
293 inet_pton(AF_INET, SOCKS5_PROXY_ADDR, &proxy4->sin_addr);
294 #endif
295 #endif
296 }
297
client_init(struct mqtt_client * client)298 static void client_init(struct mqtt_client *client)
299 {
300 mqtt_client_init(client);
301
302 broker_init();
303
304 /* MQTT client configuration */
305 client->broker = &broker;
306 client->evt_cb = mqtt_evt_handler;
307 client->client_id.utf8 = (uint8_t *)MQTT_CLIENTID;
308 client->client_id.size = strlen(MQTT_CLIENTID);
309 client->password = NULL;
310 client->user_name = NULL;
311 client->protocol_version = MQTT_VERSION_3_1_1;
312
313 /* MQTT buffers configuration */
314 client->rx_buf = rx_buffer;
315 client->rx_buf_size = sizeof(rx_buffer);
316 client->tx_buf = tx_buffer;
317 client->tx_buf_size = sizeof(tx_buffer);
318
319 /* MQTT transport configuration */
320 #if defined(CONFIG_MQTT_LIB_TLS)
321 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
322 client->transport.type = MQTT_TRANSPORT_SECURE_WEBSOCKET;
323 #else
324 client->transport.type = MQTT_TRANSPORT_SECURE;
325 #endif
326
327 struct mqtt_sec_config *tls_config = &client->transport.tls.config;
328
329 tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
330 tls_config->cipher_list = NULL;
331 tls_config->sec_tag_list = m_sec_tags;
332 tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags);
333 #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD)
334 tls_config->hostname = TLS_SNI_HOSTNAME;
335 #else
336 tls_config->hostname = NULL;
337 #endif
338
339 #else
340 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
341 client->transport.type = MQTT_TRANSPORT_NON_SECURE_WEBSOCKET;
342 #else
343 client->transport.type = MQTT_TRANSPORT_NON_SECURE;
344 #endif
345 #endif
346
347 #if defined(CONFIG_MQTT_LIB_WEBSOCKET)
348 client->transport.websocket.config.host = SERVER_ADDR;
349 client->transport.websocket.config.url = "/mqtt";
350 client->transport.websocket.config.tmp_buf = temp_ws_rx_buf;
351 client->transport.websocket.config.tmp_buf_len =
352 sizeof(temp_ws_rx_buf);
353 client->transport.websocket.timeout = 5 * MSEC_PER_SEC;
354 #endif
355
356 #if defined(CONFIG_SOCKS)
357 mqtt_client_set_proxy(client, &socks5_proxy,
358 socks5_proxy.sa_family == AF_INET ?
359 sizeof(struct sockaddr_in) :
360 sizeof(struct sockaddr_in6));
361 #endif
362 }
363
364 /* In this routine we block until the connected variable is 1 */
try_to_connect(struct mqtt_client * client)365 static int try_to_connect(struct mqtt_client *client)
366 {
367 int rc, i = 0;
368
369 while (i++ < APP_CONNECT_TRIES && !connected) {
370
371 client_init(client);
372
373 rc = mqtt_connect(client);
374 if (rc != 0) {
375 PRINT_RESULT("mqtt_connect", rc);
376 k_sleep(K_MSEC(APP_SLEEP_MSECS));
377 continue;
378 }
379
380 prepare_fds(client);
381
382 if (wait(APP_CONNECT_TIMEOUT_MS)) {
383 mqtt_input(client);
384 }
385
386 if (!connected) {
387 mqtt_abort(client);
388 }
389 }
390
391 if (connected) {
392 return 0;
393 }
394
395 return -EINVAL;
396 }
397
process_mqtt_and_sleep(struct mqtt_client * client,int timeout)398 static int process_mqtt_and_sleep(struct mqtt_client *client, int timeout)
399 {
400 int64_t remaining = timeout;
401 int64_t start_time = k_uptime_get();
402 int rc;
403
404 while (remaining > 0 && connected) {
405 if (wait(remaining)) {
406 rc = mqtt_input(client);
407 if (rc != 0) {
408 PRINT_RESULT("mqtt_input", rc);
409 return rc;
410 }
411 }
412
413 rc = mqtt_live(client);
414 if (rc != 0 && rc != -EAGAIN) {
415 PRINT_RESULT("mqtt_live", rc);
416 return rc;
417 } else if (rc == 0) {
418 rc = mqtt_input(client);
419 if (rc != 0) {
420 PRINT_RESULT("mqtt_input", rc);
421 return rc;
422 }
423 }
424
425 remaining = timeout + start_time - k_uptime_get();
426 }
427
428 return 0;
429 }
430
431 #define SUCCESS_OR_EXIT(rc) { if (rc != 0) { return 1; } }
432 #define SUCCESS_OR_BREAK(rc) { if (rc != 0) { break; } }
433
publisher(void)434 static int publisher(void)
435 {
436 int i, rc, r = 0;
437
438 LOG_INF("attempting to connect: ");
439 rc = try_to_connect(&client_ctx);
440 PRINT_RESULT("try_to_connect", rc);
441 SUCCESS_OR_EXIT(rc);
442
443 i = 0;
444 while (i++ < CONFIG_NET_SAMPLE_APP_MAX_ITERATIONS && connected) {
445 r = -1;
446
447 rc = mqtt_ping(&client_ctx);
448 PRINT_RESULT("mqtt_ping", rc);
449 SUCCESS_OR_BREAK(rc);
450
451 rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
452 SUCCESS_OR_BREAK(rc);
453
454 rc = publish(&client_ctx, MQTT_QOS_0_AT_MOST_ONCE);
455 PRINT_RESULT("mqtt_publish", rc);
456 SUCCESS_OR_BREAK(rc);
457
458 rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
459 SUCCESS_OR_BREAK(rc);
460
461 rc = publish(&client_ctx, MQTT_QOS_1_AT_LEAST_ONCE);
462 PRINT_RESULT("mqtt_publish", rc);
463 SUCCESS_OR_BREAK(rc);
464
465 rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
466 SUCCESS_OR_BREAK(rc);
467
468 rc = publish(&client_ctx, MQTT_QOS_2_EXACTLY_ONCE);
469 PRINT_RESULT("mqtt_publish", rc);
470 SUCCESS_OR_BREAK(rc);
471
472 rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS);
473 SUCCESS_OR_BREAK(rc);
474
475 r = 0;
476 }
477
478 rc = mqtt_disconnect(&client_ctx);
479 PRINT_RESULT("mqtt_disconnect", rc);
480
481 LOG_INF("Bye!");
482
483 return r;
484 }
485
start_app(void)486 static int start_app(void)
487 {
488 int r = 0, i = 0;
489
490 while (!CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS ||
491 i++ < CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS) {
492 r = publisher();
493
494 if (!CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS) {
495 k_sleep(K_MSEC(5000));
496 }
497 }
498
499 return r;
500 }
501
502 #if defined(CONFIG_USERSPACE)
503 #define STACK_SIZE 2048
504
505 #if defined(CONFIG_NET_TC_THREAD_COOPERATIVE)
506 #define THREAD_PRIORITY K_PRIO_COOP(CONFIG_NUM_COOP_PRIORITIES - 1)
507 #else
508 #define THREAD_PRIORITY K_PRIO_PREEMPT(8)
509 #endif
510
511 K_THREAD_DEFINE(app_thread, STACK_SIZE,
512 start_app, NULL, NULL, NULL,
513 THREAD_PRIORITY, K_USER, -1);
514
515 static K_HEAP_DEFINE(app_mem_pool, 1024 * 2);
516 #endif
517
main(void)518 int main(void)
519 {
520 wait_for_network();
521
522 #if defined(CONFIG_MQTT_LIB_TLS)
523 int rc;
524
525 rc = tls_init();
526 PRINT_RESULT("tls_init", rc);
527 #endif
528
529 #if defined(CONFIG_USERSPACE)
530 int ret;
531
532 struct k_mem_partition *parts[] = {
533 #if Z_LIBC_PARTITION_EXISTS
534 &z_libc_partition,
535 #endif
536 &app_partition
537 };
538
539 ret = k_mem_domain_init(&app_domain, ARRAY_SIZE(parts), parts);
540 __ASSERT(ret == 0, "k_mem_domain_init() failed %d", ret);
541 ARG_UNUSED(ret);
542
543 k_mem_domain_add_thread(&app_domain, app_thread);
544 k_thread_heap_assign(app_thread, &app_mem_pool);
545
546 k_thread_start(app_thread);
547 k_thread_join(app_thread, K_FOREVER);
548 #else
549 exit(start_app());
550 #endif
551 return 0;
552 }
553