1 /*
2  * Copyright (c) 2018 Nordic Semiconductor ASA
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <zephyr/logging/log.h>
8 LOG_MODULE_REGISTER(net_mqtt_rx, CONFIG_MQTT_LOG_LEVEL);
9 
10 #include "mqtt_internal.h"
11 #include "mqtt_transport.h"
12 #include "mqtt_os.h"
13 
14 /** @file mqtt_rx.c
15  *
16  * @brief MQTT Received data handling.
17  */
18 
mqtt_handle_packet(struct mqtt_client * client,uint8_t type_and_flags,uint32_t var_length,struct buf_ctx * buf)19 static int mqtt_handle_packet(struct mqtt_client *client,
20 			      uint8_t type_and_flags,
21 			      uint32_t var_length,
22 			      struct buf_ctx *buf)
23 {
24 	int err_code = 0;
25 	bool notify_event = true;
26 	struct mqtt_evt evt;
27 
28 	/* Success by default, overwritten in special cases. */
29 	evt.result = 0;
30 
31 	switch (type_and_flags & 0xF0) {
32 	case MQTT_PKT_TYPE_CONNACK:
33 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_CONNACK!", client);
34 
35 		evt.type = MQTT_EVT_CONNACK;
36 		err_code = connect_ack_decode(client, buf, &evt.param.connack);
37 		if (err_code == 0) {
38 			NET_DBG("[CID %p]: return_code: %d", client,
39 				 evt.param.connack.return_code);
40 
41 			if (evt.param.connack.return_code ==
42 						MQTT_CONNECTION_ACCEPTED) {
43 				/* Set state. */
44 				MQTT_SET_STATE(client, MQTT_STATE_CONNECTED);
45 			} else {
46 				err_code = -ECONNREFUSED;
47 			}
48 
49 			evt.result = evt.param.connack.return_code;
50 		} else {
51 			evt.result = err_code;
52 		}
53 
54 		break;
55 
56 	case MQTT_PKT_TYPE_PUBLISH:
57 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBLISH", client);
58 
59 		evt.type = MQTT_EVT_PUBLISH;
60 		err_code = publish_decode(type_and_flags, var_length, buf,
61 					  &evt.param.publish);
62 		evt.result = err_code;
63 
64 		client->internal.remaining_payload =
65 					evt.param.publish.message.payload.len;
66 
67 		NET_DBG("PUB QoS:%02x, message len %08x, topic len %08x",
68 			 evt.param.publish.message.topic.qos,
69 			 evt.param.publish.message.payload.len,
70 			 evt.param.publish.message.topic.topic.size);
71 
72 		break;
73 
74 	case MQTT_PKT_TYPE_PUBACK:
75 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBACK!", client);
76 
77 		evt.type = MQTT_EVT_PUBACK;
78 		err_code = publish_ack_decode(buf, &evt.param.puback);
79 		evt.result = err_code;
80 		break;
81 
82 	case MQTT_PKT_TYPE_PUBREC:
83 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBREC!", client);
84 
85 		evt.type = MQTT_EVT_PUBREC;
86 		err_code = publish_receive_decode(buf, &evt.param.pubrec);
87 		evt.result = err_code;
88 		break;
89 
90 	case MQTT_PKT_TYPE_PUBREL:
91 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBREL!", client);
92 
93 		evt.type = MQTT_EVT_PUBREL;
94 		err_code = publish_release_decode(buf, &evt.param.pubrel);
95 		evt.result = err_code;
96 		break;
97 
98 	case MQTT_PKT_TYPE_PUBCOMP:
99 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PUBCOMP!", client);
100 
101 		evt.type = MQTT_EVT_PUBCOMP;
102 		err_code = publish_complete_decode(buf, &evt.param.pubcomp);
103 		evt.result = err_code;
104 		break;
105 
106 	case MQTT_PKT_TYPE_SUBACK:
107 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_SUBACK!", client);
108 
109 		evt.type = MQTT_EVT_SUBACK;
110 		err_code = subscribe_ack_decode(buf, &evt.param.suback);
111 		evt.result = err_code;
112 		break;
113 
114 	case MQTT_PKT_TYPE_UNSUBACK:
115 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_UNSUBACK!", client);
116 
117 		evt.type = MQTT_EVT_UNSUBACK;
118 		err_code = unsubscribe_ack_decode(buf, &evt.param.unsuback);
119 		evt.result = err_code;
120 		break;
121 
122 	case MQTT_PKT_TYPE_PINGRSP:
123 		NET_DBG("[CID %p]: Received MQTT_PKT_TYPE_PINGRSP!", client);
124 
125 		if (client->unacked_ping <= 0) {
126 			NET_WARN("Unexpected PINGRSP");
127 			client->unacked_ping = 0;
128 		} else {
129 			client->unacked_ping--;
130 		}
131 
132 		evt.type = MQTT_EVT_PINGRESP;
133 		break;
134 
135 	default:
136 		/* Nothing to notify. */
137 		notify_event = false;
138 		break;
139 	}
140 
141 	if (notify_event == true) {
142 		event_notify(client, &evt);
143 	}
144 
145 	return err_code;
146 }
147 
mqtt_read_message_chunk(struct mqtt_client * client,struct buf_ctx * buf,uint32_t length)148 static int mqtt_read_message_chunk(struct mqtt_client *client,
149 				   struct buf_ctx *buf, uint32_t length)
150 {
151 	uint32_t remaining;
152 	int len;
153 
154 	/* In case all data requested has already been buffered, return. */
155 	if (length <= (buf->end - buf->cur)) {
156 		return 0;
157 	}
158 
159 	/* Calculate how much data we need to read from the transport,
160 	 * given the already buffered data.
161 	 */
162 	remaining = length - (buf->end - buf->cur);
163 
164 	/* Check if read does not exceed the buffer. */
165 	if ((buf->end + remaining > client->rx_buf + client->rx_buf_size) ||
166 	    (buf->end + remaining < client->rx_buf)) {
167 		NET_ERR("[CID %p]: Read would exceed RX buffer bounds.",
168 			 client);
169 		return -ENOMEM;
170 	}
171 
172 	len = mqtt_transport_read(client, buf->end, remaining, false);
173 	if (len < 0) {
174 		if (len != -EAGAIN) {
175 			NET_ERR("[CID %p]: Transport read error: %d", client, len);
176 		}
177 		return len;
178 	}
179 
180 	if (len == 0) {
181 		NET_ERR("[CID %p]: Connection closed.", client);
182 		return -ENOTCONN;
183 	}
184 
185 	client->internal.rx_buf_datalen += len;
186 	buf->end += len;
187 
188 	if (len < remaining) {
189 		NET_ERR("[CID %p]: Message partially received.", client);
190 		return -EAGAIN;
191 	}
192 
193 	return 0;
194 }
195 
mqtt_read_publish_var_header(struct mqtt_client * client,uint8_t type_and_flags,struct buf_ctx * buf)196 static int mqtt_read_publish_var_header(struct mqtt_client *client,
197 					uint8_t type_and_flags,
198 					struct buf_ctx *buf)
199 {
200 	uint8_t qos = (type_and_flags & MQTT_HEADER_QOS_MASK) >> 1;
201 	int err_code;
202 	uint32_t variable_header_length;
203 
204 	/* Read topic length field. */
205 	err_code = mqtt_read_message_chunk(client, buf, sizeof(uint16_t));
206 	if (err_code < 0) {
207 		return err_code;
208 	}
209 
210 	variable_header_length = *buf->cur << 8; /* MSB */
211 	variable_header_length |= *(buf->cur + 1); /* LSB */
212 
213 	/* Add two bytes for topic length field. */
214 	variable_header_length += sizeof(uint16_t);
215 
216 	/* Add two bytes for message_id, if needed. */
217 	if (qos > MQTT_QOS_0_AT_MOST_ONCE) {
218 		variable_header_length += sizeof(uint16_t);
219 	}
220 
221 	/* Now we can read the whole header. */
222 	err_code = mqtt_read_message_chunk(client, buf,
223 					   variable_header_length);
224 	if (err_code < 0) {
225 		return err_code;
226 	}
227 
228 	return 0;
229 }
230 
mqtt_read_and_parse_fixed_header(struct mqtt_client * client,uint8_t * type_and_flags,uint32_t * var_length,struct buf_ctx * buf)231 static int mqtt_read_and_parse_fixed_header(struct mqtt_client *client,
232 					    uint8_t *type_and_flags,
233 					    uint32_t *var_length,
234 					    struct buf_ctx *buf)
235 {
236 	/* Read the mandatory part of the fixed header in first iteration. */
237 	uint8_t chunk_size = MQTT_FIXED_HEADER_MIN_SIZE;
238 	int err_code;
239 
240 	do {
241 		err_code = mqtt_read_message_chunk(client, buf, chunk_size);
242 		if (err_code < 0) {
243 			return err_code;
244 		}
245 
246 		/* Reset to pointer to the beginning of the frame. */
247 		buf->cur = client->rx_buf;
248 		chunk_size = 1U;
249 
250 		err_code = fixed_header_decode(buf, type_and_flags, var_length);
251 	} while (err_code == -EAGAIN);
252 
253 	return err_code;
254 }
255 
mqtt_handle_rx(struct mqtt_client * client)256 int mqtt_handle_rx(struct mqtt_client *client)
257 {
258 	int err_code;
259 	uint8_t type_and_flags;
260 	uint32_t var_length;
261 	struct buf_ctx buf;
262 
263 	buf.cur = client->rx_buf;
264 	buf.end = client->rx_buf + client->internal.rx_buf_datalen;
265 
266 	err_code = mqtt_read_and_parse_fixed_header(client, &type_and_flags,
267 						    &var_length, &buf);
268 	if (err_code < 0) {
269 		return (err_code == -EAGAIN) ? 0 : err_code;
270 	}
271 
272 	if ((type_and_flags & 0xF0) == MQTT_PKT_TYPE_PUBLISH) {
273 		err_code = mqtt_read_publish_var_header(client, type_and_flags,
274 							&buf);
275 	} else {
276 		err_code = mqtt_read_message_chunk(client, &buf, var_length);
277 	}
278 
279 	if (err_code < 0) {
280 		return (err_code == -EAGAIN) ? 0 : err_code;
281 	}
282 
283 	/* At this point, packet is ready to be passed to the application. */
284 	err_code = mqtt_handle_packet(client, type_and_flags, var_length, &buf);
285 	if (err_code < 0) {
286 		return err_code;
287 	}
288 
289 	client->internal.rx_buf_datalen = 0U;
290 
291 	return 0;
292 }
293