1 /*
2  * Copyright (c) 2022 grandcentrix GmbH
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 /** @file mqtt_sn.c
8  *
9  * @brief MQTT-SN Client API Implementation.
10  */
11 
12 #include "mqtt_sn_msg.h"
13 
14 #include <zephyr/logging/log.h>
15 #include <zephyr/random/random.h>
16 #include <zephyr/net/mqtt_sn.h>
/* Register this file's log module; verbosity is taken from Kconfig. */
LOG_MODULE_REGISTER(net_mqtt_sn, CONFIG_MQTT_SN_LOG_LEVEL);

/* Number of buffers in the message pool: same count as the publish slots. */
#define MQTT_SN_NET_BUFS (CONFIG_MQTT_SN_LIB_MAX_PUBLISH)

/*
 * Fixed-size net_buf pool holding MQTT_SN_NET_BUFS buffers of
 * CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE bytes each, with no user data area
 * and no destroy callback.
 */
NET_BUF_POOL_FIXED_DEFINE(mqtt_sn_messages, MQTT_SN_NET_BUFS, CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE,
			  0, NULL);
23 
/**
 * A struct to track attempts for actions that require acknowledgment,
 * i.e. topic registering, subscribing, or unsubscribing.
 *
 * It is embedded as the first member of both the topic and the publish
 * records and initialised by mqtt_sn_con_init().
 */
struct mqtt_sn_confirmable {
	/* Uptime (k_uptime_get()) of the last send attempt; 0 means "not tried yet". */
	int64_t last_attempt;
	/* Message ID used to match the acknowledgment to this action. */
	uint16_t msg_id;
	/* Remaining send attempts; starts at N_RETRY and is decremented per attempt. */
	uint8_t retries;
};
33 
/**
 * A queued PUBLISH, allocated from the `publishes` slab and linked into
 * the client's publish list.
 */
struct mqtt_sn_publish {
	/* Retry/acknowledgment bookkeeping for this publish. */
	struct mqtt_sn_confirmable con;
	/* List node linking this entry into client->publish. */
	sys_snode_t next;
	/* Topic this publish is addressed to; its ID and type go into the message. */
	struct mqtt_sn_topic *topic;
	/* Private copy of the payload. */
	uint8_t pubdata[CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE];
	/* Number of valid bytes in pubdata. */
	size_t datalen;
	/* QoS level sent with the PUBLISH; QoS 0 and -1 are not retried. */
	enum mqtt_sn_qos qos;
	/* Value of the RETAIN flag sent with the PUBLISH. */
	bool retain;
};
43 
/**
 * Life-cycle state of a topic record.
 *
 * The "requested" states are turned into the matching "in progress" states
 * by process_topics() when the corresponding message is first sent.
 */
enum mqtt_sn_topic_state {
	MQTT_SN_TOPIC_STATE_REGISTER,      /*!< Topic requested to be registered */
	MQTT_SN_TOPIC_STATE_REGISTERING,   /*!< Topic in progress of registering */
	MQTT_SN_TOPIC_STATE_REGISTERED,    /*!< Topic registered */
	MQTT_SN_TOPIC_STATE_SUBSCRIBE,     /*!< Topic requested to subscribe */
	MQTT_SN_TOPIC_STATE_SUBSCRIBING,   /*!< Topic in progress of subscribing */
	MQTT_SN_TOPIC_STATE_SUBSCRIBED,    /*!< Topic subscribed */
	MQTT_SN_TOPIC_STATE_UNSUBSCRIBE,   /*!< Topic requested to unsubscribe */
	MQTT_SN_TOPIC_STATE_UNSUBSCRIBING, /*!< Topic in progress of unsubscribing */
};
54 
/**
 * A topic known to the client, allocated from the `topics` slab and linked
 * into the client's topic list.
 */
struct mqtt_sn_topic {
	/* Retry/acknowledgment bookkeeping for register/(un)subscribe requests. */
	struct mqtt_sn_confirmable con;
	/* List node linking this entry into client->topic. */
	sys_snode_t next;
	/* Topic name bytes; not NUL-terminated, length is in namelen. */
	char name[CONFIG_MQTT_SN_LIB_MAX_TOPIC_SIZE];
	/* Number of valid bytes in name. */
	size_t namelen;
	/* Numeric topic ID used in PUBLISH and for predefined/short topics. */
	uint16_t topic_id;
	/* QoS level sent with SUBSCRIBE. */
	enum mqtt_sn_qos qos;
	/* Topic type (normal, predefined or short). */
	enum mqtt_sn_topic_type type;
	/* Current life-cycle state, see enum mqtt_sn_topic_state. */
	enum mqtt_sn_topic_state state;
};
65 
66 struct mqtt_sn_gateway {
67 	sys_snode_t next;
68 	char gw_id;
69 	int64_t adv_timer;
70 	char addr[CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE];
71 	size_t addr_len;
72 };
73 
/*
 * Statically allocated memory slabs backing the three record types of this
 * file. Each slab holds the Kconfig-configured maximum number of records and
 * uses 4-byte block alignment.
 */
K_MEM_SLAB_DEFINE_STATIC(publishes, sizeof(struct mqtt_sn_publish), CONFIG_MQTT_SN_LIB_MAX_PUBLISH,
			 4);
K_MEM_SLAB_DEFINE_STATIC(topics, sizeof(struct mqtt_sn_topic), CONFIG_MQTT_SN_LIB_MAX_TOPICS, 4);
K_MEM_SLAB_DEFINE_STATIC(gateways, sizeof(struct mqtt_sn_gateway), CONFIG_MQTT_SN_LIB_MAX_GATEWAYS,
			 4);
79 
/** Connection state of a client, stored in client->state. */
enum mqtt_sn_client_state {
	/* Not connected; entered via mqtt_sn_disconnect_internal(). */
	MQTT_SN_CLIENT_DISCONNECTED,
	/* Connected; the only state in which topics, publishes and pings are processed. */
	MQTT_SN_CLIENT_ACTIVE,
	/* Sleeping; entered via mqtt_sn_sleep_internal(). PINGREQ carries the client ID. */
	MQTT_SN_CLIENT_ASLEEP,
	/* NOTE(review): presumably the "awake" state of the MQTT-SN spec - confirm usage. */
	MQTT_SN_CLIENT_AWAKE
};
86 
mqtt_sn_set_state(struct mqtt_sn_client * client,enum mqtt_sn_client_state state)87 static void mqtt_sn_set_state(struct mqtt_sn_client *client, enum mqtt_sn_client_state state)
88 {
89 	int prev_state = client->state;
90 
91 	client->state = state;
92 	LOG_DBG("Client %p state (%d) -> (%d)", client, prev_state, state);
93 }
94 
/*
 * Protocol timing parameters. The Kconfig values are multiplied by
 * MSEC_PER_SEC so they can be compared against k_uptime_get() directly.
 */
#define T_SEARCHGW_MSEC  (CONFIG_MQTT_SN_LIB_T_SEARCHGW * MSEC_PER_SEC)
#define T_GWINFO_MSEC    (CONFIG_MQTT_SN_LIB_T_GWINFO * MSEC_PER_SEC)
/* Delay between retransmissions of an unacknowledged message. */
#define T_RETRY_MSEC     (CONFIG_MQTT_SN_LIB_T_RETRY * MSEC_PER_SEC)
/* Number of attempts granted to a confirmable action and to the ping. */
#define N_RETRY          (CONFIG_MQTT_SN_LIB_N_RETRY)
/* Interval between pings while the previous ping has been acknowledged. */
#define T_KEEPALIVE_MSEC (CONFIG_MQTT_SN_KEEPALIVE * MSEC_PER_SEC)
100 
/**
 * @brief Produce the next message ID.
 *
 * IDs are handed out sequentially starting at 1. The value 0 is never
 * returned: MQTT-SN uses MsgId 0x0000 for messages that carry no meaningful
 * ID (e.g. QoS 0 PUBLISH), so the counter skips it when it wraps around.
 *
 * @return A message ID in the range 1..65535
 */
static uint16_t next_msg_id(void)
{
	static uint16_t msg_id;

	if (++msg_id == 0) {
		/* Wrapped around: skip the reserved ID 0 */
		msg_id = 1;
	}

	return msg_id;
}
107 
encode_and_send(struct mqtt_sn_client * client,struct mqtt_sn_param * p,uint8_t broadcast_radius)108 static int encode_and_send(struct mqtt_sn_client *client, struct mqtt_sn_param *p,
109 			   uint8_t broadcast_radius)
110 {
111 	int err;
112 
113 	err = mqtt_sn_encode_msg(&client->tx, p);
114 	if (err) {
115 		goto end;
116 	}
117 
118 	LOG_HEXDUMP_DBG(client->tx.data, client->tx.len, "Send message");
119 
120 	if (!client->transport->sendto) {
121 		LOG_ERR("Can't send: no callback");
122 		err = -ENOTSUP;
123 		goto end;
124 	}
125 
126 	if (!client->tx.len) {
127 		LOG_WRN("Can't send: empty");
128 		err = -ENOMSG;
129 		goto end;
130 	}
131 
132 	if (broadcast_radius) {
133 		err = client->transport->sendto(client, client->tx.data, client->tx.len, NULL,
134 						broadcast_radius);
135 	} else {
136 		struct mqtt_sn_gateway *gw;
137 
138 		gw = SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
139 		if (gw == NULL || gw->addr_len == 0) {
140 			LOG_WRN("No Gateway Address");
141 			err = -ENXIO;
142 			goto end;
143 		}
144 		err = client->transport->sendto(client, client->tx.data, client->tx.len, gw->addr,
145 						gw->addr_len);
146 	}
147 
148 end:
149 	if (err) {
150 		LOG_ERR("Error during send: %d", err);
151 	}
152 	net_buf_simple_reset(&client->tx);
153 
154 	return err;
155 }
156 
mqtt_sn_con_init(struct mqtt_sn_confirmable * con)157 static void mqtt_sn_con_init(struct mqtt_sn_confirmable *con)
158 {
159 	con->last_attempt = 0;
160 	con->retries = N_RETRY;
161 	con->msg_id = next_msg_id();
162 }
163 
mqtt_sn_publish_destroy(struct mqtt_sn_client * client,struct mqtt_sn_publish * pub)164 static void mqtt_sn_publish_destroy(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub)
165 {
166 	sys_slist_find_and_remove(&client->publish, &pub->next);
167 	k_mem_slab_free(&publishes, (void *)pub);
168 }
169 
mqtt_sn_publish_destroy_all(struct mqtt_sn_client * client)170 static void mqtt_sn_publish_destroy_all(struct mqtt_sn_client *client)
171 {
172 	struct mqtt_sn_publish *pub;
173 	sys_snode_t *next;
174 
175 	while ((next = sys_slist_get(&client->publish)) != NULL) {
176 		pub = SYS_SLIST_CONTAINER(next, pub, next);
177 		k_mem_slab_free(&publishes, (void *)pub);
178 	}
179 }
180 
mqtt_sn_publish_create(struct mqtt_sn_data * data)181 static struct mqtt_sn_publish *mqtt_sn_publish_create(struct mqtt_sn_data *data)
182 {
183 	struct mqtt_sn_publish *pub;
184 
185 	if (k_mem_slab_alloc(&publishes, (void **)&pub, K_NO_WAIT)) {
186 		LOG_ERR("Can't create PUB: no free slot");
187 		return NULL;
188 	}
189 
190 	memset(pub, 0, sizeof(*pub));
191 
192 	if (data && data->data && data->size) {
193 		if (data->size > sizeof(pub->pubdata)) {
194 			LOG_ERR("Can't create PUB: Too much data (%zu)", data->size);
195 			return NULL;
196 		}
197 
198 		memcpy(pub->pubdata, data->data, data->size);
199 		pub->datalen = data->size;
200 	}
201 
202 	mqtt_sn_con_init(&pub->con);
203 
204 	return pub;
205 }
206 
mqtt_sn_publish_find_by_msg_id(struct mqtt_sn_client * client,uint16_t msg_id)207 static struct mqtt_sn_publish *mqtt_sn_publish_find_by_msg_id(struct mqtt_sn_client *client,
208 							      uint16_t msg_id)
209 {
210 	struct mqtt_sn_publish *pub;
211 
212 	SYS_SLIST_FOR_EACH_CONTAINER(&client->publish, pub, next) {
213 		if (pub->con.msg_id == msg_id) {
214 			return pub;
215 		}
216 	}
217 
218 	return NULL;
219 }
220 
mqtt_sn_publish_find_by_topic(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)221 static struct mqtt_sn_publish *mqtt_sn_publish_find_by_topic(struct mqtt_sn_client *client,
222 							     struct mqtt_sn_topic *topic)
223 {
224 	struct mqtt_sn_publish *pub;
225 
226 	SYS_SLIST_FOR_EACH_CONTAINER(&client->publish, pub, next) {
227 		if (pub->topic == topic) {
228 			return pub;
229 		}
230 	}
231 
232 	return NULL;
233 }
234 
mqtt_sn_topic_create(struct mqtt_sn_data * name)235 static struct mqtt_sn_topic *mqtt_sn_topic_create(struct mqtt_sn_data *name)
236 {
237 	struct mqtt_sn_topic *topic;
238 
239 	if (k_mem_slab_alloc(&topics, (void **)&topic, K_NO_WAIT)) {
240 		LOG_ERR("Can't create topic: no free slot");
241 		return NULL;
242 	}
243 
244 	memset(topic, 0, sizeof(*topic));
245 
246 	if (!name || !name->data || !name->size) {
247 		LOG_ERR("Can't create topic with empty name");
248 		return NULL;
249 	}
250 
251 	if (name->size > sizeof(topic->name)) {
252 		LOG_ERR("Can't create topic: name too long (%zu)", name->size);
253 		return NULL;
254 	}
255 
256 	memcpy(topic->name, name->data, name->size);
257 	topic->namelen = name->size;
258 
259 	mqtt_sn_con_init(&topic->con);
260 
261 	return topic;
262 }
263 
mqtt_sn_topic_find_by_name(struct mqtt_sn_client * client,struct mqtt_sn_data * topic_name)264 static struct mqtt_sn_topic *mqtt_sn_topic_find_by_name(struct mqtt_sn_client *client,
265 							struct mqtt_sn_data *topic_name)
266 {
267 	struct mqtt_sn_topic *topic;
268 
269 	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
270 		if (topic->namelen == topic_name->size &&
271 		    memcmp(topic->name, topic_name->data, topic_name->size) == 0) {
272 			return topic;
273 		}
274 	}
275 
276 	return NULL;
277 }
278 
mqtt_sn_topic_find_by_msg_id(struct mqtt_sn_client * client,uint16_t msg_id)279 static struct mqtt_sn_topic *mqtt_sn_topic_find_by_msg_id(struct mqtt_sn_client *client,
280 							  uint16_t msg_id)
281 {
282 	struct mqtt_sn_topic *topic;
283 
284 	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
285 		if (topic->con.msg_id == msg_id) {
286 			return topic;
287 		}
288 	}
289 
290 	return NULL;
291 }
292 
mqtt_sn_topic_destroy(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)293 static void mqtt_sn_topic_destroy(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
294 {
295 	struct mqtt_sn_publish *pub;
296 
297 	/* Destroy all pubs referencing this topic */
298 	while ((pub = mqtt_sn_publish_find_by_topic(client, topic)) != NULL) {
299 		LOG_WRN("Destroying publish msg_id %d", pub->con.msg_id);
300 		mqtt_sn_publish_destroy(client, pub);
301 	}
302 
303 	sys_slist_find_and_remove(&client->topic, &topic->next);
304 }
305 
mqtt_sn_topic_destroy_all(struct mqtt_sn_client * client)306 static void mqtt_sn_topic_destroy_all(struct mqtt_sn_client *client)
307 {
308 	struct mqtt_sn_topic *topic;
309 	struct mqtt_sn_publish *pub;
310 	sys_snode_t *next;
311 
312 	while ((next = sys_slist_get(&client->topic)) != NULL) {
313 		topic = SYS_SLIST_CONTAINER(next, topic, next);
314 		/* Destroy all pubs referencing this topic */
315 		while ((pub = mqtt_sn_publish_find_by_topic(client, topic)) != NULL) {
316 			LOG_WRN("Destroying publish msg_id %d", pub->con.msg_id);
317 			mqtt_sn_publish_destroy(client, pub);
318 		}
319 
320 		k_mem_slab_free(&topics, (void *)topic);
321 	}
322 }
323 
mqtt_sn_gw_destroy(struct mqtt_sn_client * client,struct mqtt_sn_gateway * gw)324 static void mqtt_sn_gw_destroy(struct mqtt_sn_client *client, struct mqtt_sn_gateway *gw)
325 {
326 	LOG_DBG("Destroying gateway %d", gw->gw_id);
327 	sys_slist_find_and_remove(&client->gateway, &gw->next);
328 	k_mem_slab_free(&gateways, (void *)gw);
329 }
330 
mqtt_sn_gw_destroy_all(struct mqtt_sn_client * client)331 static void mqtt_sn_gw_destroy_all(struct mqtt_sn_client *client)
332 {
333 	struct mqtt_sn_gateway *gw;
334 	sys_snode_t *next;
335 
336 	while ((next = sys_slist_get(&client->gateway)) != NULL) {
337 		gw = SYS_SLIST_CONTAINER(next, gw, next);
338 		sys_slist_find_and_remove(&client->gateway, next);
339 		k_mem_slab_free(&gateways, (void *)gw);
340 	}
341 }
342 
mqtt_sn_gw_create(uint8_t gw_id,short duration,struct mqtt_sn_data gw_addr)343 static struct mqtt_sn_gateway *mqtt_sn_gw_create(uint8_t gw_id, short duration,
344 						 struct mqtt_sn_data gw_addr)
345 {
346 	struct mqtt_sn_gateway *gw;
347 
348 	LOG_DBG("Free GW slots: %d", k_mem_slab_num_free_get(&gateways));
349 	if (k_mem_slab_alloc(&gateways, (void **)&gw, K_NO_WAIT)) {
350 		LOG_WRN("Can't create GW: no free slot");
351 		return NULL;
352 	}
353 
354 	__ASSERT(gw_addr.size < CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE,
355 		 "Gateway address is larger than allowed by CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE");
356 
357 	memset(gw, 0, sizeof(*gw));
358 	memcpy(gw->addr, gw_addr.data, gw_addr.size);
359 	gw->addr_len = gw_addr.size;
360 	gw->gw_id = gw_id;
361 	if (duration == -1) {
362 		gw->adv_timer = duration;
363 	} else {
364 		gw->adv_timer =
365 			k_uptime_get() + (duration * CONFIG_MQTT_SN_LIB_N_ADV * MSEC_PER_SEC);
366 	}
367 
368 	return gw;
369 }
370 
mqtt_sn_gw_find_by_id(struct mqtt_sn_client * client,uint16_t gw_id)371 static struct mqtt_sn_gateway *mqtt_sn_gw_find_by_id(struct mqtt_sn_client *client, uint16_t gw_id)
372 {
373 	struct mqtt_sn_gateway *gw;
374 
375 	SYS_SLIST_FOR_EACH_CONTAINER(&client->gateway, gw, next) {
376 		if (gw->gw_id == gw_id) {
377 			return gw;
378 		}
379 	}
380 
381 	return NULL;
382 }
383 
mqtt_sn_disconnect_internal(struct mqtt_sn_client * client)384 static void mqtt_sn_disconnect_internal(struct mqtt_sn_client *client)
385 {
386 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
387 
388 	mqtt_sn_set_state(client, MQTT_SN_CLIENT_DISCONNECTED);
389 	if (client->evt_cb) {
390 		client->evt_cb(client, &evt);
391 	}
392 
393 	/*
394 	 * Remove all publishes, but keep topics
395 	 * Topics are removed on deinit or when connect is called with
396 	 * clean-session = true
397 	 */
398 	mqtt_sn_publish_destroy_all(client);
399 
400 	k_work_cancel_delayable(&client->process_work);
401 }
402 
mqtt_sn_sleep_internal(struct mqtt_sn_client * client)403 static void mqtt_sn_sleep_internal(struct mqtt_sn_client *client)
404 {
405 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
406 
407 	mqtt_sn_set_state(client, MQTT_SN_CLIENT_ASLEEP);
408 	if (client->evt_cb) {
409 		client->evt_cb(client, &evt);
410 	}
411 }
412 
413 /**
414  * @brief Internal function to send a SUBSCRIBE message for a topic.
415  *
416  * @param client
417  * @param topic
418  * @param dup DUP flag - see MQTT-SN spec
419  */
mqtt_sn_do_subscribe(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic,bool dup)420 static void mqtt_sn_do_subscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic,
421 				 bool dup)
422 {
423 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_SUBSCRIBE};
424 
425 	if (!client || !topic) {
426 		return;
427 	}
428 
429 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
430 		LOG_ERR("Cannot subscribe: not connected");
431 		return;
432 	}
433 
434 	p.params.subscribe.msg_id = topic->con.msg_id;
435 	p.params.subscribe.qos = topic->qos;
436 	p.params.subscribe.topic_type = topic->type;
437 	p.params.subscribe.dup = dup;
438 	switch (topic->type) {
439 	case MQTT_SN_TOPIC_TYPE_NORMAL:
440 		p.params.subscribe.topic.topic_name.data = topic->name;
441 		p.params.subscribe.topic.topic_name.size = topic->namelen;
442 		break;
443 	case MQTT_SN_TOPIC_TYPE_PREDEF:
444 	case MQTT_SN_TOPIC_TYPE_SHORT:
445 		p.params.subscribe.topic.topic_id = topic->topic_id;
446 		break;
447 	default:
448 		LOG_ERR("Unexpected topic type %d", topic->type);
449 		return;
450 	}
451 
452 	encode_and_send(client, &p, 0);
453 }
454 
455 /**
456  * @brief Internal function to send an UNSUBSCRIBE message for a topic.
457  *
458  * @param client
459  * @param topic
460  */
mqtt_sn_do_unsubscribe(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)461 static void mqtt_sn_do_unsubscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
462 {
463 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_UNSUBSCRIBE};
464 
465 	if (!client || !topic) {
466 		return;
467 	}
468 
469 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
470 		LOG_ERR("Cannot unsubscribe: not connected");
471 		return;
472 	}
473 
474 	p.params.unsubscribe.msg_id = topic->con.msg_id;
475 	p.params.unsubscribe.topic_type = topic->type;
476 	switch (topic->type) {
477 	case MQTT_SN_TOPIC_TYPE_NORMAL:
478 		p.params.unsubscribe.topic.topic_name.data = topic->name;
479 		p.params.unsubscribe.topic.topic_name.size = topic->namelen;
480 		break;
481 	case MQTT_SN_TOPIC_TYPE_PREDEF:
482 	case MQTT_SN_TOPIC_TYPE_SHORT:
483 		p.params.unsubscribe.topic.topic_id = topic->topic_id;
484 		break;
485 	default:
486 		LOG_ERR("Unexpected topic type %d", topic->type);
487 		return;
488 	}
489 
490 	encode_and_send(client, &p, 0);
491 }
492 
493 /**
494  * @brief Internal function to a register a topic with the MQTT-SN gateway.
495  *
496  * @param client
497  * @param topic
498  */
mqtt_sn_do_register(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)499 static void mqtt_sn_do_register(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
500 {
501 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_REGISTER};
502 
503 	if (!client || !topic) {
504 		return;
505 	}
506 
507 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
508 		LOG_ERR("Cannot register: not connected");
509 		return;
510 	}
511 
512 	p.params.reg.msg_id = topic->con.msg_id;
513 	switch (topic->type) {
514 	case MQTT_SN_TOPIC_TYPE_NORMAL:
515 		LOG_HEXDUMP_INF(topic->name, topic->namelen, "Registering topic");
516 		p.params.reg.topic.data = topic->name;
517 		p.params.reg.topic.size = topic->namelen;
518 		break;
519 	default:
520 		LOG_ERR("Unexpected topic type %d", topic->type);
521 		return;
522 	}
523 
524 	encode_and_send(client, &p, 0);
525 }
526 
527 /**
528  * @brief Internal function to send a PUBLISH message.
529  *
530  * Note that this function does not do sanity checks regarding the pub's topic.
531  *
532  * @param client
533  * @param pub
534  * @param dup DUP flag - see MQTT-SN spec.
535  */
mqtt_sn_do_publish(struct mqtt_sn_client * client,struct mqtt_sn_publish * pub,bool dup)536 static void mqtt_sn_do_publish(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub, bool dup)
537 {
538 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_PUBLISH};
539 
540 	if (!client || !pub) {
541 		return;
542 	}
543 
544 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
545 		LOG_ERR("Cannot subscribe: not connected");
546 		return;
547 	}
548 
549 	LOG_INF("Publishing to topic ID %d", pub->topic->topic_id);
550 
551 	p.params.publish.data.data = pub->pubdata;
552 	p.params.publish.data.size = pub->datalen;
553 	p.params.publish.msg_id = pub->con.msg_id;
554 	p.params.publish.retain = pub->retain;
555 	p.params.publish.topic_id = pub->topic->topic_id;
556 	p.params.publish.topic_type = pub->topic->type;
557 	p.params.publish.qos = pub->qos;
558 	p.params.publish.dup = dup;
559 
560 	encode_and_send(client, &p, 0);
561 }
562 
563 /**
564  * @brief Internal function to send a SEARCHGW message.
565  *
566  * @param client
567  */
mqtt_sn_do_searchgw(struct mqtt_sn_client * client)568 static void mqtt_sn_do_searchgw(struct mqtt_sn_client *client)
569 {
570 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_SEARCHGW};
571 
572 	p.params.searchgw.radius = CONFIG_MQTT_SN_LIB_BROADCAST_RADIUS;
573 
574 	encode_and_send(client, &p, CONFIG_MQTT_SN_LIB_BROADCAST_RADIUS);
575 }
576 
577 /**
578  * @brief Internal function to send a GWINFO message.
579  *
580  * @param client
581  */
mqtt_sn_do_gwinfo(struct mqtt_sn_client * client)582 static void mqtt_sn_do_gwinfo(struct mqtt_sn_client *client)
583 {
584 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_GWINFO};
585 	struct mqtt_sn_gateway *gw;
586 	struct mqtt_sn_data addr;
587 
588 	gw = SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
589 
590 	if (gw == NULL || gw->addr_len == 0) {
591 		LOG_WRN("No Gateway Address");
592 		return;
593 	}
594 
595 	response.params.gwinfo.gw_id = gw->gw_id;
596 	addr.data = gw->addr;
597 	addr.size = gw->addr_len;
598 	response.params.gwinfo.gw_add = addr;
599 
600 	encode_and_send(client, &response, client->radius_gwinfo);
601 }
602 
603 /**
604  * @brief Internal function to send a PINGREQ message.
605  *
606  * @param client
607  */
mqtt_sn_do_ping(struct mqtt_sn_client * client)608 static void mqtt_sn_do_ping(struct mqtt_sn_client *client)
609 {
610 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_PINGREQ};
611 
612 	switch (client->state) {
613 	case MQTT_SN_CLIENT_ASLEEP:
614 		/*
615 		 * From the spec regarding PINGREQ:
616 		 * ClientId: contains the client id; this field is optional
617 		 * and is included by a “sleeping” client when it goes to the
618 		 * “awake” state and is waiting for messages sent by the
619 		 * server/gateway, see Section 6.14 for further details.
620 		 */
621 		p.params.pingreq.client_id.data = client->client_id.data;
622 		p.params.pingreq.client_id.size = client->client_id.size;
623 	case MQTT_SN_CLIENT_ACTIVE:
624 		encode_and_send(client, &p, 0);
625 		break;
626 	default:
627 		LOG_WRN("Can't ping in state %d", client->state);
628 		break;
629 	}
630 }
631 
/**
 * @brief Process all publish tasks in the queue.
 *
 * Each queued publish whose (re)send time has come is sent, provided its
 * topic is registered or subscribed. QoS 0 and QoS -1 publishes are removed
 * right after sending; others stay queued and are resent every T_RETRY_MSEC
 * until they are removed elsewhere or run out of retries.
 *
 * @param client
 * @param next_cycle will be set to the time when the next action is required
 *
 * @retval 0 on success
 * @retval -ETIMEDOUT when a publish task ran out of retries
 */
static int process_pubs(struct mqtt_sn_client *client, int64_t *next_cycle)
{
	struct mqtt_sn_publish *pub, *pubs;
	const int64_t now = k_uptime_get();
	int64_t next_attempt;
	bool dup; /* dup flag if message is resent */

	/* Safe iteration: entries may be destroyed while walking the list */
	SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&client->publish, pub, pubs, next) {
		LOG_HEXDUMP_DBG(pub->topic->name, pub->topic->namelen,
				"Processing publish for topic");
		LOG_HEXDUMP_DBG(pub->pubdata, pub->datalen, "Processing publish data");

		if (pub->con.last_attempt == 0) {
			/* Never sent before: due immediately, not a duplicate */
			next_attempt = 0;
			dup = false;
		} else {
			/* Already sent at least once: any further send is a retransmission */
			next_attempt = pub->con.last_attempt + T_RETRY_MSEC;
			dup = true;
		}

		/* Check if action is due */
		if (next_attempt <= now) {
			switch (pub->topic->state) {
			case MQTT_SN_TOPIC_STATE_REGISTERED:
			case MQTT_SN_TOPIC_STATE_SUBSCRIBED:
				/* Test-then-decrement: fails once the budget has reached 0 */
				if (!pub->con.retries--) {
					LOG_WRN("PUB ran out of retries, disconnecting");
					/* This also destroys all queued publishes, so stop iterating */
					mqtt_sn_disconnect_internal(client);
					return -ETIMEDOUT;
				}
				mqtt_sn_do_publish(client, pub, dup);
				if (pub->qos == MQTT_SN_QOS_0 || pub->qos == MQTT_SN_QOS_M1) {
					/* We are done, remove this */
					mqtt_sn_publish_destroy(client, pub);
					continue;
				} else {
					/* Wait for ack */
					pub->con.last_attempt = now;
					next_attempt = now + T_RETRY_MSEC;
				}
				break;
			default:
				/* Topic still registering/subscribing: keep the publish queued */
				LOG_INF("Can't publish; topic is not ready");
				break;
			}
		}

		/* Remember time when next action is due */
		if (next_attempt > now && (*next_cycle == 0 || next_attempt < *next_cycle)) {
			*next_cycle = next_attempt;
		}
	}

	LOG_DBG("next_cycle: %lld", *next_cycle);

	return 0;
}
698 
/**
 * @brief Process all topic tasks in the queue.
 *
 * Topics in a "requested" state (REGISTER, SUBSCRIBE, UNSUBSCRIBE) are moved
 * to the matching "in progress" state and the request is sent; topics already
 * in progress are resent every T_RETRY_MSEC. At most one registration and at
 * most one subscribe/unsubscribe request is in progress at any time.
 *
 * @param client
 * @param next_cycle will be set to the time when the next action is required
 *
 * @retval 0 on success
 * @retval -ETIMEDOUT when a topic task ran out of retries
 */
static int process_topics(struct mqtt_sn_client *client, int64_t *next_cycle)
{
	struct mqtt_sn_topic *topic;
	const int64_t now = k_uptime_get();
	int64_t next_attempt;

	/* Set while a (un)subscribe resp. register request is in progress */
	bool subscribing = false;
	bool registering = false;

	bool dup; /* dup flag if message is resent */

	/* First pass to check for REGISTERING, SUBSCRIBING, UNSUBSCRIBING */
	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
		switch (topic->state) {
		case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
		case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
			subscribing = true;
			break;
		case MQTT_SN_TOPIC_STATE_REGISTERING:
			registering = true;
			break;
		default:
			break;
		}
	}

	/* Second pass: start new requests and retransmit pending ones */
	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
		LOG_HEXDUMP_DBG(topic->name, topic->namelen, "Processing topic");

		if (topic->con.last_attempt == 0) {
			/* Never sent before: due immediately, not a duplicate */
			next_attempt = 0;
			dup = false;
		} else {
			/* Already sent at least once: any further send is a retransmission */
			next_attempt = topic->con.last_attempt + T_RETRY_MSEC;
			dup = true;
		}

		/* Check if action is due */
		if (next_attempt <= now) {
			switch (topic->state) {
			case MQTT_SN_TOPIC_STATE_SUBSCRIBE:
				if (subscribing) {
					/*
					 * Only one topic can be subscribing or unsubscribing
					 * at the same time
					 */
					break;
				}
				topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBING;
				LOG_INF("Topic subscription now in progress");
				__fallthrough;
			case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
				subscribing = true;
				/* Test-then-decrement: fails once the budget has reached 0 */
				if (!topic->con.retries--) {
					LOG_WRN("Topic ran out of retries, disconnecting");
					mqtt_sn_disconnect_internal(client);
					return -ETIMEDOUT;
				}

				mqtt_sn_do_subscribe(client, topic, dup);
				topic->con.last_attempt = now;
				next_attempt = now + T_RETRY_MSEC;
				break;
			case MQTT_SN_TOPIC_STATE_REGISTER:
				if (registering) {
					/* Only one topic can be registering at the same time */
					break;
				}
				topic->state = MQTT_SN_TOPIC_STATE_REGISTERING;
				LOG_INF("Topic registration now in progress");
				__fallthrough;
			case MQTT_SN_TOPIC_STATE_REGISTERING:
				registering = true;
				/* Test-then-decrement: fails once the budget has reached 0 */
				if (!topic->con.retries--) {
					LOG_WRN("Topic ran out of retries, disconnecting");
					mqtt_sn_disconnect_internal(client);
					return -ETIMEDOUT;
				}

				mqtt_sn_do_register(client, topic);
				topic->con.last_attempt = now;
				next_attempt = now + T_RETRY_MSEC;
				break;
			case MQTT_SN_TOPIC_STATE_UNSUBSCRIBE:
				if (subscribing) {
					/*
					 * Only one topic can be subscribing or unsubscribing
					 * at the same time
					 */
					break;
				}
				topic->state = MQTT_SN_TOPIC_STATE_UNSUBSCRIBING;
				LOG_INF("Topic unsubscription now in progress");
				__fallthrough;
			case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
				subscribing = true;
				/* Test-then-decrement: fails once the budget has reached 0 */
				if (!topic->con.retries--) {
					LOG_WRN("Topic ran out of retries, disconnecting");
					mqtt_sn_disconnect_internal(client);
					return -ETIMEDOUT;
				}
				mqtt_sn_do_unsubscribe(client, topic);
				topic->con.last_attempt = now;
				next_attempt = now + T_RETRY_MSEC;
				break;
			case MQTT_SN_TOPIC_STATE_REGISTERED:
			case MQTT_SN_TOPIC_STATE_SUBSCRIBED:
				/* Settled states: nothing to send */
				break;
			}
		}

		/* Remember time when next action is due */
		if (next_attempt > now && (*next_cycle == 0 || next_attempt < *next_cycle)) {
			*next_cycle = next_attempt;
		}
	}

	LOG_DBG("next_cycle: %lld", *next_cycle);

	return 0;
}
830 
831 /**
832  * @brief Housekeeping task for the client ping
833  *
834  * @param client
835  * @param next_cycle will be set to the time when the next action is required
836  * @retval 0 on success
837  * @retval -ETIMEDOUT if client ran out of ping retries
838  */
process_ping(struct mqtt_sn_client * client,int64_t * next_cycle)839 static int process_ping(struct mqtt_sn_client *client, int64_t *next_cycle)
840 {
841 	const int64_t now = k_uptime_get();
842 	struct mqtt_sn_gateway *gw = NULL;
843 	int64_t next_ping;
844 
845 	if (client->ping_retries == N_RETRY) {
846 		/* Last ping was acked */
847 		next_ping = client->last_ping + T_KEEPALIVE_MSEC;
848 	} else {
849 		next_ping = client->last_ping + T_RETRY_MSEC;
850 	}
851 
852 	if (next_ping < now) {
853 		if (!client->ping_retries--) {
854 			LOG_WRN("Ping ran out of retries");
855 			mqtt_sn_disconnect_internal(client);
856 			SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
857 			LOG_DBG("Removing non-responsive GW 0x%08x", gw->gw_id);
858 			mqtt_sn_gw_destroy(client, gw);
859 			return -ETIMEDOUT;
860 		}
861 
862 		LOG_DBG("Sending PINGREQ");
863 		mqtt_sn_do_ping(client);
864 		client->last_ping = now;
865 		next_ping = now + T_RETRY_MSEC;
866 	}
867 
868 	if (*next_cycle == 0 || next_ping < *next_cycle) {
869 		*next_cycle = next_ping;
870 	}
871 
872 	LOG_DBG("next_cycle: %lld", *next_cycle);
873 
874 	return 0;
875 }
876 
877 /**
878  * @brief Housekeeping task for the gateway search
879  *
880  * @param client
881  * @param next_cycle will be set to the time when the next action is required
882  * @retval 0 on success
883  */
process_search(struct mqtt_sn_client * client,int64_t * next_cycle)884 static int process_search(struct mqtt_sn_client *client, int64_t *next_cycle)
885 {
886 	const int64_t now = k_uptime_get();
887 
888 	LOG_DBG("ts_searchgw: %lld", client->ts_searchgw);
889 	LOG_DBG("ts_gwinfo: %lld", client->ts_gwinfo);
890 
891 	if (client->ts_searchgw != 0 && client->ts_searchgw <= now) {
892 		LOG_DBG("Sending SEARCHGW");
893 		mqtt_sn_do_searchgw(client);
894 		client->ts_searchgw = 0;
895 	}
896 
897 	if (client->ts_gwinfo != 0 && client->ts_gwinfo <= now) {
898 		LOG_DBG("Sending GWINFO");
899 		mqtt_sn_do_gwinfo(client);
900 		client->ts_gwinfo = 0;
901 	}
902 
903 	if (*next_cycle == 0 || (client->ts_searchgw != 0 && client->ts_searchgw < *next_cycle)) {
904 		*next_cycle = client->ts_searchgw;
905 	}
906 	if (*next_cycle == 0 || (client->ts_gwinfo != 0 && client->ts_gwinfo < *next_cycle)) {
907 		*next_cycle = client->ts_gwinfo;
908 	}
909 
910 	LOG_DBG("next_cycle: %lld", *next_cycle);
911 
912 	return 0;
913 }
914 
915 /**
916  * @brief Housekeeping task for gateway advertisements
917  *
918  * @param client
919  * @param next_cycle will be set to the time when the next action is required
920  * @return int
921  */
process_advertise(struct mqtt_sn_client * client,int64_t * next_cycle)922 static int process_advertise(struct mqtt_sn_client *client, int64_t *next_cycle)
923 {
924 	const int64_t now = k_uptime_get();
925 	struct mqtt_sn_gateway *gw;
926 	struct mqtt_sn_gateway *gw_next;
927 
928 	SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&client->gateway, gw, gw_next, next) {
929 		LOG_DBG("Checking if GW 0x%02x is old", gw->gw_id);
930 		if (gw->adv_timer != -1 && gw->adv_timer <= now) {
931 			LOG_DBG("Removing non-responsive GW 0x%08x", gw->gw_id);
932 			if (client->gateway.head == &gw->next) {
933 				mqtt_sn_disconnect(client);
934 			}
935 			mqtt_sn_gw_destroy(client, gw);
936 		}
937 		if (gw->adv_timer != -1 && (*next_cycle == 0 || gw->adv_timer < *next_cycle)) {
938 			*next_cycle = gw->adv_timer;
939 		}
940 	}
941 	LOG_DBG("next_cycle: %lld", *next_cycle);
942 
943 	return 0;
944 }
945 
946 /**
947  * @brief Housekeeping task that is called by workqueue item
948  *
949  * @param wrk The work item
950  */
process_work(struct k_work * wrk)951 static void process_work(struct k_work *wrk)
952 {
953 	struct mqtt_sn_client *client;
954 	struct k_work_delayable *dwork;
955 	int64_t next_cycle = 0;
956 	int err;
957 
958 	dwork = k_work_delayable_from_work(wrk);
959 	client = CONTAINER_OF(dwork, struct mqtt_sn_client, process_work);
960 
961 	LOG_DBG("Executing work of client %p in state %d at time %lld", client, client->state,
962 		k_uptime_get());
963 
964 	/* Clean up old advertised gateways from list */
965 	err = process_advertise(client, &next_cycle);
966 	if (err) {
967 		return;
968 	}
969 
970 	/* Handle GW search process timers */
971 	err = process_search(client, &next_cycle);
972 	if (err) {
973 		return;
974 	}
975 
976 	if (client->state == MQTT_SN_CLIENT_ACTIVE) {
977 		err = process_topics(client, &next_cycle);
978 		if (err) {
979 			return;
980 		}
981 
982 		err = process_pubs(client, &next_cycle);
983 		if (err) {
984 			return;
985 		}
986 
987 		err = process_ping(client, &next_cycle);
988 		if (err) {
989 			return;
990 		}
991 	}
992 
993 	if (next_cycle > 0) {
994 		LOG_DBG("next_cycle: %lld", next_cycle);
995 		k_work_schedule(dwork, K_MSEC(next_cycle - k_uptime_get()));
996 	}
997 }
998 
/*
 * Initialise a client structure.
 *
 * The structure is zeroed, the client ID, transport and event callback are
 * stored, the caller-provided TX/RX memory is wrapped into the client's
 * buffers, the housekeeping work item is set up and, if the transport
 * provides one, its init hook is invoked.
 *
 * Returns 0 on success or -EINVAL if any pointer argument is NULL.
 */
int mqtt_sn_client_init(struct mqtt_sn_client *client, const struct mqtt_sn_data *client_id,
			struct mqtt_sn_transport *transport, mqtt_sn_evt_cb_t evt_cb, void *tx,
			size_t txsz, void *rx, size_t rxsz)
{
	if (client == NULL || client_id == NULL || transport == NULL || evt_cb == NULL ||
	    tx == NULL || rx == NULL) {
		return -EINVAL;
	}

	memset(client, 0, sizeof(*client));

	client->evt_cb = evt_cb;
	client->transport = transport;
	client->client_id.size = client_id->size;
	client->client_id.data = client_id->data;

	/* Wrap the caller's memory and start with empty buffers */
	net_buf_simple_init_with_data(&client->tx, tx, txsz);
	net_buf_simple_reset(&client->tx);

	net_buf_simple_init_with_data(&client->rx, rx, rxsz);
	net_buf_simple_reset(&client->rx);

	k_work_init_delayable(&client->process_work, process_work);

	if (transport->init != NULL) {
		transport->init(transport);
	}

	return 0;
}
1028 
mqtt_sn_client_deinit(struct mqtt_sn_client * client)1029 void mqtt_sn_client_deinit(struct mqtt_sn_client *client)
1030 {
1031 	if (!client) {
1032 		return;
1033 	}
1034 
1035 	mqtt_sn_publish_destroy_all(client);
1036 	mqtt_sn_topic_destroy_all(client);
1037 	mqtt_sn_gw_destroy_all(client);
1038 
1039 	if (client->transport && client->transport->deinit) {
1040 		client->transport->deinit(client->transport);
1041 	}
1042 
1043 	k_work_cancel_delayable(&client->process_work);
1044 }
1045 
mqtt_sn_add_gw(struct mqtt_sn_client * client,uint8_t gw_id,struct mqtt_sn_data gw_addr)1046 int mqtt_sn_add_gw(struct mqtt_sn_client *client, uint8_t gw_id, struct mqtt_sn_data gw_addr)
1047 {
1048 	struct mqtt_sn_gateway *gw;
1049 
1050 	gw = mqtt_sn_gw_find_by_id(client, gw_id);
1051 
1052 	if (gw != NULL) {
1053 		mqtt_sn_gw_destroy(client, gw);
1054 	}
1055 
1056 	gw = mqtt_sn_gw_create(gw_id, -1, gw_addr);
1057 	if (!gw) {
1058 		return -ENOMEM;
1059 	}
1060 
1061 	sys_slist_append(&client->gateway, &gw->next);
1062 
1063 	return 0;
1064 }
1065 
mqtt_sn_search(struct mqtt_sn_client * client,uint8_t radius)1066 int mqtt_sn_search(struct mqtt_sn_client *client, uint8_t radius)
1067 {
1068 	if (!client) {
1069 		return -EINVAL;
1070 	}
1071 	/* Set SEARCHGW transmission timer */
1072 	client->ts_searchgw = k_uptime_get() + (T_SEARCHGW_MSEC * sys_rand8_get() / 255);
1073 	k_work_schedule(&client->process_work, K_NO_WAIT);
1074 	LOG_DBG("Requested SEARCHGW for time %lld at time %lld", client->ts_searchgw,
1075 		k_uptime_get());
1076 
1077 	return 0;
1078 }
1079 
mqtt_sn_connect(struct mqtt_sn_client * client,bool will,bool clean_session)1080 int mqtt_sn_connect(struct mqtt_sn_client *client, bool will, bool clean_session)
1081 {
1082 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_CONNECT};
1083 
1084 	if (!client) {
1085 		return -EINVAL;
1086 	}
1087 
1088 	if (will && (!client->will_msg.data || !client->will_topic.data)) {
1089 		LOG_ERR("will set to true, but no will data in client");
1090 		return -EINVAL;
1091 	}
1092 
1093 	if (clean_session) {
1094 		mqtt_sn_topic_destroy_all(client);
1095 	}
1096 
1097 	p.params.connect.clean_session = clean_session;
1098 	p.params.connect.will = will;
1099 	p.params.connect.duration = CONFIG_MQTT_SN_KEEPALIVE;
1100 	p.params.connect.client_id.data = client->client_id.data;
1101 	p.params.connect.client_id.size = client->client_id.size;
1102 
1103 	client->last_ping = k_uptime_get();
1104 
1105 	return encode_and_send(client, &p, 0);
1106 }
1107 
mqtt_sn_disconnect(struct mqtt_sn_client * client)1108 int mqtt_sn_disconnect(struct mqtt_sn_client *client)
1109 {
1110 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_DISCONNECT};
1111 	int err;
1112 
1113 	if (!client) {
1114 		return -EINVAL;
1115 	}
1116 
1117 	p.params.disconnect.duration = 0;
1118 
1119 	err = encode_and_send(client, &p, 0);
1120 	mqtt_sn_disconnect_internal(client);
1121 
1122 	return err;
1123 }
1124 
mqtt_sn_sleep(struct mqtt_sn_client * client,uint16_t duration)1125 int mqtt_sn_sleep(struct mqtt_sn_client *client, uint16_t duration)
1126 {
1127 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_DISCONNECT};
1128 	int err;
1129 
1130 	if (!client || !duration) {
1131 		return -EINVAL;
1132 	}
1133 
1134 	p.params.disconnect.duration = duration;
1135 
1136 	err = encode_and_send(client, &p, 0);
1137 	mqtt_sn_sleep_internal(client);
1138 
1139 	return err;
1140 }
1141 
mqtt_sn_subscribe(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name)1142 int mqtt_sn_subscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
1143 		      struct mqtt_sn_data *topic_name)
1144 {
1145 	struct mqtt_sn_topic *topic;
1146 	int err;
1147 
1148 	if (!client || !topic_name || !topic_name->data || !topic_name->size) {
1149 		return -EINVAL;
1150 	}
1151 
1152 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
1153 		LOG_ERR("Cannot subscribe: not connected");
1154 		return -ENOTCONN;
1155 	}
1156 
1157 	topic = mqtt_sn_topic_find_by_name(client, topic_name);
1158 	if (!topic) {
1159 		topic = mqtt_sn_topic_create(topic_name);
1160 		if (!topic) {
1161 			return -ENOMEM;
1162 		}
1163 
1164 		topic->qos = qos;
1165 		topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBE;
1166 		sys_slist_append(&client->topic, &topic->next);
1167 	}
1168 
1169 	err = k_work_reschedule(&client->process_work, K_NO_WAIT);
1170 	if (err < 0) {
1171 		return err;
1172 	}
1173 
1174 	return 0;
1175 }
1176 
mqtt_sn_unsubscribe(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name)1177 int mqtt_sn_unsubscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
1178 			struct mqtt_sn_data *topic_name)
1179 {
1180 	struct mqtt_sn_topic *topic;
1181 	int err;
1182 
1183 	if (!client || !topic_name) {
1184 		return -EINVAL;
1185 	}
1186 
1187 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
1188 		LOG_ERR("Cannot unsubscribe: not connected");
1189 		return -ENOTCONN;
1190 	}
1191 
1192 	topic = mqtt_sn_topic_find_by_name(client, topic_name);
1193 	if (!topic) {
1194 		LOG_HEXDUMP_ERR(topic_name->data, topic_name->size, "Topic not found");
1195 		return -ENOENT;
1196 	}
1197 
1198 	if (topic->state != MQTT_SN_TOPIC_STATE_SUBSCRIBED) {
1199 		LOG_ERR("Cannot unsubscribe: not subscribed");
1200 		return -EAGAIN;
1201 	}
1202 
1203 	topic->state = MQTT_SN_TOPIC_STATE_UNSUBSCRIBE;
1204 	mqtt_sn_con_init(&topic->con);
1205 
1206 	err = k_work_reschedule(&client->process_work, K_NO_WAIT);
1207 	if (err < 0) {
1208 		return err;
1209 	}
1210 
1211 	return 0;
1212 }
1213 
mqtt_sn_publish(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name,bool retain,struct mqtt_sn_data * data)1214 int mqtt_sn_publish(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
1215 		    struct mqtt_sn_data *topic_name, bool retain, struct mqtt_sn_data *data)
1216 {
1217 	struct mqtt_sn_publish *pub;
1218 	struct mqtt_sn_topic *topic;
1219 	int err;
1220 
1221 	if (!client || !topic_name) {
1222 		return -EINVAL;
1223 	}
1224 
1225 	if (qos == MQTT_SN_QOS_M1) {
1226 		LOG_ERR("QoS -1 not supported");
1227 		return -ENOTSUP;
1228 	}
1229 
1230 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
1231 		LOG_ERR("Cannot publish: disconnected");
1232 		return -ENOTCONN;
1233 	}
1234 
1235 	topic = mqtt_sn_topic_find_by_name(client, topic_name);
1236 	if (!topic) {
1237 		topic = mqtt_sn_topic_create(topic_name);
1238 		if (!topic) {
1239 			return -ENOMEM;
1240 		}
1241 
1242 		topic->qos = qos;
1243 		topic->state = MQTT_SN_TOPIC_STATE_REGISTER;
1244 		sys_slist_append(&client->topic, &topic->next);
1245 	}
1246 
1247 	pub = mqtt_sn_publish_create(data);
1248 	if (!pub) {
1249 		k_work_reschedule(&client->process_work, K_NO_WAIT);
1250 		return -ENOMEM;
1251 	}
1252 
1253 	pub->qos = qos;
1254 	pub->retain = retain;
1255 	pub->topic = topic;
1256 
1257 	sys_slist_append(&client->publish, &pub->next);
1258 
1259 	err = k_work_reschedule(&client->process_work, K_NO_WAIT);
1260 	if (err < 0) {
1261 		return err;
1262 	}
1263 
1264 	return 0;
1265 }
1266 
handle_advertise(struct mqtt_sn_client * client,struct mqtt_sn_param_advertise * p,struct mqtt_sn_data rx_addr)1267 static void handle_advertise(struct mqtt_sn_client *client, struct mqtt_sn_param_advertise *p,
1268 			     struct mqtt_sn_data rx_addr)
1269 {
1270 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_ADVERTISE};
1271 	struct mqtt_sn_gateway *gw;
1272 
1273 	gw = mqtt_sn_gw_find_by_id(client, p->gw_id);
1274 
1275 	if (gw == NULL) {
1276 		LOG_DBG("Creating GW 0x%02x with duration %d", p->gw_id, p->duration);
1277 		gw = mqtt_sn_gw_create(p->gw_id, p->duration, rx_addr);
1278 		if (!gw) {
1279 			return;
1280 		}
1281 		sys_slist_append(&client->gateway, &gw->next);
1282 	} else {
1283 		LOG_DBG("Updating timer for GW 0x%02x with duration %d", p->gw_id, p->duration);
1284 		gw->adv_timer =
1285 			k_uptime_get() + (p->duration * CONFIG_MQTT_SN_LIB_N_ADV * MSEC_PER_SEC);
1286 	}
1287 
1288 	k_work_schedule(&client->process_work, K_NO_WAIT);
1289 	if (client->evt_cb) {
1290 		client->evt_cb(client, &evt);
1291 	}
1292 }
1293 
handle_searchgw(struct mqtt_sn_client * client,struct mqtt_sn_param_searchgw * p)1294 static void handle_searchgw(struct mqtt_sn_client *client, struct mqtt_sn_param_searchgw *p)
1295 {
1296 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_SEARCHGW};
1297 
1298 	/* Increment SEARCHGW transmission timestamp if waiting */
1299 	if (client->ts_searchgw != 0) {
1300 		client->ts_searchgw = k_uptime_get() + (T_SEARCHGW_MSEC * sys_rand8_get() / 255);
1301 	}
1302 
1303 	/* Set transmission timestamp to respond to SEARCHGW if we have a GW */
1304 	if (sys_slist_len(&client->gateway) > 0) {
1305 		client->ts_gwinfo = k_uptime_get() + (T_GWINFO_MSEC * sys_rand8_get() / 255);
1306 	}
1307 	client->radius_gwinfo = p->radius;
1308 	k_work_schedule(&client->process_work, K_NO_WAIT);
1309 
1310 	if (client->evt_cb) {
1311 		client->evt_cb(client, &evt);
1312 	}
1313 }
1314 
handle_gwinfo(struct mqtt_sn_client * client,struct mqtt_sn_param_gwinfo * p,struct mqtt_sn_data rx_addr)1315 static void handle_gwinfo(struct mqtt_sn_client *client, struct mqtt_sn_param_gwinfo *p,
1316 			  struct mqtt_sn_data rx_addr)
1317 {
1318 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_GWINFO};
1319 	struct mqtt_sn_gateway *gw;
1320 
1321 	/* Clear SEARCHGW and GWINFO transmission if waiting */
1322 	client->ts_searchgw = 0;
1323 	client->ts_gwinfo = 0;
1324 	k_work_schedule(&client->process_work, K_NO_WAIT);
1325 
1326 	/* Extract GW info and store */
1327 	if (p->gw_add.size > 0) {
1328 		rx_addr.data = p->gw_add.data;
1329 		rx_addr.size = p->gw_add.size;
1330 	} else {
1331 	}
1332 	gw = mqtt_sn_gw_create(p->gw_id, -1, rx_addr);
1333 
1334 	if (!gw) {
1335 		return;
1336 	}
1337 
1338 	sys_slist_append(&client->gateway, &gw->next);
1339 
1340 	if (client->evt_cb) {
1341 		client->evt_cb(client, &evt);
1342 	}
1343 }
1344 
handle_connack(struct mqtt_sn_client * client,struct mqtt_sn_param_connack * p)1345 static void handle_connack(struct mqtt_sn_client *client, struct mqtt_sn_param_connack *p)
1346 {
1347 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_CONNECTED};
1348 
1349 	if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1350 		LOG_INF("MQTT_SN client connected");
1351 		switch (client->state) {
1352 		case MQTT_SN_CLIENT_DISCONNECTED:
1353 		case MQTT_SN_CLIENT_ASLEEP:
1354 		case MQTT_SN_CLIENT_AWAKE:
1355 			mqtt_sn_set_state(client, MQTT_SN_CLIENT_ACTIVE);
1356 			if (client->evt_cb) {
1357 				client->evt_cb(client, &evt);
1358 			}
1359 			client->ping_retries = N_RETRY;
1360 			break;
1361 		default:
1362 			LOG_ERR("Client received CONNACK but was in state %d", client->state);
1363 			return;
1364 		}
1365 	} else {
1366 		LOG_WRN("CONNACK ret code %d", p->ret_code);
1367 		mqtt_sn_disconnect_internal(client);
1368 	}
1369 
1370 	k_work_schedule(&client->process_work, K_NO_WAIT);
1371 }
1372 
handle_willtopicreq(struct mqtt_sn_client * client)1373 static void handle_willtopicreq(struct mqtt_sn_client *client)
1374 {
1375 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_WILLTOPIC};
1376 
1377 	response.params.willtopic.qos = client->will_qos;
1378 	response.params.willtopic.retain = client->will_retain;
1379 	response.params.willtopic.topic.data = client->will_topic.data;
1380 	response.params.willtopic.topic.size = client->will_topic.size;
1381 
1382 	encode_and_send(client, &response, 0);
1383 }
1384 
handle_willmsgreq(struct mqtt_sn_client * client)1385 static void handle_willmsgreq(struct mqtt_sn_client *client)
1386 {
1387 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_WILLMSG};
1388 
1389 	response.params.willmsg.msg.data = client->will_msg.data;
1390 	response.params.willmsg.msg.size = client->will_msg.size;
1391 
1392 	encode_and_send(client, &response, 0);
1393 }
1394 
handle_register(struct mqtt_sn_client * client,struct mqtt_sn_param_register * p)1395 static void handle_register(struct mqtt_sn_client *client, struct mqtt_sn_param_register *p)
1396 {
1397 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_REGACK};
1398 	struct mqtt_sn_topic *topic;
1399 
1400 	topic = mqtt_sn_topic_create(&p->topic);
1401 	if (!topic) {
1402 		return;
1403 	}
1404 
1405 	topic->state = MQTT_SN_TOPIC_STATE_REGISTERED;
1406 	topic->topic_id = p->topic_id;
1407 	topic->type = MQTT_SN_TOPIC_TYPE_NORMAL;
1408 
1409 	sys_slist_append(&client->topic, &topic->next);
1410 
1411 	response.params.regack.ret_code = MQTT_SN_CODE_ACCEPTED;
1412 	response.params.regack.topic_id = p->topic_id;
1413 	response.params.regack.msg_id = p->msg_id;
1414 
1415 	encode_and_send(client, &response, 0);
1416 }
1417 
handle_regack(struct mqtt_sn_client * client,struct mqtt_sn_param_regack * p)1418 static void handle_regack(struct mqtt_sn_client *client, struct mqtt_sn_param_regack *p)
1419 {
1420 	struct mqtt_sn_topic *topic = mqtt_sn_topic_find_by_msg_id(client, p->msg_id);
1421 
1422 	if (!topic) {
1423 		LOG_ERR("Can't REGACK, no topic found");
1424 		return;
1425 	}
1426 
1427 	if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1428 		topic->state = MQTT_SN_TOPIC_STATE_REGISTERED;
1429 		topic->topic_id = p->topic_id;
1430 	} else {
1431 		LOG_WRN("Gateway could not register topic ID %u, code %d", p->topic_id,
1432 			p->ret_code);
1433 	}
1434 }
1435 
handle_publish(struct mqtt_sn_client * client,struct mqtt_sn_param_publish * p)1436 static void handle_publish(struct mqtt_sn_client *client, struct mqtt_sn_param_publish *p)
1437 {
1438 	struct mqtt_sn_param response;
1439 	struct mqtt_sn_evt evt = {.param.publish = {.data = p->data,
1440 						    .topic_id = p->topic_id,
1441 						    .topic_type = p->topic_type},
1442 				  .type = MQTT_SN_EVT_PUBLISH};
1443 
1444 	if (p->qos == MQTT_SN_QOS_1) {
1445 		response.type = MQTT_SN_MSG_TYPE_PUBACK;
1446 		response.params.puback.topic_id = p->topic_id;
1447 		response.params.puback.msg_id = p->msg_id;
1448 		response.params.puback.ret_code = MQTT_SN_CODE_ACCEPTED;
1449 
1450 		encode_and_send(client, &response, 0);
1451 	} else if (p->qos == MQTT_SN_QOS_2) {
1452 		response.type = MQTT_SN_MSG_TYPE_PUBREC;
1453 		response.params.pubrec.msg_id = p->msg_id;
1454 
1455 		encode_and_send(client, &response, 0);
1456 	}
1457 
1458 	if (client->evt_cb) {
1459 		client->evt_cb(client, &evt);
1460 	}
1461 }
1462 
handle_puback(struct mqtt_sn_client * client,struct mqtt_sn_param_puback * p)1463 static void handle_puback(struct mqtt_sn_client *client, struct mqtt_sn_param_puback *p)
1464 {
1465 	struct mqtt_sn_publish *pub = mqtt_sn_publish_find_by_msg_id(client, p->msg_id);
1466 
1467 	if (!pub) {
1468 		LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1469 		return;
1470 	}
1471 
1472 	mqtt_sn_publish_destroy(client, pub);
1473 }
1474 
handle_pubrec(struct mqtt_sn_client * client,struct mqtt_sn_param_pubrec * p)1475 static void handle_pubrec(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrec *p)
1476 {
1477 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PUBREL};
1478 	struct mqtt_sn_publish *pub = mqtt_sn_publish_find_by_msg_id(client, p->msg_id);
1479 
1480 	if (!pub) {
1481 		LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1482 		return;
1483 	}
1484 
1485 	pub->con.last_attempt = k_uptime_get();
1486 	pub->con.retries = N_RETRY;
1487 
1488 	response.params.pubrel.msg_id = p->msg_id;
1489 
1490 	encode_and_send(client, &response, 0);
1491 }
1492 
handle_pubrel(struct mqtt_sn_client * client,struct mqtt_sn_param_pubrel * p)1493 static void handle_pubrel(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrel *p)
1494 {
1495 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PUBCOMP};
1496 
1497 	response.params.pubcomp.msg_id = p->msg_id;
1498 
1499 	encode_and_send(client, &response, 0);
1500 }
1501 
handle_pubcomp(struct mqtt_sn_client * client,struct mqtt_sn_param_pubcomp * p)1502 static void handle_pubcomp(struct mqtt_sn_client *client, struct mqtt_sn_param_pubcomp *p)
1503 {
1504 	struct mqtt_sn_publish *pub = mqtt_sn_publish_find_by_msg_id(client, p->msg_id);
1505 
1506 	if (!pub) {
1507 		LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1508 		return;
1509 	}
1510 
1511 	mqtt_sn_publish_destroy(client, pub);
1512 }
1513 
handle_suback(struct mqtt_sn_client * client,struct mqtt_sn_param_suback * p)1514 static void handle_suback(struct mqtt_sn_client *client, struct mqtt_sn_param_suback *p)
1515 {
1516 	struct mqtt_sn_topic *topic = mqtt_sn_topic_find_by_msg_id(client, p->msg_id);
1517 
1518 	if (!topic) {
1519 		LOG_ERR("No matching SUBSCRIBE found for msg id %u", p->msg_id);
1520 		return;
1521 	}
1522 
1523 	if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1524 		topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBED;
1525 		topic->topic_id = p->topic_id;
1526 		topic->qos = p->qos;
1527 	} else {
1528 		LOG_WRN("SUBACK with ret code %d", p->ret_code);
1529 	}
1530 }
1531 
handle_unsuback(struct mqtt_sn_client * client,struct mqtt_sn_param_unsuback * p)1532 static void handle_unsuback(struct mqtt_sn_client *client, struct mqtt_sn_param_unsuback *p)
1533 {
1534 	struct mqtt_sn_topic *topic = mqtt_sn_topic_find_by_msg_id(client, p->msg_id);
1535 
1536 	if (!topic || topic->state != MQTT_SN_TOPIC_STATE_UNSUBSCRIBING) {
1537 		LOG_ERR("No matching UNSUBSCRIBE found for msg id %u", p->msg_id);
1538 		return;
1539 	}
1540 
1541 	mqtt_sn_topic_destroy(client, topic);
1542 }
1543 
handle_pingreq(struct mqtt_sn_client * client)1544 static void handle_pingreq(struct mqtt_sn_client *client)
1545 {
1546 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PINGRESP};
1547 
1548 	encode_and_send(client, &response, 0);
1549 }
1550 
handle_pingresp(struct mqtt_sn_client * client)1551 static void handle_pingresp(struct mqtt_sn_client *client)
1552 {
1553 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_PINGRESP};
1554 
1555 	if (client->evt_cb) {
1556 		client->evt_cb(client, &evt);
1557 	}
1558 
1559 	if (client->state == MQTT_SN_CLIENT_AWAKE) {
1560 		mqtt_sn_set_state(client, MQTT_SN_CLIENT_ASLEEP);
1561 	}
1562 
1563 	client->ping_retries = N_RETRY;
1564 }
1565 
/**
 * @brief Handle a received DISCONNECT message
 *
 * Logs the event and runs the internal disconnect handling.
 *
 * @param client Client that received the message
 * @param p      Decoded DISCONNECT parameters; not evaluated
 */
static void handle_disconnect(struct mqtt_sn_client *client, struct mqtt_sn_param_disconnect *p)
{
	(void)p;

	LOG_INF("Received DISCONNECT");
	mqtt_sn_disconnect_internal(client);
}
1571 
handle_msg(struct mqtt_sn_client * client,struct mqtt_sn_data rx_addr)1572 static int handle_msg(struct mqtt_sn_client *client, struct mqtt_sn_data rx_addr)
1573 {
1574 	int err;
1575 	struct mqtt_sn_param p;
1576 
1577 	err = mqtt_sn_decode_msg(&client->rx, &p);
1578 	if (err) {
1579 		return err;
1580 	}
1581 
1582 	LOG_INF("Got message of type %d", p.type);
1583 
1584 	switch (p.type) {
1585 	case MQTT_SN_MSG_TYPE_ADVERTISE:
1586 		handle_advertise(client, &p.params.advertise, rx_addr);
1587 		break;
1588 	case MQTT_SN_MSG_TYPE_SEARCHGW:
1589 		handle_searchgw(client, &p.params.searchgw);
1590 		break;
1591 	case MQTT_SN_MSG_TYPE_GWINFO:
1592 		handle_gwinfo(client, &p.params.gwinfo, rx_addr);
1593 		break;
1594 	case MQTT_SN_MSG_TYPE_CONNACK:
1595 		handle_connack(client, &p.params.connack);
1596 		break;
1597 	case MQTT_SN_MSG_TYPE_WILLTOPICREQ:
1598 		handle_willtopicreq(client);
1599 		break;
1600 	case MQTT_SN_MSG_TYPE_WILLMSGREQ:
1601 		handle_willmsgreq(client);
1602 		break;
1603 	case MQTT_SN_MSG_TYPE_REGISTER:
1604 		handle_register(client, &p.params.reg);
1605 		break;
1606 	case MQTT_SN_MSG_TYPE_REGACK:
1607 		handle_regack(client, &p.params.regack);
1608 		break;
1609 	case MQTT_SN_MSG_TYPE_PUBLISH:
1610 		handle_publish(client, &p.params.publish);
1611 		break;
1612 	case MQTT_SN_MSG_TYPE_PUBACK:
1613 		handle_puback(client, &p.params.puback);
1614 		break;
1615 	case MQTT_SN_MSG_TYPE_PUBREC:
1616 		handle_pubrec(client, &p.params.pubrec);
1617 		break;
1618 	case MQTT_SN_MSG_TYPE_PUBREL:
1619 		handle_pubrel(client, &p.params.pubrel);
1620 		break;
1621 	case MQTT_SN_MSG_TYPE_PUBCOMP:
1622 		handle_pubcomp(client, &p.params.pubcomp);
1623 		break;
1624 	case MQTT_SN_MSG_TYPE_SUBACK:
1625 		handle_suback(client, &p.params.suback);
1626 		break;
1627 	case MQTT_SN_MSG_TYPE_UNSUBACK:
1628 		handle_unsuback(client, &p.params.unsuback);
1629 		break;
1630 	case MQTT_SN_MSG_TYPE_PINGREQ:
1631 		handle_pingreq(client);
1632 		break;
1633 	case MQTT_SN_MSG_TYPE_PINGRESP:
1634 		handle_pingresp(client);
1635 		break;
1636 	case MQTT_SN_MSG_TYPE_DISCONNECT:
1637 		handle_disconnect(client, &p.params.disconnect);
1638 		break;
1639 	case MQTT_SN_MSG_TYPE_WILLTOPICRESP:
1640 		break;
1641 	case MQTT_SN_MSG_TYPE_WILLMSGRESP:
1642 		break;
1643 	default:
1644 		LOG_ERR("Unexpected message type %d", p.type);
1645 		break;
1646 	}
1647 
1648 	k_work_reschedule(&client->process_work, K_NO_WAIT);
1649 
1650 	return 0;
1651 }
1652 
mqtt_sn_input(struct mqtt_sn_client * client)1653 int mqtt_sn_input(struct mqtt_sn_client *client)
1654 {
1655 	ssize_t next_frame_size;
1656 	char addr[CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE];
1657 	struct mqtt_sn_data rx_addr = {.data = addr, .size = CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE};
1658 	int err;
1659 
1660 	if (!client || !client->transport || !client->transport->recvfrom) {
1661 		return -EINVAL;
1662 	}
1663 
1664 	if (client->transport->poll) {
1665 		next_frame_size = client->transport->poll(client);
1666 		if (next_frame_size <= 0) {
1667 			return next_frame_size;
1668 		}
1669 	}
1670 
1671 	net_buf_simple_reset(&client->rx);
1672 
1673 	next_frame_size = client->transport->recvfrom(client, client->rx.data, client->rx.size,
1674 						      (void *)rx_addr.data, &rx_addr.size);
1675 	if (next_frame_size <= 0) {
1676 		return next_frame_size;
1677 	}
1678 
1679 	if (next_frame_size > client->rx.size) {
1680 		return -ENOBUFS;
1681 	}
1682 
1683 	client->rx.len = next_frame_size;
1684 
1685 	LOG_HEXDUMP_DBG(client->rx.data, client->rx.len, "Received data");
1686 
1687 	err = handle_msg(client, rx_addr);
1688 
1689 	if (err) {
1690 		return err;
1691 	}
1692 
1693 	/* Should be zero */
1694 	return -client->rx.len;
1695 }
1696 
mqtt_sn_get_topic_name(struct mqtt_sn_client * client,uint16_t id,struct mqtt_sn_data * topic_name)1697 int mqtt_sn_get_topic_name(struct mqtt_sn_client *client, uint16_t id,
1698 			   struct mqtt_sn_data *topic_name)
1699 {
1700 	struct mqtt_sn_topic *topic;
1701 
1702 	if (!client || !topic_name) {
1703 		return -EINVAL;
1704 	}
1705 
1706 	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
1707 		if (topic->topic_id == id) {
1708 			topic_name->data = (const uint8_t *)topic->name;
1709 			topic_name->size = topic->namelen;
1710 			return 0;
1711 		}
1712 	}
1713 	return -ENOENT;
1714 }
1715