/* * Copyright (c) 2017 Intel Corporation * * SPDX-License-Identifier: Apache-2.0 */ #include LOG_MODULE_REGISTER(net_mqtt_publisher_sample, LOG_LEVEL_DBG); #include #include #include #include #include #include #include "config.h" #include "net_sample_common.h" #if defined(CONFIG_USERSPACE) #include K_APPMEM_PARTITION_DEFINE(app_partition); struct k_mem_domain app_domain; #define APP_BMEM K_APP_BMEM(app_partition) #define APP_DMEM K_APP_DMEM(app_partition) #else #define APP_BMEM #define APP_DMEM #endif /* Buffers for MQTT client. */ static APP_BMEM uint8_t rx_buffer[APP_MQTT_BUFFER_SIZE]; static APP_BMEM uint8_t tx_buffer[APP_MQTT_BUFFER_SIZE]; #if defined(CONFIG_MQTT_LIB_WEBSOCKET) /* Making RX buffer large enough that the full IPv6 packet can fit into it */ #define MQTT_LIB_WEBSOCKET_RECV_BUF_LEN 1280 /* Websocket needs temporary buffer to store partial packets */ static APP_BMEM uint8_t temp_ws_rx_buf[MQTT_LIB_WEBSOCKET_RECV_BUF_LEN]; #endif /* The mqtt client struct */ static APP_BMEM struct mqtt_client client_ctx; /* MQTT Broker details. */ static APP_BMEM struct sockaddr_storage broker; #if defined(CONFIG_SOCKS) static APP_BMEM struct sockaddr socks5_proxy; #endif static APP_BMEM struct pollfd fds[1]; static APP_BMEM int nfds; static APP_BMEM bool connected; #if defined(CONFIG_MQTT_LIB_TLS) #include "test_certs.h" #define TLS_SNI_HOSTNAME "localhost" #define APP_CA_CERT_TAG 1 #define APP_PSK_TAG 2 static APP_DMEM sec_tag_t m_sec_tags[] = { #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD) APP_CA_CERT_TAG, #endif #if defined(MBEDTLS_KEY_EXCHANGE_SOME_PSK_ENABLED) APP_PSK_TAG, #endif }; static int tls_init(void) { int err = -EINVAL; #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD) err = tls_credential_add(APP_CA_CERT_TAG, TLS_CREDENTIAL_CA_CERTIFICATE, ca_certificate, sizeof(ca_certificate)); if (err < 0) { LOG_ERR("Failed to register public certificate: %d", err); return err; } #endif #if defined(MBEDTLS_KEY_EXCHANGE_SOME_PSK_ENABLED) err = tls_credential_add(APP_PSK_TAG, TLS_CREDENTIAL_PSK, client_psk, sizeof(client_psk)); if (err < 0) { LOG_ERR("Failed to register PSK: %d", err); return err; } err = tls_credential_add(APP_PSK_TAG, TLS_CREDENTIAL_PSK_ID, client_psk_id, sizeof(client_psk_id) - 1); if (err < 0) { LOG_ERR("Failed to register PSK ID: %d", err); } #endif return err; } #endif /* CONFIG_MQTT_LIB_TLS */ static void prepare_fds(struct mqtt_client *client) { if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) { fds[0].fd = client->transport.tcp.sock; } #if defined(CONFIG_MQTT_LIB_TLS) else if (client->transport.type == MQTT_TRANSPORT_SECURE) { fds[0].fd = client->transport.tls.sock; } #endif fds[0].events = POLLIN; nfds = 1; } static void clear_fds(void) { nfds = 0; } static int wait(int timeout) { int ret = 0; if (nfds > 0) { ret = poll(fds, nfds, timeout); if (ret < 0) { LOG_ERR("poll error: %d", errno); } } return ret; } void mqtt_evt_handler(struct mqtt_client *const client, const struct mqtt_evt *evt) { int err; switch (evt->type) { case MQTT_EVT_CONNACK: if (evt->result != 0) { LOG_ERR("MQTT connect failed %d", evt->result); break; } connected = true; LOG_INF("MQTT client connected!"); break; case MQTT_EVT_DISCONNECT: LOG_INF("MQTT client disconnected %d", evt->result); connected = false; clear_fds(); break; case MQTT_EVT_PUBACK: if (evt->result != 0) { LOG_ERR("MQTT PUBACK error %d", evt->result); break; } LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id); break; case MQTT_EVT_PUBREC: if (evt->result != 0) { LOG_ERR("MQTT PUBREC error %d", evt->result); break; } LOG_INF("PUBREC packet id: %u", evt->param.pubrec.message_id); const struct mqtt_pubrel_param rel_param = { .message_id = evt->param.pubrec.message_id }; err = mqtt_publish_qos2_release(client, &rel_param); if (err != 0) { LOG_ERR("Failed to send MQTT PUBREL: %d", err); } break; case MQTT_EVT_PUBCOMP: if (evt->result != 0) { LOG_ERR("MQTT PUBCOMP error %d", evt->result); break; } LOG_INF("PUBCOMP packet id: %u", evt->param.pubcomp.message_id); break; case MQTT_EVT_PINGRESP: LOG_INF("PINGRESP packet"); break; default: break; } } static char *get_mqtt_payload(enum mqtt_qos qos) { #if APP_BLUEMIX_TOPIC static APP_BMEM char payload[30]; snprintk(payload, sizeof(payload), "{d:{temperature:%d}}", sys_rand8_get()); #else static APP_DMEM char payload[] = "DOORS:OPEN_QoSx"; payload[strlen(payload) - 1] = '0' + qos; #endif return payload; } static char *get_mqtt_topic(void) { #if APP_BLUEMIX_TOPIC return "iot-2/type/"BLUEMIX_DEVTYPE"/id/"BLUEMIX_DEVID "/evt/"BLUEMIX_EVENT"/fmt/"BLUEMIX_FORMAT; #else return "sensors"; #endif } static int publish(struct mqtt_client *client, enum mqtt_qos qos) { struct mqtt_publish_param param; param.message.topic.qos = qos; param.message.topic.topic.utf8 = (uint8_t *)get_mqtt_topic(); param.message.topic.topic.size = strlen(param.message.topic.topic.utf8); param.message.payload.data = get_mqtt_payload(qos); param.message.payload.len = strlen(param.message.payload.data); param.message_id = sys_rand16_get(); param.dup_flag = 0U; param.retain_flag = 0U; return mqtt_publish(client, ¶m); } #define RC_STR(rc) ((rc) == 0 ? "OK" : "ERROR") #define PRINT_RESULT(func, rc) \ LOG_INF("%s: %d <%s>", (func), rc, RC_STR(rc)) static void broker_init(void) { #if defined(CONFIG_NET_IPV6) struct sockaddr_in6 *broker6 = (struct sockaddr_in6 *)&broker; broker6->sin6_family = AF_INET6; broker6->sin6_port = htons(SERVER_PORT); inet_pton(AF_INET6, SERVER_ADDR, &broker6->sin6_addr); #if defined(CONFIG_SOCKS) struct sockaddr_in6 *proxy6 = (struct sockaddr_in6 *)&socks5_proxy; proxy6->sin6_family = AF_INET6; proxy6->sin6_port = htons(SOCKS5_PROXY_PORT); inet_pton(AF_INET6, SOCKS5_PROXY_ADDR, &proxy6->sin6_addr); #endif #else struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker; broker4->sin_family = AF_INET; broker4->sin_port = htons(SERVER_PORT); inet_pton(AF_INET, SERVER_ADDR, &broker4->sin_addr); #if defined(CONFIG_SOCKS) struct sockaddr_in *proxy4 = (struct sockaddr_in *)&socks5_proxy; proxy4->sin_family = AF_INET; proxy4->sin_port = htons(SOCKS5_PROXY_PORT); inet_pton(AF_INET, SOCKS5_PROXY_ADDR, &proxy4->sin_addr); #endif #endif } static void client_init(struct mqtt_client *client) { mqtt_client_init(client); broker_init(); /* MQTT client configuration */ client->broker = &broker; client->evt_cb = mqtt_evt_handler; client->client_id.utf8 = (uint8_t *)MQTT_CLIENTID; client->client_id.size = strlen(MQTT_CLIENTID); client->password = NULL; client->user_name = NULL; client->protocol_version = MQTT_VERSION_3_1_1; /* MQTT buffers configuration */ client->rx_buf = rx_buffer; client->rx_buf_size = sizeof(rx_buffer); client->tx_buf = tx_buffer; client->tx_buf_size = sizeof(tx_buffer); /* MQTT transport configuration */ #if defined(CONFIG_MQTT_LIB_TLS) #if defined(CONFIG_MQTT_LIB_WEBSOCKET) client->transport.type = MQTT_TRANSPORT_SECURE_WEBSOCKET; #else client->transport.type = MQTT_TRANSPORT_SECURE; #endif struct mqtt_sec_config *tls_config = &client->transport.tls.config; tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED; tls_config->cipher_list = NULL; tls_config->sec_tag_list = m_sec_tags; tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags); #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD) tls_config->hostname = TLS_SNI_HOSTNAME; #else tls_config->hostname = NULL; #endif #else #if defined(CONFIG_MQTT_LIB_WEBSOCKET) client->transport.type = MQTT_TRANSPORT_NON_SECURE_WEBSOCKET; #else client->transport.type = MQTT_TRANSPORT_NON_SECURE; #endif #endif #if defined(CONFIG_MQTT_LIB_WEBSOCKET) client->transport.websocket.config.host = SERVER_ADDR; client->transport.websocket.config.url = "/mqtt"; client->transport.websocket.config.tmp_buf = temp_ws_rx_buf; client->transport.websocket.config.tmp_buf_len = sizeof(temp_ws_rx_buf); client->transport.websocket.timeout = 5 * MSEC_PER_SEC; #endif #if defined(CONFIG_SOCKS) mqtt_client_set_proxy(client, &socks5_proxy, socks5_proxy.sa_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)); #endif } /* In this routine we block until the connected variable is 1 */ static int try_to_connect(struct mqtt_client *client) { int rc, i = 0; while (i++ < APP_CONNECT_TRIES && !connected) { client_init(client); rc = mqtt_connect(client); if (rc != 0) { PRINT_RESULT("mqtt_connect", rc); k_sleep(K_MSEC(APP_SLEEP_MSECS)); continue; } prepare_fds(client); if (wait(APP_CONNECT_TIMEOUT_MS)) { mqtt_input(client); } if (!connected) { mqtt_abort(client); } } if (connected) { return 0; } return -EINVAL; } static int process_mqtt_and_sleep(struct mqtt_client *client, int timeout) { int64_t remaining = timeout; int64_t start_time = k_uptime_get(); int rc; while (remaining > 0 && connected) { if (wait(remaining)) { rc = mqtt_input(client); if (rc != 0) { PRINT_RESULT("mqtt_input", rc); return rc; } } rc = mqtt_live(client); if (rc != 0 && rc != -EAGAIN) { PRINT_RESULT("mqtt_live", rc); return rc; } else if (rc == 0) { rc = mqtt_input(client); if (rc != 0) { PRINT_RESULT("mqtt_input", rc); return rc; } } remaining = timeout + start_time - k_uptime_get(); } return 0; } #define SUCCESS_OR_EXIT(rc) { if (rc != 0) { return 1; } } #define SUCCESS_OR_BREAK(rc) { if (rc != 0) { break; } } static int publisher(void) { int i, rc, r = 0; LOG_INF("attempting to connect: "); rc = try_to_connect(&client_ctx); PRINT_RESULT("try_to_connect", rc); SUCCESS_OR_EXIT(rc); i = 0; while (i++ < CONFIG_NET_SAMPLE_APP_MAX_ITERATIONS && connected) { r = -1; rc = mqtt_ping(&client_ctx); PRINT_RESULT("mqtt_ping", rc); SUCCESS_OR_BREAK(rc); rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS); SUCCESS_OR_BREAK(rc); rc = publish(&client_ctx, MQTT_QOS_0_AT_MOST_ONCE); PRINT_RESULT("mqtt_publish", rc); SUCCESS_OR_BREAK(rc); rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS); SUCCESS_OR_BREAK(rc); rc = publish(&client_ctx, MQTT_QOS_1_AT_LEAST_ONCE); PRINT_RESULT("mqtt_publish", rc); SUCCESS_OR_BREAK(rc); rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS); SUCCESS_OR_BREAK(rc); rc = publish(&client_ctx, MQTT_QOS_2_EXACTLY_ONCE); PRINT_RESULT("mqtt_publish", rc); SUCCESS_OR_BREAK(rc); rc = process_mqtt_and_sleep(&client_ctx, APP_SLEEP_MSECS); SUCCESS_OR_BREAK(rc); r = 0; } rc = mqtt_disconnect(&client_ctx); PRINT_RESULT("mqtt_disconnect", rc); LOG_INF("Bye!"); return r; } static int start_app(void) { int r = 0, i = 0; while (!CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS || i++ < CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS) { r = publisher(); if (!CONFIG_NET_SAMPLE_APP_MAX_CONNECTIONS) { k_sleep(K_MSEC(5000)); } } return r; } #if defined(CONFIG_USERSPACE) #define STACK_SIZE 2048 #if defined(CONFIG_NET_TC_THREAD_COOPERATIVE) #define THREAD_PRIORITY K_PRIO_COOP(CONFIG_NUM_COOP_PRIORITIES - 1) #else #define THREAD_PRIORITY K_PRIO_PREEMPT(8) #endif K_THREAD_DEFINE(app_thread, STACK_SIZE, start_app, NULL, NULL, NULL, THREAD_PRIORITY, K_USER, -1); static K_HEAP_DEFINE(app_mem_pool, 1024 * 2); #endif int main(void) { wait_for_network(); #if defined(CONFIG_MQTT_LIB_TLS) int rc; rc = tls_init(); PRINT_RESULT("tls_init", rc); #endif #if defined(CONFIG_USERSPACE) int ret; struct k_mem_partition *parts[] = { #if Z_LIBC_PARTITION_EXISTS &z_libc_partition, #endif &app_partition }; ret = k_mem_domain_init(&app_domain, ARRAY_SIZE(parts), parts); __ASSERT(ret == 0, "k_mem_domain_init() failed %d", ret); ARG_UNUSED(ret); k_mem_domain_add_thread(&app_domain, app_thread); k_thread_heap_assign(app_thread, &app_mem_pool); k_thread_start(app_thread); k_thread_join(app_thread, K_FOREVER); #else exit(start_app()); #endif return 0; }