1 /*
2  * Copyright (c) 2022 grandcentrix GmbH
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 /** @file mqtt_sn.c
8  *
9  * @brief MQTT-SN Client API Implementation.
10  */
11 
12 #include "mqtt_sn_msg.h"
13 
14 #include <zephyr/logging/log.h>
15 #include <zephyr/net/mqtt_sn.h>
16 LOG_MODULE_REGISTER(net_mqtt_sn, CONFIG_MQTT_SN_LOG_LEVEL);
17 
/* Number of buffers in the message pool: one per possible in-flight publish. */
#define MQTT_SN_NET_BUFS (CONFIG_MQTT_SN_LIB_MAX_PUBLISH)

/*
 * Fixed-size net_buf pool with MQTT_SN_NET_BUFS buffers of
 * CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE bytes each, no user data and no destroy
 * callback.
 * NOTE(review): no user of this pool is visible in this part of the file -
 * confirm it is still needed.
 */
NET_BUF_POOL_FIXED_DEFINE(mqtt_sn_messages, MQTT_SN_NET_BUFS, CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE,
			  0, NULL);
22 
/**
 * @brief Retransmission bookkeeping shared by messages that expect an ack.
 */
struct mqtt_sn_confirmable {
	/* Uptime (k_uptime_get(), ms) of the last send attempt; 0 means "not sent yet". */
	int64_t last_attempt;
	/* Message ID used to match the acknowledgement to this entry. */
	uint16_t msg_id;
	/* Remaining send attempts; starts at N_RETRY and is decremented per attempt. */
	uint8_t retries;
};
28 
/**
 * @brief A queued outgoing PUBLISH, allocated from the `publishes` slab.
 */
struct mqtt_sn_publish {
	/* Retry state and message ID of this publish. */
	struct mqtt_sn_confirmable con;
	/* Link in the client's `publish` list. */
	sys_snode_t next;
	/* Topic this publish targets; not owned by the publish. */
	struct mqtt_sn_topic *topic;
	/* Private copy of the payload. */
	uint8_t pubdata[CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE];
	/* Number of valid bytes in pubdata. */
	size_t datalen;
	/* Requested quality of service. */
	enum mqtt_sn_qos qos;
	/* Value of the RETAIN flag to send. */
	bool retain;
};
38 
/**
 * @brief Life-cycle state of a topic entry.
 *
 * The *ING states mark a request (REGISTER, SUBSCRIBE, UNSUBSCRIBE) that is
 * still being (re)sent by process_topics(); REGISTERED and SUBSCRIBED are set
 * once the matching acknowledgement was accepted.
 */
enum mqtt_sn_topic_state {
	MQTT_SN_TOPIC_STATE_REGISTERING,
	MQTT_SN_TOPIC_STATE_REGISTERED,
	MQTT_SN_TOPIC_STATE_SUBSCRIBING,
	MQTT_SN_TOPIC_STATE_SUBSCRIBED,
	MQTT_SN_TOPIC_STATE_UNSUBSCRIBING,
};
46 
/**
 * @brief A topic known to the client, allocated from the `topics` slab.
 */
struct mqtt_sn_topic {
	/* Retry state and message ID of the pending topic request. */
	struct mqtt_sn_confirmable con;
	/* Link in the client's `topic` list. */
	sys_snode_t next;
	/* Topic name bytes; not NUL-terminated, length is in namelen. */
	char name[CONFIG_MQTT_SN_LIB_MAX_TOPIC_SIZE];
	/* Number of valid bytes in name. */
	size_t namelen;
	/* Topic ID taken from REGISTER, REGACK or SUBACK. */
	uint16_t topic_id;
	/* QoS requested for, or granted to, this topic. */
	enum mqtt_sn_qos qos;
	/* Topic type (normal, predefined or short). */
	enum mqtt_sn_topic_type type;
	/* Current life-cycle state. */
	enum mqtt_sn_topic_state state;
};
57 
/* Backing storage for queued publishes (CONFIG_MQTT_SN_LIB_MAX_PUBLISH blocks, 4-byte aligned). */
K_MEM_SLAB_DEFINE_STATIC(publishes, sizeof(struct mqtt_sn_publish),
			 CONFIG_MQTT_SN_LIB_MAX_PUBLISH, 4);
/* Backing storage for topic entries (CONFIG_MQTT_SN_LIB_MAX_TOPICS blocks, 4-byte aligned). */
K_MEM_SLAB_DEFINE_STATIC(topics, sizeof(struct mqtt_sn_topic), CONFIG_MQTT_SN_LIB_MAX_TOPICS, 4);
61 
/**
 * @brief Connection state of a client.
 *
 * Only MQTT_SN_CLIENT_ACTIVE allows topic requests and publishes to be sent;
 * pings are additionally sent in MQTT_SN_CLIENT_ASLEEP.
 */
enum mqtt_sn_client_state {
	MQTT_SN_CLIENT_DISCONNECTED,
	MQTT_SN_CLIENT_ACTIVE,
	MQTT_SN_CLIENT_ASLEEP,
	MQTT_SN_CLIENT_AWAKE
};
68 
mqtt_sn_set_state(struct mqtt_sn_client * client,enum mqtt_sn_client_state state)69 static void mqtt_sn_set_state(struct mqtt_sn_client *client, enum mqtt_sn_client_state state)
70 {
71 	int prev_state = client->state;
72 
73 	client->state = state;
74 	LOG_DBG("Client %p state (%d) -> (%d)", client, prev_state, state);
75 }
76 
/* Delay between retransmissions of an unacknowledged message, in milliseconds. */
#define T_RETRY_MSEC (CONFIG_MQTT_SN_LIB_T_RETRY * MSEC_PER_SEC)
/* Number of send attempts granted to a confirmable message or ping. */
#define N_RETRY (CONFIG_MQTT_SN_LIB_N_RETRY)
/* Keep-alive interval between pings once the last ping was acknowledged, in milliseconds. */
#define T_KEEPALIVE_MSEC (CONFIG_MQTT_SN_KEEPALIVE * MSEC_PER_SEC)
80 
/**
 * @brief Produce the next message ID for a confirmable message.
 *
 * IDs are handed out sequentially from a file-wide counter. The value 0 is
 * never returned: MQTT-SN codes the MsgId field as 0x0000 for messages that
 * carry no meaningful ID (e.g. QoS 0 publishes), so a confirmable message
 * must not use it. After 0xFFFF the counter therefore restarts at 1.
 *
 * @return A non-zero message ID.
 */
static uint16_t next_msg_id(void)
{
	static uint16_t msg_id;

	msg_id++;
	if (msg_id == 0U) {
		/* Wrapped around: skip the reserved "no ID" value. */
		msg_id = 1U;
	}

	return msg_id;
}
87 
encode_and_send(struct mqtt_sn_client * client,struct mqtt_sn_param * p)88 static int encode_and_send(struct mqtt_sn_client *client, struct mqtt_sn_param *p)
89 {
90 	int err;
91 
92 	err = mqtt_sn_encode_msg(&client->tx, p);
93 	if (err) {
94 		goto end;
95 	}
96 
97 	LOG_HEXDUMP_DBG(client->tx.data, client->tx.len, "Send message");
98 
99 	if (!client->transport->msg_send) {
100 		LOG_ERR("Can't send: no callback");
101 		err = -ENOTSUP;
102 		goto end;
103 	}
104 
105 	if (!client->tx.len) {
106 		LOG_WRN("Can't send: empty");
107 		err = -ENOMSG;
108 		goto end;
109 	}
110 
111 	err = client->transport->msg_send(client, client->tx.data, client->tx.len);
112 	if (err) {
113 		LOG_ERR("Error during send: %d", err);
114 		goto end;
115 	}
116 
117 end:
118 	net_buf_simple_reset(&client->tx);
119 
120 	return err;
121 }
122 
mqtt_sn_con_init(struct mqtt_sn_confirmable * con)123 static void mqtt_sn_con_init(struct mqtt_sn_confirmable *con)
124 {
125 	con->last_attempt = 0;
126 	con->retries = N_RETRY;
127 	con->msg_id = next_msg_id();
128 }
129 
mqtt_sn_publish_destroy(struct mqtt_sn_client * client,struct mqtt_sn_publish * pub)130 static void mqtt_sn_publish_destroy(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub)
131 {
132 	sys_slist_find_and_remove(&client->publish, &pub->next);
133 	k_mem_slab_free(&publishes, (void *)pub);
134 }
135 
mqtt_sn_publish_destroy_all(struct mqtt_sn_client * client)136 static void mqtt_sn_publish_destroy_all(struct mqtt_sn_client *client)
137 {
138 	struct mqtt_sn_publish *pub;
139 	sys_snode_t *next;
140 
141 	while ((next = sys_slist_get(&client->publish)) != NULL) {
142 		pub = SYS_SLIST_CONTAINER(next, pub, next);
143 		k_mem_slab_free(&publishes, (void *)pub);
144 	}
145 }
146 
mqtt_sn_publish_create(struct mqtt_sn_data * data)147 static struct mqtt_sn_publish *mqtt_sn_publish_create(struct mqtt_sn_data *data)
148 {
149 	struct mqtt_sn_publish *pub;
150 
151 	if (k_mem_slab_alloc(&publishes, (void **)&pub, K_NO_WAIT)) {
152 		LOG_ERR("Can't create PUB: no free slot");
153 		return NULL;
154 	}
155 
156 	memset(pub, 0, sizeof(*pub));
157 
158 	if (data && data->data && data->size) {
159 		if (data->size > sizeof(pub->pubdata)) {
160 			LOG_ERR("Can't create PUB: Too much data (%" PRIu16 ")", data->size);
161 			return NULL;
162 		}
163 
164 		memcpy(pub->pubdata, data->data, data->size);
165 		pub->datalen = data->size;
166 	}
167 
168 	mqtt_sn_con_init(&pub->con);
169 
170 	return pub;
171 }
172 
mqtt_sn_publish_find_msg_id(struct mqtt_sn_client * client,uint16_t msg_id)173 static struct mqtt_sn_publish *mqtt_sn_publish_find_msg_id(struct mqtt_sn_client *client,
174 							   uint16_t msg_id)
175 {
176 	struct mqtt_sn_publish *pub;
177 
178 	SYS_SLIST_FOR_EACH_CONTAINER(&client->publish, pub, next) {
179 		if (pub->con.msg_id == msg_id) {
180 			return pub;
181 		}
182 	}
183 
184 	return NULL;
185 }
186 
mqtt_sn_publish_find_topic(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)187 static struct mqtt_sn_publish *mqtt_sn_publish_find_topic(struct mqtt_sn_client *client,
188 							  struct mqtt_sn_topic *topic)
189 {
190 	struct mqtt_sn_publish *pub;
191 
192 	SYS_SLIST_FOR_EACH_CONTAINER(&client->publish, pub, next) {
193 		if (pub->topic == topic) {
194 			return pub;
195 		}
196 	}
197 
198 	return NULL;
199 }
200 
mqtt_sn_topic_create(struct mqtt_sn_data * name)201 static struct mqtt_sn_topic *mqtt_sn_topic_create(struct mqtt_sn_data *name)
202 {
203 	struct mqtt_sn_topic *topic;
204 
205 	if (k_mem_slab_alloc(&topics, (void **)&topic, K_NO_WAIT)) {
206 		LOG_ERR("Can't create topic: no free slot");
207 		return NULL;
208 	}
209 
210 	memset(topic, 0, sizeof(*topic));
211 
212 	if (!name || !name->data || !name->size) {
213 		LOG_ERR("Can't create topic with empty name");
214 		return NULL;
215 	}
216 
217 	if (name->size > sizeof(topic->name)) {
218 		LOG_ERR("Can't create topic: name too long (%" PRIu16 ")", name->size);
219 		return NULL;
220 	}
221 
222 	memcpy(topic->name, name->data, name->size);
223 	topic->namelen = name->size;
224 
225 	mqtt_sn_con_init(&topic->con);
226 
227 	return topic;
228 }
229 
mqtt_sn_topic_find_name(struct mqtt_sn_client * client,struct mqtt_sn_data * topic_name)230 static struct mqtt_sn_topic *mqtt_sn_topic_find_name(struct mqtt_sn_client *client,
231 							 struct mqtt_sn_data *topic_name)
232 {
233 	struct mqtt_sn_topic *topic;
234 
235 	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
236 		if (topic->namelen == topic_name->size &&
237 			memcmp(topic->name, topic_name->data, topic_name->size) == 0) {
238 			return topic;
239 		}
240 	}
241 
242 	return NULL;
243 }
244 
mqtt_sn_topic_find_msg_id(struct mqtt_sn_client * client,uint16_t msg_id)245 static struct mqtt_sn_topic *mqtt_sn_topic_find_msg_id(struct mqtt_sn_client *client,
246 							   uint16_t msg_id)
247 {
248 	struct mqtt_sn_topic *topic;
249 
250 	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
251 		if (topic->con.msg_id == msg_id) {
252 			return topic;
253 		}
254 	}
255 
256 	return NULL;
257 }
258 
mqtt_sn_topic_destroy(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)259 static void mqtt_sn_topic_destroy(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
260 {
261 	struct mqtt_sn_publish *pub;
262 
263 	/* Destroy all pubs referencing this topic */
264 	while ((pub = mqtt_sn_publish_find_topic(client, topic)) != NULL) {
265 		LOG_WRN("Destroying publish msg_id %d", pub->con.msg_id);
266 		mqtt_sn_publish_destroy(client, pub);
267 	}
268 
269 	sys_slist_find_and_remove(&client->topic, &topic->next);
270 }
271 
mqtt_sn_topic_destroy_all(struct mqtt_sn_client * client)272 static void mqtt_sn_topic_destroy_all(struct mqtt_sn_client *client)
273 {
274 	struct mqtt_sn_topic *topic;
275 	struct mqtt_sn_publish *pub;
276 	sys_snode_t *next;
277 
278 	while ((next = sys_slist_get(&client->topic)) != NULL) {
279 		topic = SYS_SLIST_CONTAINER(next, topic, next);
280 		/* Destroy all pubs referencing this topic */
281 		while ((pub = mqtt_sn_publish_find_topic(client, topic)) != NULL) {
282 			LOG_WRN("Destroying publish msg_id %d", pub->con.msg_id);
283 			mqtt_sn_publish_destroy(client, pub);
284 		}
285 
286 		k_mem_slab_free(&topics, (void *)topic);
287 	}
288 }
289 
mqtt_sn_disconnect_internal(struct mqtt_sn_client * client)290 static void mqtt_sn_disconnect_internal(struct mqtt_sn_client *client)
291 {
292 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
293 
294 	mqtt_sn_set_state(client, MQTT_SN_CLIENT_DISCONNECTED);
295 	if (client->evt_cb) {
296 		client->evt_cb(client, &evt);
297 	}
298 
299 	/*
300 	 * Remove all publishes, but keep topics
301 	 * Topics are removed on deinit or when connect is called with
302 	 * clean-session = true
303 	 */
304 	mqtt_sn_publish_destroy_all(client);
305 
306 	k_work_cancel_delayable(&client->process_work);
307 }
308 
mqtt_sn_sleep_internal(struct mqtt_sn_client * client)309 static void mqtt_sn_sleep_internal(struct mqtt_sn_client *client)
310 {
311 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
312 
313 	mqtt_sn_set_state(client, MQTT_SN_CLIENT_ASLEEP);
314 	if (client->evt_cb) {
315 		client->evt_cb(client, &evt);
316 	}
317 }
318 
mqtt_sn_do_subscribe(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic,bool dup)319 static void mqtt_sn_do_subscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic,
320 				 bool dup)
321 {
322 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_SUBSCRIBE};
323 
324 	if (!client || !topic) {
325 		return;
326 	}
327 
328 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
329 		LOG_ERR("Cannot subscribe: not connected");
330 		return;
331 	}
332 
333 	p.params.subscribe.msg_id = topic->con.msg_id;
334 	p.params.subscribe.qos = topic->qos;
335 	p.params.subscribe.topic_type = topic->type;
336 	p.params.subscribe.dup = dup;
337 	switch (topic->type) {
338 	case MQTT_SN_TOPIC_TYPE_NORMAL:
339 		p.params.subscribe.topic.topic_name.data = topic->name;
340 		p.params.subscribe.topic.topic_name.size = topic->namelen;
341 		break;
342 	case MQTT_SN_TOPIC_TYPE_PREDEF:
343 	case MQTT_SN_TOPIC_TYPE_SHORT:
344 		p.params.subscribe.topic.topic_id = topic->topic_id;
345 		break;
346 	default:
347 		LOG_ERR("Unexpected topic type %d", topic->type);
348 		return;
349 	}
350 
351 	encode_and_send(client, &p);
352 }
353 
mqtt_sn_do_unsubscribe(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)354 static void mqtt_sn_do_unsubscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
355 {
356 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_UNSUBSCRIBE};
357 
358 	if (!client || !topic) {
359 		return;
360 	}
361 
362 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
363 		LOG_ERR("Cannot unsubscribe: not connected");
364 		return;
365 	}
366 
367 	p.params.unsubscribe.msg_id = topic->con.msg_id;
368 	p.params.unsubscribe.topic_type = topic->type;
369 	switch (topic->type) {
370 	case MQTT_SN_TOPIC_TYPE_NORMAL:
371 		p.params.unsubscribe.topic.topic_name.data = topic->name;
372 		p.params.unsubscribe.topic.topic_name.size = topic->namelen;
373 		break;
374 	case MQTT_SN_TOPIC_TYPE_PREDEF:
375 	case MQTT_SN_TOPIC_TYPE_SHORT:
376 		p.params.unsubscribe.topic.topic_id = topic->topic_id;
377 		break;
378 	default:
379 		LOG_ERR("Unexpected topic type %d", topic->type);
380 		return;
381 	}
382 
383 	encode_and_send(client, &p);
384 }
385 
mqtt_sn_do_register(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)386 static void mqtt_sn_do_register(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
387 {
388 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_REGISTER};
389 
390 	if (!client || !topic) {
391 		return;
392 	}
393 
394 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
395 		LOG_ERR("Cannot register: not connected");
396 		return;
397 	}
398 
399 	p.params.reg.msg_id = topic->con.msg_id;
400 	switch (topic->type) {
401 	case MQTT_SN_TOPIC_TYPE_NORMAL:
402 		LOG_HEXDUMP_INF(topic->name, topic->namelen, "Registering topic");
403 		p.params.reg.topic.data = topic->name;
404 		p.params.reg.topic.size = topic->namelen;
405 		break;
406 	default:
407 		LOG_ERR("Unexpected topic type %d", topic->type);
408 		return;
409 	}
410 
411 	encode_and_send(client, &p);
412 }
413 
mqtt_sn_do_publish(struct mqtt_sn_client * client,struct mqtt_sn_publish * pub,bool dup)414 static void mqtt_sn_do_publish(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub, bool dup)
415 {
416 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_PUBLISH};
417 
418 	if (!client || !pub) {
419 		return;
420 	}
421 
422 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
423 		LOG_ERR("Cannot subscribe: not connected");
424 		return;
425 	}
426 
427 	LOG_INF("Publishing to topic ID %d", pub->topic->topic_id);
428 
429 	p.params.publish.data.data = pub->pubdata;
430 	p.params.publish.data.size = pub->datalen;
431 	p.params.publish.msg_id = pub->con.msg_id;
432 	p.params.publish.retain = pub->retain;
433 	p.params.publish.topic_id = pub->topic->topic_id;
434 	p.params.publish.topic_type = pub->topic->type;
435 	p.params.publish.qos = pub->qos;
436 	p.params.publish.dup = dup;
437 
438 	encode_and_send(client, &p);
439 }
440 
mqtt_sn_do_ping(struct mqtt_sn_client * client)441 static void mqtt_sn_do_ping(struct mqtt_sn_client *client)
442 {
443 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_PINGREQ};
444 
445 	switch (client->state) {
446 	case MQTT_SN_CLIENT_ASLEEP:
447 		/*
448 		 * From the spec regarding PINGREQ:
449 		 * ClientId: contains the client id; this field is optional
450 		 * and is included by a “sleeping” client when it goes to the
451 		 * “awake” state and is waiting for messages sent by the
452 		 * server/gateway, see Section 6.14 for further details.
453 		 */
454 		p.params.pingreq.client_id.data = client->client_id.data;
455 		p.params.pingreq.client_id.size = client->client_id.size;
456 	case MQTT_SN_CLIENT_ACTIVE:
457 		encode_and_send(client, &p);
458 		break;
459 	default:
460 		LOG_WRN("Can't ping in state %d", client->state);
461 		break;
462 	}
463 }
464 
process_pubs(struct mqtt_sn_client * client,int64_t * next_cycle)465 static int process_pubs(struct mqtt_sn_client *client, int64_t *next_cycle)
466 {
467 	struct mqtt_sn_publish *pub, *pubs;
468 	const int64_t now = k_uptime_get();
469 	int64_t next_attempt;
470 	bool dup;
471 
472 	*next_cycle = 0;
473 
474 	SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&client->publish, pub, pubs, next) {
475 		LOG_HEXDUMP_DBG(pub->topic->name, pub->topic->namelen,
476 				"Processing publish for topic");
477 		LOG_HEXDUMP_DBG(pub->pubdata, pub->datalen, "Processing publish data");
478 
479 		if (pub->con.last_attempt == 0) {
480 			next_attempt = 0;
481 			dup = false;
482 		} else {
483 			next_attempt = pub->con.last_attempt + T_RETRY_MSEC;
484 			dup = true;
485 		}
486 
487 		if (next_attempt <= now) {
488 			switch (pub->topic->state) {
489 			case MQTT_SN_TOPIC_STATE_REGISTERING:
490 			case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
491 			case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
492 				LOG_INF("Can't publish; topic is not ready");
493 				break;
494 			case MQTT_SN_TOPIC_STATE_REGISTERED:
495 			case MQTT_SN_TOPIC_STATE_SUBSCRIBED:
496 				if (!pub->con.retries--) {
497 					LOG_WRN("PUB ran out of retries, disconnecting");
498 					mqtt_sn_disconnect_internal(client);
499 					return -ETIMEDOUT;
500 				}
501 				mqtt_sn_do_publish(client, pub, dup);
502 				if (pub->qos == MQTT_SN_QOS_0 || pub->qos == MQTT_SN_QOS_M1) {
503 					/* We are done, remove this */
504 					mqtt_sn_publish_destroy(client, pub);
505 					continue;
506 				} else {
507 					/* Wait for ack */
508 					pub->con.last_attempt = now;
509 					next_attempt = now + T_RETRY_MSEC;
510 				}
511 				break;
512 			}
513 		}
514 
515 		if (next_attempt > now && (*next_cycle == 0 || next_attempt < *next_cycle)) {
516 			*next_cycle = next_attempt;
517 		}
518 	}
519 
520 	return 0;
521 }
522 
process_topics(struct mqtt_sn_client * client,int64_t * next_cycle)523 static int process_topics(struct mqtt_sn_client *client, int64_t *next_cycle)
524 {
525 	struct mqtt_sn_topic *topic;
526 	const int64_t now = k_uptime_get();
527 	int64_t next_attempt;
528 	bool dup;
529 
530 	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
531 		LOG_HEXDUMP_DBG(topic->name, topic->namelen, "Processing topic");
532 
533 		if (topic->con.last_attempt == 0) {
534 			next_attempt = 0;
535 			dup = false;
536 		} else {
537 			next_attempt = topic->con.last_attempt + T_RETRY_MSEC;
538 			dup = true;
539 		}
540 
541 		if (next_attempt <= now) {
542 			switch (topic->state) {
543 			case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
544 				if (!topic->con.retries--) {
545 					LOG_WRN("Topic ran out of retries, disconnecting");
546 					mqtt_sn_disconnect_internal(client);
547 					return -ETIMEDOUT;
548 				}
549 
550 				mqtt_sn_do_subscribe(client, topic, dup);
551 				topic->con.last_attempt = now;
552 				next_attempt = now + T_RETRY_MSEC;
553 				break;
554 			case MQTT_SN_TOPIC_STATE_REGISTERING:
555 				if (!topic->con.retries--) {
556 					LOG_WRN("Topic ran out of retries, disconnecting");
557 					mqtt_sn_disconnect_internal(client);
558 					return -ETIMEDOUT;
559 				}
560 
561 				mqtt_sn_do_register(client, topic);
562 				topic->con.last_attempt = now;
563 				next_attempt = now + T_RETRY_MSEC;
564 				break;
565 			case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
566 				if (!topic->con.retries--) {
567 					LOG_WRN("Topic ran out of retries, disconnecting");
568 					mqtt_sn_disconnect_internal(client);
569 					return -ETIMEDOUT;
570 				}
571 				mqtt_sn_do_unsubscribe(client, topic);
572 				topic->con.last_attempt = now;
573 				next_attempt = now + T_RETRY_MSEC;
574 				break;
575 			case MQTT_SN_TOPIC_STATE_REGISTERED:
576 			case MQTT_SN_TOPIC_STATE_SUBSCRIBED:
577 				break;
578 			}
579 		}
580 
581 		if (next_attempt > now && (*next_cycle == 0 || next_attempt < *next_cycle)) {
582 			*next_cycle = next_attempt;
583 		}
584 	}
585 
586 	return 0;
587 }
588 
process_ping(struct mqtt_sn_client * client,int64_t * next_cycle)589 static int process_ping(struct mqtt_sn_client *client, int64_t *next_cycle)
590 {
591 	const int64_t now = k_uptime_get();
592 	int64_t next_ping;
593 
594 	if (client->ping_retries == N_RETRY) {
595 		/* Last ping was acked */
596 		next_ping = client->last_ping + T_KEEPALIVE_MSEC;
597 	} else {
598 		next_ping = client->last_ping + T_RETRY_MSEC;
599 	}
600 
601 	if (next_ping < now) {
602 		if (!client->ping_retries--) {
603 			LOG_WRN("Ping ran out of retries");
604 			mqtt_sn_disconnect_internal(client);
605 			return -ETIMEDOUT;
606 		}
607 
608 		LOG_DBG("Sending PINGREQ");
609 		mqtt_sn_do_ping(client);
610 		client->last_ping = now;
611 		next_ping = now + T_RETRY_MSEC;
612 	}
613 
614 	if (*next_cycle == 0 || next_ping < *next_cycle) {
615 		*next_cycle = next_ping;
616 	}
617 
618 	return 0;
619 }
620 
process_work(struct k_work * wrk)621 static void process_work(struct k_work *wrk)
622 {
623 	struct mqtt_sn_client *client;
624 	struct k_work_delayable *dwork;
625 	int64_t next_cycle = 0;
626 	int err;
627 
628 	dwork = k_work_delayable_from_work(wrk);
629 	client = CONTAINER_OF(dwork, struct mqtt_sn_client, process_work);
630 
631 	LOG_DBG("Executing work of client %p in state %d", client, client->state);
632 
633 	if (client->state == MQTT_SN_CLIENT_DISCONNECTED) {
634 		LOG_WRN("%s called while disconnected: Nothing to do", __func__);
635 		return;
636 	}
637 
638 	if (client->state == MQTT_SN_CLIENT_ACTIVE) {
639 		err = process_topics(client, &next_cycle);
640 		if (err) {
641 			return;
642 		}
643 
644 		err = process_pubs(client, &next_cycle);
645 		if (err) {
646 			return;
647 		}
648 
649 		err = process_ping(client, &next_cycle);
650 		if (err) {
651 			return;
652 		}
653 	}
654 
655 	if (next_cycle > 0) {
656 		k_work_schedule(dwork, K_MSEC(next_cycle - k_uptime_get()));
657 	}
658 }
659 
/**
 * @brief Initialise a client structure.
 *
 * The structure is zeroed, the client ID, transport and event callback are
 * stored, the TX/RX buffers are bound to the caller's memory and emptied,
 * the processing work item is set up and the transport's optional init hook
 * is called.
 *
 * @param client    Client to initialise.
 * @param client_id Client ID; only the data pointer and size are copied.
 * @param transport Transport used for sending.
 * @param evt_cb    Application event callback.
 * @param tx        Memory for the TX buffer.
 * @param txsz      Size of @p tx in bytes.
 * @param rx        Memory for the RX buffer.
 * @param rxsz      Size of @p rx in bytes.
 *
 * @retval 0       Success.
 * @retval -EINVAL A pointer argument is NULL.
 */
int mqtt_sn_client_init(struct mqtt_sn_client *client, const struct mqtt_sn_data *client_id,
			struct mqtt_sn_transport *transport, mqtt_sn_evt_cb_t evt_cb, void *tx,
			size_t txsz, void *rx, size_t rxsz)
{
	if (client == NULL || client_id == NULL || transport == NULL || evt_cb == NULL ||
	    tx == NULL || rx == NULL) {
		return -EINVAL;
	}

	memset(client, 0, sizeof(*client));

	net_buf_simple_init_with_data(&client->tx, tx, txsz);
	net_buf_simple_reset(&client->tx);

	net_buf_simple_init_with_data(&client->rx, rx, rxsz);
	net_buf_simple_reset(&client->rx);

	client->evt_cb = evt_cb;
	client->transport = transport;
	client->client_id.size = client_id->size;
	client->client_id.data = client_id->data;

	k_work_init_delayable(&client->process_work, process_work);

	if (transport->init != NULL) {
		transport->init(transport);
	}

	return 0;
}
689 
mqtt_sn_client_deinit(struct mqtt_sn_client * client)690 void mqtt_sn_client_deinit(struct mqtt_sn_client *client)
691 {
692 	if (!client) {
693 		return;
694 	}
695 
696 	mqtt_sn_publish_destroy_all(client);
697 	mqtt_sn_topic_destroy_all(client);
698 
699 	if (client->transport && client->transport->deinit) {
700 		client->transport->deinit(client->transport);
701 	}
702 
703 	k_work_cancel_delayable(&client->process_work);
704 }
705 
mqtt_sn_connect(struct mqtt_sn_client * client,bool will,bool clean_session)706 int mqtt_sn_connect(struct mqtt_sn_client *client, bool will, bool clean_session)
707 {
708 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_CONNECT};
709 
710 	if (!client) {
711 		return -EINVAL;
712 	}
713 
714 	if (will && (!client->will_msg.data || !client->will_topic.data)) {
715 		LOG_ERR("will set to true, but no will data in client");
716 		return -EINVAL;
717 	}
718 
719 	if (clean_session) {
720 		mqtt_sn_topic_destroy_all(client);
721 	}
722 
723 	p.params.connect.clean_session = clean_session;
724 	p.params.connect.will = will;
725 	p.params.connect.duration = CONFIG_MQTT_SN_KEEPALIVE;
726 	p.params.connect.client_id.data = client->client_id.data;
727 	p.params.connect.client_id.size = client->client_id.size;
728 
729 	client->last_ping = k_uptime_get();
730 
731 	return encode_and_send(client, &p);
732 }
733 
mqtt_sn_disconnect(struct mqtt_sn_client * client)734 int mqtt_sn_disconnect(struct mqtt_sn_client *client)
735 {
736 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_DISCONNECT};
737 	int err;
738 
739 	if (!client) {
740 		return -EINVAL;
741 	}
742 
743 	p.params.disconnect.duration = 0;
744 
745 	err = encode_and_send(client, &p);
746 	mqtt_sn_disconnect_internal(client);
747 
748 	return err;
749 }
750 
mqtt_sn_sleep(struct mqtt_sn_client * client,uint16_t duration)751 int mqtt_sn_sleep(struct mqtt_sn_client *client, uint16_t duration)
752 {
753 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_DISCONNECT};
754 	int err;
755 
756 	if (!client || !duration) {
757 		return -EINVAL;
758 	}
759 
760 	p.params.disconnect.duration = duration;
761 
762 	err = encode_and_send(client, &p);
763 	mqtt_sn_sleep_internal(client);
764 
765 	return err;
766 }
767 
mqtt_sn_subscribe(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name)768 int mqtt_sn_subscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
769 			  struct mqtt_sn_data *topic_name)
770 {
771 	struct mqtt_sn_topic *topic;
772 	int err;
773 
774 	if (!client || !topic_name || !topic_name->data || !topic_name->size) {
775 		return -EINVAL;
776 	}
777 
778 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
779 		LOG_ERR("Cannot subscribe: not connected");
780 		return -ENOTCONN;
781 	}
782 
783 	topic = mqtt_sn_topic_find_name(client, topic_name);
784 	if (!topic) {
785 		topic = mqtt_sn_topic_create(topic_name);
786 		if (!topic) {
787 			return -ENOMEM;
788 		}
789 
790 		topic->qos = qos;
791 		topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBING;
792 		sys_slist_append(&client->topic, &topic->next);
793 	}
794 
795 	err = k_work_reschedule(&client->process_work, K_NO_WAIT);
796 	if (err < 0) {
797 		return err;
798 	}
799 
800 	return 0;
801 }
802 
mqtt_sn_unsubscribe(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name)803 int mqtt_sn_unsubscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
804 			struct mqtt_sn_data *topic_name)
805 {
806 	struct mqtt_sn_topic *topic;
807 	int err;
808 
809 	if (!client || !topic_name) {
810 		return -EINVAL;
811 	}
812 
813 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
814 		LOG_ERR("Cannot unsubscribe: not connected");
815 		return -ENOTCONN;
816 	}
817 
818 	topic = mqtt_sn_topic_find_name(client, topic_name);
819 	if (!topic) {
820 		LOG_HEXDUMP_ERR(topic_name->data, topic_name->size, "Topic not found");
821 		return -ENOENT;
822 	}
823 
824 	topic->state = MQTT_SN_TOPIC_STATE_UNSUBSCRIBING;
825 	mqtt_sn_con_init(&topic->con);
826 
827 	err = k_work_reschedule(&client->process_work, K_NO_WAIT);
828 	if (err < 0) {
829 		return err;
830 	}
831 
832 	return 0;
833 }
834 
mqtt_sn_publish(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name,bool retain,struct mqtt_sn_data * data)835 int mqtt_sn_publish(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
836 			struct mqtt_sn_data *topic_name, bool retain, struct mqtt_sn_data *data)
837 {
838 	struct mqtt_sn_publish *pub;
839 	struct mqtt_sn_topic *topic;
840 	int err;
841 
842 	if (!client || !topic_name) {
843 		return -EINVAL;
844 	}
845 
846 	if (qos == MQTT_SN_QOS_M1) {
847 		LOG_ERR("QoS -1 not supported");
848 		return -ENOTSUP;
849 	}
850 
851 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
852 		LOG_ERR("Cannot publish: disconnected");
853 		return -ENOTCONN;
854 	}
855 
856 	topic = mqtt_sn_topic_find_name(client, topic_name);
857 	if (!topic) {
858 		topic = mqtt_sn_topic_create(topic_name);
859 		if (!topic) {
860 			return -ENOMEM;
861 		}
862 
863 		topic->qos = qos;
864 		topic->state = MQTT_SN_TOPIC_STATE_REGISTERING;
865 		sys_slist_append(&client->topic, &topic->next);
866 	}
867 
868 	pub = mqtt_sn_publish_create(data);
869 	if (!pub) {
870 		k_work_reschedule(&client->process_work, K_NO_WAIT);
871 		return -ENOMEM;
872 	}
873 
874 	pub->qos = qos;
875 	pub->retain = retain;
876 	pub->topic = topic;
877 
878 	sys_slist_append(&client->publish, &pub->next);
879 
880 	err = k_work_reschedule(&client->process_work, K_NO_WAIT);
881 	if (err < 0) {
882 		return err;
883 	}
884 
885 	return 0;
886 }
887 
handle_connack(struct mqtt_sn_client * client,struct mqtt_sn_param_connack * p)888 static void handle_connack(struct mqtt_sn_client *client, struct mqtt_sn_param_connack *p)
889 {
890 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_CONNECTED};
891 
892 	if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
893 		LOG_INF("MQTT_SN client connected");
894 		switch (client->state) {
895 		case MQTT_SN_CLIENT_DISCONNECTED:
896 		case MQTT_SN_CLIENT_ASLEEP:
897 		case MQTT_SN_CLIENT_AWAKE:
898 			mqtt_sn_set_state(client, MQTT_SN_CLIENT_ACTIVE);
899 			if (client->evt_cb) {
900 				client->evt_cb(client, &evt);
901 			}
902 			client->ping_retries = N_RETRY;
903 			break;
904 		default:
905 			LOG_ERR("Client received CONNACK but was in state %d", client->state);
906 			return;
907 		}
908 	} else {
909 		LOG_WRN("CONNACK ret code %d", p->ret_code);
910 		mqtt_sn_disconnect_internal(client);
911 	}
912 
913 	k_work_schedule(&client->process_work, K_NO_WAIT);
914 }
915 
handle_willtopicreq(struct mqtt_sn_client * client)916 static void handle_willtopicreq(struct mqtt_sn_client *client)
917 {
918 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_WILLTOPIC};
919 
920 	response.params.willtopic.qos = client->will_qos;
921 	response.params.willtopic.retain = client->will_retain;
922 	response.params.willtopic.topic.data = client->will_topic.data;
923 	response.params.willtopic.topic.size = client->will_topic.size;
924 
925 	encode_and_send(client, &response);
926 }
927 
handle_willmsgreq(struct mqtt_sn_client * client)928 static void handle_willmsgreq(struct mqtt_sn_client *client)
929 {
930 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_WILLMSG};
931 
932 	response.params.willmsg.msg.data = client->will_msg.data;
933 	response.params.willmsg.msg.size = client->will_msg.size;
934 
935 	encode_and_send(client, &response);
936 }
937 
handle_register(struct mqtt_sn_client * client,struct mqtt_sn_param_register * p)938 static void handle_register(struct mqtt_sn_client *client, struct mqtt_sn_param_register *p)
939 {
940 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_REGACK};
941 	struct mqtt_sn_topic *topic;
942 
943 	topic = mqtt_sn_topic_create(&p->topic);
944 	if (!topic) {
945 		return;
946 	}
947 
948 	topic->state = MQTT_SN_TOPIC_STATE_REGISTERED;
949 	topic->topic_id = p->topic_id;
950 	topic->type = MQTT_SN_TOPIC_TYPE_NORMAL;
951 
952 	response.params.regack.ret_code = MQTT_SN_CODE_ACCEPTED;
953 	response.params.regack.topic_id = p->topic_id;
954 	response.params.regack.msg_id = p->msg_id;
955 
956 	encode_and_send(client, &response);
957 }
958 
handle_regack(struct mqtt_sn_client * client,struct mqtt_sn_param_regack * p)959 static void handle_regack(struct mqtt_sn_client *client, struct mqtt_sn_param_regack *p)
960 {
961 	struct mqtt_sn_topic *topic = mqtt_sn_topic_find_msg_id(client, p->msg_id);
962 
963 	if (!topic) {
964 		LOG_ERR("Can't REGACK, no topic found");
965 		return;
966 	}
967 
968 	if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
969 		topic->state = MQTT_SN_TOPIC_STATE_REGISTERED;
970 		topic->topic_id = p->topic_id;
971 	} else {
972 		LOG_WRN("Gateway could not register topic ID %u, code %d", p->topic_id,
973 			p->ret_code);
974 	}
975 }
976 
handle_publish(struct mqtt_sn_client * client,struct mqtt_sn_param_publish * p)977 static void handle_publish(struct mqtt_sn_client *client, struct mqtt_sn_param_publish *p)
978 {
979 	struct mqtt_sn_param response;
980 	struct mqtt_sn_evt evt = {.param.publish = {.data = p->data,
981 							.topic_id = p->topic_id,
982 							.topic_type = p->topic_type},
983 				  .type = MQTT_SN_EVT_PUBLISH};
984 
985 	if (p->qos == MQTT_SN_QOS_1) {
986 		response.type = MQTT_SN_MSG_TYPE_PUBACK;
987 		response.params.puback.topic_id = p->topic_id;
988 		response.params.puback.msg_id = p->msg_id;
989 		response.params.puback.ret_code = MQTT_SN_CODE_ACCEPTED;
990 
991 		encode_and_send(client, &response);
992 	} else if (p->qos == MQTT_SN_QOS_2) {
993 		response.type = MQTT_SN_MSG_TYPE_PUBREC;
994 		response.params.pubrec.msg_id = p->msg_id;
995 
996 		encode_and_send(client, &response);
997 	}
998 
999 	if (client->evt_cb) {
1000 		client->evt_cb(client, &evt);
1001 	}
1002 }
1003 
handle_puback(struct mqtt_sn_client * client,struct mqtt_sn_param_puback * p)1004 static void handle_puback(struct mqtt_sn_client *client, struct mqtt_sn_param_puback *p)
1005 {
1006 	struct mqtt_sn_publish *pub = mqtt_sn_publish_find_msg_id(client, p->msg_id);
1007 
1008 	if (!pub) {
1009 		LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1010 		return;
1011 	}
1012 
1013 	mqtt_sn_publish_destroy(client, pub);
1014 }
1015 
handle_pubrec(struct mqtt_sn_client * client,struct mqtt_sn_param_pubrec * p)1016 static void handle_pubrec(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrec *p)
1017 {
1018 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PUBREL};
1019 	struct mqtt_sn_publish *pub = mqtt_sn_publish_find_msg_id(client, p->msg_id);
1020 
1021 	if (!pub) {
1022 		LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1023 		return;
1024 	}
1025 
1026 	pub->con.last_attempt = k_uptime_get();
1027 	pub->con.retries = N_RETRY;
1028 
1029 	response.params.pubrel.msg_id = p->msg_id;
1030 
1031 	encode_and_send(client, &response);
1032 }
1033 
handle_pubrel(struct mqtt_sn_client * client,struct mqtt_sn_param_pubrel * p)1034 static void handle_pubrel(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrel *p)
1035 {
1036 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PUBCOMP};
1037 
1038 	response.params.pubcomp.msg_id = p->msg_id;
1039 
1040 	encode_and_send(client, &response);
1041 }
1042 
handle_pubcomp(struct mqtt_sn_client * client,struct mqtt_sn_param_pubcomp * p)1043 static void handle_pubcomp(struct mqtt_sn_client *client, struct mqtt_sn_param_pubcomp *p)
1044 {
1045 	struct mqtt_sn_publish *pub = mqtt_sn_publish_find_msg_id(client, p->msg_id);
1046 
1047 	if (!pub) {
1048 		LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1049 		return;
1050 	}
1051 
1052 	mqtt_sn_publish_destroy(client, pub);
1053 }
1054 
handle_suback(struct mqtt_sn_client * client,struct mqtt_sn_param_suback * p)1055 static void handle_suback(struct mqtt_sn_client *client, struct mqtt_sn_param_suback *p)
1056 {
1057 	struct mqtt_sn_topic *topic = mqtt_sn_topic_find_msg_id(client, p->msg_id);
1058 
1059 	if (!topic) {
1060 		LOG_ERR("No matching SUBSCRIBE found for msg id %u", p->msg_id);
1061 		return;
1062 	}
1063 
1064 	if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1065 		topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBED;
1066 		topic->topic_id = p->topic_id;
1067 		topic->qos = p->qos;
1068 	} else {
1069 		LOG_WRN("SUBACK with ret code %d", p->ret_code);
1070 	}
1071 }
1072 
handle_unsuback(struct mqtt_sn_client * client,struct mqtt_sn_param_unsuback * p)1073 static void handle_unsuback(struct mqtt_sn_client *client, struct mqtt_sn_param_unsuback *p)
1074 {
1075 	struct mqtt_sn_topic *topic = mqtt_sn_topic_find_msg_id(client, p->msg_id);
1076 
1077 	if (!topic || topic->state != MQTT_SN_TOPIC_STATE_UNSUBSCRIBING) {
1078 		LOG_ERR("No matching UNSUBSCRIBE found for msg id %u", p->msg_id);
1079 		return;
1080 	}
1081 
1082 	mqtt_sn_topic_destroy(client, topic);
1083 }
1084 
handle_pingreq(struct mqtt_sn_client * client)1085 static void handle_pingreq(struct mqtt_sn_client *client)
1086 {
1087 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PINGRESP};
1088 
1089 	encode_and_send(client, &response);
1090 }
1091 
handle_pingresp(struct mqtt_sn_client * client)1092 static void handle_pingresp(struct mqtt_sn_client *client)
1093 {
1094 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_PINGRESP};
1095 
1096 	if (client->evt_cb) {
1097 		client->evt_cb(client, &evt);
1098 	}
1099 
1100 	if (client->state == MQTT_SN_CLIENT_AWAKE) {
1101 		mqtt_sn_set_state(client, MQTT_SN_CLIENT_ASLEEP);
1102 	}
1103 
1104 	client->ping_retries = N_RETRY;
1105 }
1106 
/**
 * @brief Process a DISCONNECT message received from the gateway.
 *
 * Logs the reception and runs the client's internal disconnect handling.
 *
 * @param client Client the message was received on.
 * @param p Decoded DISCONNECT parameters; not used by this handler.
 */
static void handle_disconnect(struct mqtt_sn_client *client, struct mqtt_sn_param_disconnect *p)
{
	(void)p;

	LOG_INF("Received DISCONNECT");
	mqtt_sn_disconnect_internal(client);
}
1112 
handle_msg(struct mqtt_sn_client * client)1113 static int handle_msg(struct mqtt_sn_client *client)
1114 {
1115 	int err;
1116 	struct mqtt_sn_param p;
1117 
1118 	err = mqtt_sn_decode_msg(&client->rx, &p);
1119 	if (err) {
1120 		return err;
1121 	}
1122 
1123 	LOG_INF("Got message of type %d", p.type);
1124 
1125 	switch (p.type) {
1126 	case MQTT_SN_MSG_TYPE_GWINFO:
1127 		break;
1128 	case MQTT_SN_MSG_TYPE_CONNACK:
1129 		handle_connack(client, &p.params.connack);
1130 		break;
1131 	case MQTT_SN_MSG_TYPE_WILLTOPICREQ:
1132 		handle_willtopicreq(client);
1133 		break;
1134 	case MQTT_SN_MSG_TYPE_WILLMSGREQ:
1135 		handle_willmsgreq(client);
1136 		break;
1137 	case MQTT_SN_MSG_TYPE_REGISTER:
1138 		handle_register(client, &p.params.reg);
1139 		break;
1140 	case MQTT_SN_MSG_TYPE_REGACK:
1141 		handle_regack(client, &p.params.regack);
1142 		break;
1143 	case MQTT_SN_MSG_TYPE_PUBLISH:
1144 		handle_publish(client, &p.params.publish);
1145 		break;
1146 	case MQTT_SN_MSG_TYPE_PUBACK:
1147 		handle_puback(client, &p.params.puback);
1148 		break;
1149 	case MQTT_SN_MSG_TYPE_PUBREC:
1150 		handle_pubrec(client, &p.params.pubrec);
1151 		break;
1152 	case MQTT_SN_MSG_TYPE_PUBREL:
1153 		handle_pubrel(client, &p.params.pubrel);
1154 		break;
1155 	case MQTT_SN_MSG_TYPE_PUBCOMP:
1156 		handle_pubcomp(client, &p.params.pubcomp);
1157 		break;
1158 	case MQTT_SN_MSG_TYPE_SUBACK:
1159 		handle_suback(client, &p.params.suback);
1160 		break;
1161 	case MQTT_SN_MSG_TYPE_UNSUBACK:
1162 		handle_unsuback(client, &p.params.unsuback);
1163 		break;
1164 	case MQTT_SN_MSG_TYPE_PINGREQ:
1165 		handle_pingreq(client);
1166 		break;
1167 	case MQTT_SN_MSG_TYPE_PINGRESP:
1168 		handle_pingresp(client);
1169 		break;
1170 	case MQTT_SN_MSG_TYPE_DISCONNECT:
1171 		handle_disconnect(client, &p.params.disconnect);
1172 		break;
1173 	case MQTT_SN_MSG_TYPE_WILLTOPICRESP:
1174 		break;
1175 	case MQTT_SN_MSG_TYPE_WILLMSGRESP:
1176 		break;
1177 	default:
1178 		LOG_ERR("Unexpected message type %d", p.type);
1179 		break;
1180 	}
1181 
1182 	k_work_reschedule(&client->process_work, K_NO_WAIT);
1183 
1184 	return 0;
1185 }
1186 
mqtt_sn_input(struct mqtt_sn_client * client)1187 int mqtt_sn_input(struct mqtt_sn_client *client)
1188 {
1189 	ssize_t next_frame_size;
1190 	int err;
1191 
1192 	if (!client || !client->transport || !client->transport->recv) {
1193 		return -EINVAL;
1194 	}
1195 
1196 	if (client->transport->poll) {
1197 		next_frame_size = client->transport->poll(client);
1198 		if (next_frame_size <= 0) {
1199 			return next_frame_size;
1200 		}
1201 	}
1202 
1203 	net_buf_simple_reset(&client->rx);
1204 
1205 	next_frame_size = client->transport->recv(client, client->rx.data, client->rx.size);
1206 	if (next_frame_size <= 0) {
1207 		return next_frame_size;
1208 	}
1209 
1210 	if (next_frame_size > client->rx.size) {
1211 		return -ENOBUFS;
1212 	}
1213 
1214 	client->rx.len = next_frame_size;
1215 
1216 	LOG_HEXDUMP_DBG(client->rx.data, client->rx.len, "Received data");
1217 
1218 	err = handle_msg(client);
1219 
1220 	if (err) {
1221 		return err;
1222 	}
1223 
1224 	/* Should be zero */
1225 	return -client->rx.len;
1226 }
1227