1 /*
2 * Copyright (c) 2018 Nordic Semiconductor ASA
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 #include <zephyr/logging/log.h>
8 LOG_MODULE_REGISTER(net_mqtt_rx, CONFIG_MQTT_LOG_LEVEL);
9
10 #include "mqtt_internal.h"
11 #include "mqtt_transport.h"
12 #include "mqtt_os.h"
13
14 /** @file mqtt_rx.c
15 *
16 * @brief MQTT Received data handling.
17 */
18
mqtt_handle_packet(struct mqtt_client * client,uint8_t type_and_flags,uint32_t var_length,struct buf_ctx * buf)19 static int mqtt_handle_packet(struct mqtt_client *client,
20 uint8_t type_and_flags,
21 uint32_t var_length,
22 struct buf_ctx *buf)
23 {
24 int err_code = 0;
25 bool notify_event = true;
26 struct mqtt_evt evt;
27
28 /* Success by default, overwritten in special cases. */
29 evt.result = 0;
30
31 switch (type_and_flags & 0xF0) {
32 case MQTT_PKT_TYPE_CONNACK:
33 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_CONNACK!", client);
34
35 evt.type = MQTT_EVT_CONNACK;
36 err_code = connect_ack_decode(client, buf, &evt.param.connack);
37 if (err_code == 0) {
38 NET_DBG("[CID %p]: return_code: %d", client,
39 evt.param.connack.return_code);
40
41 if (evt.param.connack.return_code ==
42 MQTT_CONNECTION_ACCEPTED) {
43 /* Set state. */
44 MQTT_SET_STATE(client, MQTT_STATE_CONNECTED);
45 } else {
46 err_code = -ECONNREFUSED;
47 }
48
49 evt.result = evt.param.connack.return_code;
50 } else {
51 evt.result = err_code;
52 }
53
54 break;
55
56 case MQTT_PKT_TYPE_PUBLISH:
57 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBLISH", client);
58
59 evt.type = MQTT_EVT_PUBLISH;
60 err_code = publish_decode(type_and_flags, var_length, buf,
61 &evt.param.publish);
62 evt.result = err_code;
63
64 client->internal.remaining_payload =
65 evt.param.publish.message.payload.len;
66
67 NET_DBG("PUB QoS:%02x, message len %08x, topic len %08x",
68 evt.param.publish.message.topic.qos,
69 evt.param.publish.message.payload.len,
70 evt.param.publish.message.topic.topic.size);
71
72 break;
73
74 case MQTT_PKT_TYPE_PUBACK:
75 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBACK!", client);
76
77 evt.type = MQTT_EVT_PUBACK;
78 err_code = publish_ack_decode(buf, &evt.param.puback);
79 evt.result = err_code;
80 break;
81
82 case MQTT_PKT_TYPE_PUBREC:
83 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBREC!", client);
84
85 evt.type = MQTT_EVT_PUBREC;
86 err_code = publish_receive_decode(buf, &evt.param.pubrec);
87 evt.result = err_code;
88 break;
89
90 case MQTT_PKT_TYPE_PUBREL:
91 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBREL!", client);
92
93 evt.type = MQTT_EVT_PUBREL;
94 err_code = publish_release_decode(buf, &evt.param.pubrel);
95 evt.result = err_code;
96 break;
97
98 case MQTT_PKT_TYPE_PUBCOMP:
99 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBCOMP!", client);
100
101 evt.type = MQTT_EVT_PUBCOMP;
102 err_code = publish_complete_decode(buf, &evt.param.pubcomp);
103 evt.result = err_code;
104 break;
105
106 case MQTT_PKT_TYPE_SUBACK:
107 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_SUBACK!", client);
108
109 evt.type = MQTT_EVT_SUBACK;
110 err_code = subscribe_ack_decode(buf, &evt.param.suback);
111 evt.result = err_code;
112 break;
113
114 case MQTT_PKT_TYPE_UNSUBACK:
115 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_UNSUBACK!", client);
116
117 evt.type = MQTT_EVT_UNSUBACK;
118 err_code = unsubscribe_ack_decode(buf, &evt.param.unsuback);
119 evt.result = err_code;
120 break;
121
122 case MQTT_PKT_TYPE_PINGRSP:
123 NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PINGRSP!", client);
124
125 if (client->unacked_ping <= 0) {
126 NET_WARN("Unexpected PINGRSP");
127 client->unacked_ping = 0;
128 } else {
129 client->unacked_ping--;
130 }
131
132 evt.type = MQTT_EVT_PINGRESP;
133 break;
134
135 default:
136 /* Nothing to notify. */
137 notify_event = false;
138 break;
139 }
140
141 if (notify_event == true) {
142 event_notify(client, &evt);
143 }
144
145 return err_code;
146 }
147
mqtt_read_message_chunk(struct mqtt_client * client,struct buf_ctx * buf,uint32_t length)148 static int mqtt_read_message_chunk(struct mqtt_client *client,
149 struct buf_ctx *buf, uint32_t length)
150 {
151 uint32_t remaining;
152 int len;
153
154 /* In case all data requested has already been buffered, return. */
155 if (length <= (buf->end - buf->cur)) {
156 return 0;
157 }
158
159 /* Calculate how much data we need to read from the transport,
160 * given the already buffered data.
161 */
162 remaining = length - (buf->end - buf->cur);
163
164 /* Check if read does not exceed the buffer. */
165 if ((buf->end + remaining > client->rx_buf + client->rx_buf_size) ||
166 (buf->end + remaining < client->rx_buf)) {
167 NET_ERR("[CID %p]: Read would exceed RX buffer bounds.",
168 client);
169 return -ENOMEM;
170 }
171
172 len = mqtt_transport_read(client, buf->end, remaining, false);
173 if (len < 0) {
174 if (len != -EAGAIN) {
175 NET_ERR("[CID %p]: Transport read error: %d", client, len);
176 }
177 return len;
178 }
179
180 if (len == 0) {
181 NET_ERR("[CID %p]: Connection closed.", client);
182 return -ENOTCONN;
183 }
184
185 client->internal.rx_buf_datalen += len;
186 buf->end += len;
187
188 if (len < remaining) {
189 NET_ERR("[CID %p]: Message partially received.", client);
190 return -EAGAIN;
191 }
192
193 return 0;
194 }
195
mqtt_read_publish_var_header(struct mqtt_client * client,uint8_t type_and_flags,struct buf_ctx * buf)196 static int mqtt_read_publish_var_header(struct mqtt_client *client,
197 uint8_t type_and_flags,
198 struct buf_ctx *buf)
199 {
200 uint8_t qos = (type_and_flags & MQTT_HEADER_QOS_MASK) >> 1;
201 int err_code;
202 uint32_t variable_header_length;
203
204 /* Read topic length field. */
205 err_code = mqtt_read_message_chunk(client, buf, sizeof(uint16_t));
206 if (err_code < 0) {
207 return err_code;
208 }
209
210 variable_header_length = *buf->cur << 8; /* MSB */
211 variable_header_length |= *(buf->cur + 1); /* LSB */
212
213 /* Add two bytes for topic length field. */
214 variable_header_length += sizeof(uint16_t);
215
216 /* Add two bytes for message_id, if needed. */
217 if (qos > MQTT_QOS_0_AT_MOST_ONCE) {
218 variable_header_length += sizeof(uint16_t);
219 }
220
221 /* Now we can read the whole header. */
222 err_code = mqtt_read_message_chunk(client, buf,
223 variable_header_length);
224 if (err_code < 0) {
225 return err_code;
226 }
227
228 return 0;
229 }
230
mqtt_read_and_parse_fixed_header(struct mqtt_client * client,uint8_t * type_and_flags,uint32_t * var_length,struct buf_ctx * buf)231 static int mqtt_read_and_parse_fixed_header(struct mqtt_client *client,
232 uint8_t *type_and_flags,
233 uint32_t *var_length,
234 struct buf_ctx *buf)
235 {
236 /* Read the mandatory part of the fixed header in first iteration. */
237 uint8_t chunk_size = MQTT_FIXED_HEADER_MIN_SIZE;
238 int err_code;
239
240 do {
241 err_code = mqtt_read_message_chunk(client, buf, chunk_size);
242 if (err_code < 0) {
243 return err_code;
244 }
245
246 /* Reset to pointer to the beginning of the frame. */
247 buf->cur = client->rx_buf;
248 chunk_size = 1U;
249
250 err_code = fixed_header_decode(buf, type_and_flags, var_length);
251 } while (err_code == -EAGAIN);
252
253 return err_code;
254 }
255
mqtt_handle_rx(struct mqtt_client * client)256 int mqtt_handle_rx(struct mqtt_client *client)
257 {
258 int err_code;
259 uint8_t type_and_flags;
260 uint32_t var_length;
261 struct buf_ctx buf;
262
263 buf.cur = client->rx_buf;
264 buf.end = client->rx_buf + client->internal.rx_buf_datalen;
265
266 err_code = mqtt_read_and_parse_fixed_header(client, &type_and_flags,
267 &var_length, &buf);
268 if (err_code < 0) {
269 return (err_code == -EAGAIN) ? 0 : err_code;
270 }
271
272 if ((type_and_flags & 0xF0) == MQTT_PKT_TYPE_PUBLISH) {
273 err_code = mqtt_read_publish_var_header(client, type_and_flags,
274 &buf);
275 } else {
276 err_code = mqtt_read_message_chunk(client, &buf, var_length);
277 }
278
279 if (err_code < 0) {
280 return (err_code == -EAGAIN) ? 0 : err_code;
281 }
282
283 /* At this point, packet is ready to be passed to the application. */
284 err_code = mqtt_handle_packet(client, type_and_flags, var_length, &buf);
285 if (err_code < 0) {
286 return err_code;
287 }
288
289 client->internal.rx_buf_datalen = 0U;
290
291 return 0;
292 }
293