/* * Copyright (c) 2018 Nordic Semiconductor ASA * * SPDX-License-Identifier: Apache-2.0 */ #include LOG_MODULE_REGISTER(net_mqtt_rx, CONFIG_MQTT_LOG_LEVEL); #include "mqtt_internal.h" #include "mqtt_transport.h" #include "mqtt_os.h" /** @file mqtt_rx.c * * @brief MQTT Received data handling. */ static int mqtt_handle_packet(struct mqtt_client *client, uint8_t type_and_flags, uint32_t var_length, struct buf_ctx *buf) { int err_code = 0; bool notify_event = true; struct mqtt_evt evt; /* Success by default, overwritten in special cases. */ evt.result = 0; switch (type_and_flags & 0xF0) { case MQTT_PKT_TYPE_CONNACK: NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_CONNACK!", client); evt.type = MQTT_EVT_CONNACK; err_code = connect_ack_decode(client, buf, &evt.param.connack); if (err_code == 0) { NET_DBG("[CID %p]: return_code: %d", client, evt.param.connack.return_code); if (evt.param.connack.return_code == MQTT_CONNECTION_ACCEPTED) { /* Set state. */ MQTT_SET_STATE(client, MQTT_STATE_CONNECTED); } else { err_code = -ECONNREFUSED; } evt.result = evt.param.connack.return_code; } else { evt.result = err_code; } break; case MQTT_PKT_TYPE_PUBLISH: NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBLISH", client); evt.type = MQTT_EVT_PUBLISH; err_code = publish_decode(type_and_flags, var_length, buf, &evt.param.publish); evt.result = err_code; client->internal.remaining_payload = evt.param.publish.message.payload.len; NET_DBG("PUB QoS:%02x, message len %08x, topic len %08x", evt.param.publish.message.topic.qos, evt.param.publish.message.payload.len, evt.param.publish.message.topic.topic.size); break; case MQTT_PKT_TYPE_PUBACK: NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBACK!", client); evt.type = MQTT_EVT_PUBACK; err_code = publish_ack_decode(buf, &evt.param.puback); evt.result = err_code; break; case MQTT_PKT_TYPE_PUBREC: NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBREC!", client); evt.type = MQTT_EVT_PUBREC; err_code = publish_receive_decode(buf, &evt.param.pubrec); evt.result = err_code; break; case MQTT_PKT_TYPE_PUBREL: NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBREL!", client); evt.type = MQTT_EVT_PUBREL; err_code = publish_release_decode(buf, &evt.param.pubrel); evt.result = err_code; break; case MQTT_PKT_TYPE_PUBCOMP: NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBCOMP!", client); evt.type = MQTT_EVT_PUBCOMP; err_code = publish_complete_decode(buf, &evt.param.pubcomp); evt.result = err_code; break; case MQTT_PKT_TYPE_SUBACK: NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_SUBACK!", client); evt.type = MQTT_EVT_SUBACK; err_code = subscribe_ack_decode(buf, &evt.param.suback); evt.result = err_code; break; case MQTT_PKT_TYPE_UNSUBACK: NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_UNSUBACK!", client); evt.type = MQTT_EVT_UNSUBACK; err_code = unsubscribe_ack_decode(buf, &evt.param.unsuback); evt.result = err_code; break; case MQTT_PKT_TYPE_PINGRSP: NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PINGRSP!", client); if (client->unacked_ping <= 0) { NET_WARN("Unexpected PINGRSP"); client->unacked_ping = 0; } else { client->unacked_ping--; } evt.type = MQTT_EVT_PINGRESP; break; default: /* Nothing to notify. */ notify_event = false; break; } if (notify_event == true) { event_notify(client, &evt); } return err_code; } static int mqtt_read_message_chunk(struct mqtt_client *client, struct buf_ctx *buf, uint32_t length) { uint32_t remaining; int len; /* In case all data requested has already been buffered, return. */ if (length <= (buf->end - buf->cur)) { return 0; } /* Calculate how much data we need to read from the transport, * given the already buffered data. */ remaining = length - (buf->end - buf->cur); /* Check if read does not exceed the buffer. */ if ((buf->end + remaining > client->rx_buf + client->rx_buf_size) || (buf->end + remaining < client->rx_buf)) { NET_ERR("[CID %p]: Read would exceed RX buffer bounds.", client); return -ENOMEM; } len = mqtt_transport_read(client, buf->end, remaining, false); if (len < 0) { if (len != -EAGAIN) { NET_ERR("[CID %p]: Transport read error: %d", client, len); } return len; } if (len == 0) { NET_ERR("[CID %p]: Connection closed.", client); return -ENOTCONN; } client->internal.rx_buf_datalen += len; buf->end += len; if (len < remaining) { NET_ERR("[CID %p]: Message partially received.", client); return -EAGAIN; } return 0; } static int mqtt_read_publish_var_header(struct mqtt_client *client, uint8_t type_and_flags, struct buf_ctx *buf) { uint8_t qos = (type_and_flags & MQTT_HEADER_QOS_MASK) >> 1; int err_code; uint32_t variable_header_length; /* Read topic length field. */ err_code = mqtt_read_message_chunk(client, buf, sizeof(uint16_t)); if (err_code < 0) { return err_code; } variable_header_length = *buf->cur << 8; /* MSB */ variable_header_length |= *(buf->cur + 1); /* LSB */ /* Add two bytes for topic length field. */ variable_header_length += sizeof(uint16_t); /* Add two bytes for message_id, if needed. */ if (qos > MQTT_QOS_0_AT_MOST_ONCE) { variable_header_length += sizeof(uint16_t); } /* Now we can read the whole header. */ err_code = mqtt_read_message_chunk(client, buf, variable_header_length); if (err_code < 0) { return err_code; } return 0; } static int mqtt_read_and_parse_fixed_header(struct mqtt_client *client, uint8_t *type_and_flags, uint32_t *var_length, struct buf_ctx *buf) { /* Read the mandatory part of the fixed header in first iteration. */ uint8_t chunk_size = MQTT_FIXED_HEADER_MIN_SIZE; int err_code; do { err_code = mqtt_read_message_chunk(client, buf, chunk_size); if (err_code < 0) { return err_code; } /* Reset to pointer to the beginning of the frame. */ buf->cur = client->rx_buf; chunk_size = 1U; err_code = fixed_header_decode(buf, type_and_flags, var_length); } while (err_code == -EAGAIN); return err_code; } int mqtt_handle_rx(struct mqtt_client *client) { int err_code; uint8_t type_and_flags; uint32_t var_length; struct buf_ctx buf; buf.cur = client->rx_buf; buf.end = client->rx_buf + client->internal.rx_buf_datalen; err_code = mqtt_read_and_parse_fixed_header(client, &type_and_flags, &var_length, &buf); if (err_code < 0) { return (err_code == -EAGAIN) ? 0 : err_code; } if ((type_and_flags & 0xF0) == MQTT_PKT_TYPE_PUBLISH) { err_code = mqtt_read_publish_var_header(client, type_and_flags, &buf); } else { err_code = mqtt_read_message_chunk(client, &buf, var_length); } if (err_code < 0) { return (err_code == -EAGAIN) ? 0 : err_code; } /* At this point, packet is ready to be passed to the application. */ err_code = mqtt_handle_packet(client, type_and_flags, var_length, &buf); if (err_code < 0) { return err_code; } client->internal.rx_buf_datalen = 0U; return 0; }