1 /*
2  * Copyright (c) 2022 grandcentrix GmbH
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 /** @file mqtt_sn.c
8  *
9  * @brief MQTT-SN Client API Implementation.
10  */
11 
12 #include "mqtt_sn_msg.h"
13 
14 #include <zephyr/logging/log.h>
15 #include <zephyr/random/random.h>
16 #include <zephyr/net/mqtt_sn.h>
17 LOG_MODULE_REGISTER(net_mqtt_sn, CONFIG_MQTT_SN_LOG_LEVEL);
18 
19 #define MQTT_SN_NET_BUFS (CONFIG_MQTT_SN_LIB_MAX_PUBLISH)
20 
21 NET_BUF_POOL_FIXED_DEFINE(mqtt_sn_messages, MQTT_SN_NET_BUFS, CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE,
22 			  0, NULL);
23 
24 struct mqtt_sn_confirmable {
25 	int64_t last_attempt;
26 	uint16_t msg_id;
27 	uint8_t retries;
28 };
29 
30 struct mqtt_sn_publish {
31 	struct mqtt_sn_confirmable con;
32 	sys_snode_t next;
33 	struct mqtt_sn_topic *topic;
34 	uint8_t pubdata[CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE];
35 	size_t datalen;
36 	enum mqtt_sn_qos qos;
37 	bool retain;
38 };
39 
40 enum mqtt_sn_topic_state {
41 	MQTT_SN_TOPIC_STATE_REGISTERING,
42 	MQTT_SN_TOPIC_STATE_REGISTERED,
43 	MQTT_SN_TOPIC_STATE_SUBSCRIBING,
44 	MQTT_SN_TOPIC_STATE_SUBSCRIBED,
45 	MQTT_SN_TOPIC_STATE_UNSUBSCRIBING,
46 };
47 
48 struct mqtt_sn_topic {
49 	struct mqtt_sn_confirmable con;
50 	sys_snode_t next;
51 	char name[CONFIG_MQTT_SN_LIB_MAX_TOPIC_SIZE];
52 	size_t namelen;
53 	uint16_t topic_id;
54 	enum mqtt_sn_qos qos;
55 	enum mqtt_sn_topic_type type;
56 	enum mqtt_sn_topic_state state;
57 };
58 
59 struct mqtt_sn_gateway {
60 	sys_snode_t next;
61 	char gw_id;
62 	int64_t adv_timer;
63 	char addr[CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE];
64 	size_t addr_len;
65 };
66 
67 K_MEM_SLAB_DEFINE_STATIC(publishes, sizeof(struct mqtt_sn_publish), CONFIG_MQTT_SN_LIB_MAX_PUBLISH,
68 			 4);
69 K_MEM_SLAB_DEFINE_STATIC(topics, sizeof(struct mqtt_sn_topic), CONFIG_MQTT_SN_LIB_MAX_TOPICS, 4);
70 K_MEM_SLAB_DEFINE_STATIC(gateways, sizeof(struct mqtt_sn_gateway), CONFIG_MQTT_SN_LIB_MAX_GATEWAYS,
71 			 4);
72 
73 enum mqtt_sn_client_state {
74 	MQTT_SN_CLIENT_DISCONNECTED,
75 	MQTT_SN_CLIENT_ACTIVE,
76 	MQTT_SN_CLIENT_ASLEEP,
77 	MQTT_SN_CLIENT_AWAKE
78 };
79 
mqtt_sn_set_state(struct mqtt_sn_client * client,enum mqtt_sn_client_state state)80 static void mqtt_sn_set_state(struct mqtt_sn_client *client, enum mqtt_sn_client_state state)
81 {
82 	int prev_state = client->state;
83 
84 	client->state = state;
85 	LOG_DBG("Client %p state (%d) -> (%d)", client, prev_state, state);
86 }
87 
88 #define T_SEARCHGW_MSEC  (CONFIG_MQTT_SN_LIB_T_SEARCHGW * MSEC_PER_SEC)
89 #define T_GWINFO_MSEC    (CONFIG_MQTT_SN_LIB_T_GWINFO * MSEC_PER_SEC)
90 #define T_RETRY_MSEC     (CONFIG_MQTT_SN_LIB_T_RETRY * MSEC_PER_SEC)
91 #define N_RETRY          (CONFIG_MQTT_SN_LIB_N_RETRY)
92 #define T_KEEPALIVE_MSEC (CONFIG_MQTT_SN_KEEPALIVE * MSEC_PER_SEC)
93 
next_msg_id(void)94 static uint16_t next_msg_id(void)
95 {
96 	static uint16_t msg_id;
97 
98 	return ++msg_id;
99 }
100 
encode_and_send(struct mqtt_sn_client * client,struct mqtt_sn_param * p,uint8_t broadcast_radius)101 static int encode_and_send(struct mqtt_sn_client *client, struct mqtt_sn_param *p,
102 			   uint8_t broadcast_radius)
103 {
104 	int err;
105 
106 	err = mqtt_sn_encode_msg(&client->tx, p);
107 	if (err) {
108 		goto end;
109 	}
110 
111 	LOG_HEXDUMP_DBG(client->tx.data, client->tx.len, "Send message");
112 
113 	if (!client->transport->sendto) {
114 		LOG_ERR("Can't send: no callback");
115 		err = -ENOTSUP;
116 		goto end;
117 	}
118 
119 	if (!client->tx.len) {
120 		LOG_WRN("Can't send: empty");
121 		err = -ENOMSG;
122 		goto end;
123 	}
124 
125 	if (broadcast_radius) {
126 		err = client->transport->sendto(client, client->tx.data, client->tx.len, NULL,
127 						broadcast_radius);
128 	} else {
129 		struct mqtt_sn_gateway *gw;
130 
131 		gw = SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
132 		if (gw == NULL || gw->addr_len == 0) {
133 			LOG_WRN("No Gateway Address");
134 			err = -ENXIO;
135 			goto end;
136 		}
137 		err = client->transport->sendto(client, client->tx.data, client->tx.len, gw->addr,
138 						gw->addr_len);
139 	}
140 
141 end:
142 	if (err) {
143 		LOG_ERR("Error during send: %d", err);
144 	}
145 	net_buf_simple_reset(&client->tx);
146 
147 	return err;
148 }
149 
mqtt_sn_con_init(struct mqtt_sn_confirmable * con)150 static void mqtt_sn_con_init(struct mqtt_sn_confirmable *con)
151 {
152 	con->last_attempt = 0;
153 	con->retries = N_RETRY;
154 	con->msg_id = next_msg_id();
155 }
156 
mqtt_sn_publish_destroy(struct mqtt_sn_client * client,struct mqtt_sn_publish * pub)157 static void mqtt_sn_publish_destroy(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub)
158 {
159 	sys_slist_find_and_remove(&client->publish, &pub->next);
160 	k_mem_slab_free(&publishes, (void *)pub);
161 }
162 
mqtt_sn_publish_destroy_all(struct mqtt_sn_client * client)163 static void mqtt_sn_publish_destroy_all(struct mqtt_sn_client *client)
164 {
165 	struct mqtt_sn_publish *pub;
166 	sys_snode_t *next;
167 
168 	while ((next = sys_slist_get(&client->publish)) != NULL) {
169 		pub = SYS_SLIST_CONTAINER(next, pub, next);
170 		k_mem_slab_free(&publishes, (void *)pub);
171 	}
172 }
173 
mqtt_sn_publish_create(struct mqtt_sn_data * data)174 static struct mqtt_sn_publish *mqtt_sn_publish_create(struct mqtt_sn_data *data)
175 {
176 	struct mqtt_sn_publish *pub;
177 
178 	if (k_mem_slab_alloc(&publishes, (void **)&pub, K_NO_WAIT)) {
179 		LOG_ERR("Can't create PUB: no free slot");
180 		return NULL;
181 	}
182 
183 	memset(pub, 0, sizeof(*pub));
184 
185 	if (data && data->data && data->size) {
186 		if (data->size > sizeof(pub->pubdata)) {
187 			LOG_ERR("Can't create PUB: Too much data (%zu)", data->size);
188 			return NULL;
189 		}
190 
191 		memcpy(pub->pubdata, data->data, data->size);
192 		pub->datalen = data->size;
193 	}
194 
195 	mqtt_sn_con_init(&pub->con);
196 
197 	return pub;
198 }
199 
mqtt_sn_publish_find_msg_id(struct mqtt_sn_client * client,uint16_t msg_id)200 static struct mqtt_sn_publish *mqtt_sn_publish_find_msg_id(struct mqtt_sn_client *client,
201 							   uint16_t msg_id)
202 {
203 	struct mqtt_sn_publish *pub;
204 
205 	SYS_SLIST_FOR_EACH_CONTAINER(&client->publish, pub, next) {
206 		if (pub->con.msg_id == msg_id) {
207 			return pub;
208 		}
209 	}
210 
211 	return NULL;
212 }
213 
mqtt_sn_publish_find_topic(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)214 static struct mqtt_sn_publish *mqtt_sn_publish_find_topic(struct mqtt_sn_client *client,
215 							  struct mqtt_sn_topic *topic)
216 {
217 	struct mqtt_sn_publish *pub;
218 
219 	SYS_SLIST_FOR_EACH_CONTAINER(&client->publish, pub, next) {
220 		if (pub->topic == topic) {
221 			return pub;
222 		}
223 	}
224 
225 	return NULL;
226 }
227 
mqtt_sn_topic_create(struct mqtt_sn_data * name)228 static struct mqtt_sn_topic *mqtt_sn_topic_create(struct mqtt_sn_data *name)
229 {
230 	struct mqtt_sn_topic *topic;
231 
232 	if (k_mem_slab_alloc(&topics, (void **)&topic, K_NO_WAIT)) {
233 		LOG_ERR("Can't create topic: no free slot");
234 		return NULL;
235 	}
236 
237 	memset(topic, 0, sizeof(*topic));
238 
239 	if (!name || !name->data || !name->size) {
240 		LOG_ERR("Can't create topic with empty name");
241 		return NULL;
242 	}
243 
244 	if (name->size > sizeof(topic->name)) {
245 		LOG_ERR("Can't create topic: name too long (%zu)", name->size);
246 		return NULL;
247 	}
248 
249 	memcpy(topic->name, name->data, name->size);
250 	topic->namelen = name->size;
251 
252 	mqtt_sn_con_init(&topic->con);
253 
254 	return topic;
255 }
256 
mqtt_sn_topic_find_name(struct mqtt_sn_client * client,struct mqtt_sn_data * topic_name)257 static struct mqtt_sn_topic *mqtt_sn_topic_find_name(struct mqtt_sn_client *client,
258 						     struct mqtt_sn_data *topic_name)
259 {
260 	struct mqtt_sn_topic *topic;
261 
262 	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
263 		if (topic->namelen == topic_name->size &&
264 		    memcmp(topic->name, topic_name->data, topic_name->size) == 0) {
265 			return topic;
266 		}
267 	}
268 
269 	return NULL;
270 }
271 
mqtt_sn_topic_find_msg_id(struct mqtt_sn_client * client,uint16_t msg_id)272 static struct mqtt_sn_topic *mqtt_sn_topic_find_msg_id(struct mqtt_sn_client *client,
273 						       uint16_t msg_id)
274 {
275 	struct mqtt_sn_topic *topic;
276 
277 	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
278 		if (topic->con.msg_id == msg_id) {
279 			return topic;
280 		}
281 	}
282 
283 	return NULL;
284 }
285 
mqtt_sn_topic_destroy(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)286 static void mqtt_sn_topic_destroy(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
287 {
288 	struct mqtt_sn_publish *pub;
289 
290 	/* Destroy all pubs referencing this topic */
291 	while ((pub = mqtt_sn_publish_find_topic(client, topic)) != NULL) {
292 		LOG_WRN("Destroying publish msg_id %d", pub->con.msg_id);
293 		mqtt_sn_publish_destroy(client, pub);
294 	}
295 
296 	sys_slist_find_and_remove(&client->topic, &topic->next);
297 }
298 
mqtt_sn_topic_destroy_all(struct mqtt_sn_client * client)299 static void mqtt_sn_topic_destroy_all(struct mqtt_sn_client *client)
300 {
301 	struct mqtt_sn_topic *topic;
302 	struct mqtt_sn_publish *pub;
303 	sys_snode_t *next;
304 
305 	while ((next = sys_slist_get(&client->topic)) != NULL) {
306 		topic = SYS_SLIST_CONTAINER(next, topic, next);
307 		/* Destroy all pubs referencing this topic */
308 		while ((pub = mqtt_sn_publish_find_topic(client, topic)) != NULL) {
309 			LOG_WRN("Destroying publish msg_id %d", pub->con.msg_id);
310 			mqtt_sn_publish_destroy(client, pub);
311 		}
312 
313 		k_mem_slab_free(&topics, (void *)topic);
314 	}
315 }
316 
mqtt_sn_gw_destroy(struct mqtt_sn_client * client,struct mqtt_sn_gateway * gw)317 static void mqtt_sn_gw_destroy(struct mqtt_sn_client *client, struct mqtt_sn_gateway *gw)
318 {
319 	LOG_DBG("Destroying gateway %d", gw->gw_id);
320 	sys_slist_find_and_remove(&client->gateway, &gw->next);
321 	k_mem_slab_free(&gateways, (void *)gw);
322 }
323 
mqtt_sn_gw_destroy_all(struct mqtt_sn_client * client)324 static void mqtt_sn_gw_destroy_all(struct mqtt_sn_client *client)
325 {
326 	struct mqtt_sn_gateway *gw;
327 	sys_snode_t *next;
328 
329 	while ((next = sys_slist_get(&client->gateway)) != NULL) {
330 		gw = SYS_SLIST_CONTAINER(next, gw, next);
331 		sys_slist_find_and_remove(&client->gateway, next);
332 		k_mem_slab_free(&gateways, (void *)gw);
333 	}
334 }
335 
mqtt_sn_gw_create(uint8_t gw_id,short duration,struct mqtt_sn_data gw_addr)336 static struct mqtt_sn_gateway *mqtt_sn_gw_create(uint8_t gw_id, short duration,
337 						 struct mqtt_sn_data gw_addr)
338 {
339 	struct mqtt_sn_gateway *gw;
340 
341 	LOG_DBG("Free GW slots: %d", k_mem_slab_num_free_get(&gateways));
342 	if (k_mem_slab_alloc(&gateways, (void **)&gw, K_NO_WAIT)) {
343 		LOG_WRN("Can't create GW: no free slot");
344 		return NULL;
345 	}
346 
347 	__ASSERT(gw_addr.size < CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE,
348 		 "Gateway address is larger than allowed by CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE");
349 
350 	memset(gw, 0, sizeof(*gw));
351 	memcpy(gw->addr, gw_addr.data, gw_addr.size);
352 	gw->addr_len = gw_addr.size;
353 	gw->gw_id = gw_id;
354 	if (duration == -1) {
355 		gw->adv_timer = duration;
356 	} else {
357 		gw->adv_timer =
358 			k_uptime_get() + (duration * CONFIG_MQTT_SN_LIB_N_ADV * MSEC_PER_SEC);
359 	}
360 
361 	return gw;
362 }
363 
mqtt_sn_gw_find_id(struct mqtt_sn_client * client,uint16_t gw_id)364 static struct mqtt_sn_gateway *mqtt_sn_gw_find_id(struct mqtt_sn_client *client, uint16_t gw_id)
365 {
366 	struct mqtt_sn_gateway *gw;
367 
368 	SYS_SLIST_FOR_EACH_CONTAINER(&client->gateway, gw, next) {
369 		if (gw->gw_id == gw_id) {
370 			return gw;
371 		}
372 	}
373 
374 	return NULL;
375 }
376 
mqtt_sn_disconnect_internal(struct mqtt_sn_client * client)377 static void mqtt_sn_disconnect_internal(struct mqtt_sn_client *client)
378 {
379 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
380 
381 	mqtt_sn_set_state(client, MQTT_SN_CLIENT_DISCONNECTED);
382 	if (client->evt_cb) {
383 		client->evt_cb(client, &evt);
384 	}
385 
386 	/*
387 	 * Remove all publishes, but keep topics
388 	 * Topics are removed on deinit or when connect is called with
389 	 * clean-session = true
390 	 */
391 	mqtt_sn_publish_destroy_all(client);
392 
393 	k_work_cancel_delayable(&client->process_work);
394 }
395 
mqtt_sn_sleep_internal(struct mqtt_sn_client * client)396 static void mqtt_sn_sleep_internal(struct mqtt_sn_client *client)
397 {
398 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
399 
400 	mqtt_sn_set_state(client, MQTT_SN_CLIENT_ASLEEP);
401 	if (client->evt_cb) {
402 		client->evt_cb(client, &evt);
403 	}
404 }
405 
mqtt_sn_do_subscribe(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic,bool dup)406 static void mqtt_sn_do_subscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic,
407 				 bool dup)
408 {
409 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_SUBSCRIBE};
410 
411 	if (!client || !topic) {
412 		return;
413 	}
414 
415 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
416 		LOG_ERR("Cannot subscribe: not connected");
417 		return;
418 	}
419 
420 	p.params.subscribe.msg_id = topic->con.msg_id;
421 	p.params.subscribe.qos = topic->qos;
422 	p.params.subscribe.topic_type = topic->type;
423 	p.params.subscribe.dup = dup;
424 	switch (topic->type) {
425 	case MQTT_SN_TOPIC_TYPE_NORMAL:
426 		p.params.subscribe.topic.topic_name.data = topic->name;
427 		p.params.subscribe.topic.topic_name.size = topic->namelen;
428 		break;
429 	case MQTT_SN_TOPIC_TYPE_PREDEF:
430 	case MQTT_SN_TOPIC_TYPE_SHORT:
431 		p.params.subscribe.topic.topic_id = topic->topic_id;
432 		break;
433 	default:
434 		LOG_ERR("Unexpected topic type %d", topic->type);
435 		return;
436 	}
437 
438 	encode_and_send(client, &p, 0);
439 }
440 
mqtt_sn_do_unsubscribe(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)441 static void mqtt_sn_do_unsubscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
442 {
443 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_UNSUBSCRIBE};
444 
445 	if (!client || !topic) {
446 		return;
447 	}
448 
449 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
450 		LOG_ERR("Cannot unsubscribe: not connected");
451 		return;
452 	}
453 
454 	p.params.unsubscribe.msg_id = topic->con.msg_id;
455 	p.params.unsubscribe.topic_type = topic->type;
456 	switch (topic->type) {
457 	case MQTT_SN_TOPIC_TYPE_NORMAL:
458 		p.params.unsubscribe.topic.topic_name.data = topic->name;
459 		p.params.unsubscribe.topic.topic_name.size = topic->namelen;
460 		break;
461 	case MQTT_SN_TOPIC_TYPE_PREDEF:
462 	case MQTT_SN_TOPIC_TYPE_SHORT:
463 		p.params.unsubscribe.topic.topic_id = topic->topic_id;
464 		break;
465 	default:
466 		LOG_ERR("Unexpected topic type %d", topic->type);
467 		return;
468 	}
469 
470 	encode_and_send(client, &p, 0);
471 }
472 
mqtt_sn_do_register(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)473 static void mqtt_sn_do_register(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
474 {
475 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_REGISTER};
476 
477 	if (!client || !topic) {
478 		return;
479 	}
480 
481 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
482 		LOG_ERR("Cannot register: not connected");
483 		return;
484 	}
485 
486 	p.params.reg.msg_id = topic->con.msg_id;
487 	switch (topic->type) {
488 	case MQTT_SN_TOPIC_TYPE_NORMAL:
489 		LOG_HEXDUMP_INF(topic->name, topic->namelen, "Registering topic");
490 		p.params.reg.topic.data = topic->name;
491 		p.params.reg.topic.size = topic->namelen;
492 		break;
493 	default:
494 		LOG_ERR("Unexpected topic type %d", topic->type);
495 		return;
496 	}
497 
498 	encode_and_send(client, &p, 0);
499 }
500 
mqtt_sn_do_publish(struct mqtt_sn_client * client,struct mqtt_sn_publish * pub,bool dup)501 static void mqtt_sn_do_publish(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub, bool dup)
502 {
503 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_PUBLISH};
504 
505 	if (!client || !pub) {
506 		return;
507 	}
508 
509 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
510 		LOG_ERR("Cannot subscribe: not connected");
511 		return;
512 	}
513 
514 	LOG_INF("Publishing to topic ID %d", pub->topic->topic_id);
515 
516 	p.params.publish.data.data = pub->pubdata;
517 	p.params.publish.data.size = pub->datalen;
518 	p.params.publish.msg_id = pub->con.msg_id;
519 	p.params.publish.retain = pub->retain;
520 	p.params.publish.topic_id = pub->topic->topic_id;
521 	p.params.publish.topic_type = pub->topic->type;
522 	p.params.publish.qos = pub->qos;
523 	p.params.publish.dup = dup;
524 
525 	encode_and_send(client, &p, 0);
526 }
527 
mqtt_sn_do_searchgw(struct mqtt_sn_client * client)528 static void mqtt_sn_do_searchgw(struct mqtt_sn_client *client)
529 {
530 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_SEARCHGW};
531 
532 	p.params.searchgw.radius = CONFIG_MQTT_SN_LIB_BROADCAST_RADIUS;
533 
534 	encode_and_send(client, &p, CONFIG_MQTT_SN_LIB_BROADCAST_RADIUS);
535 }
536 
mqtt_sn_do_gwinfo(struct mqtt_sn_client * client)537 static void mqtt_sn_do_gwinfo(struct mqtt_sn_client *client)
538 {
539 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_GWINFO};
540 	struct mqtt_sn_gateway *gw;
541 	struct mqtt_sn_data addr;
542 
543 	gw = SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
544 
545 	if (gw == NULL || gw->addr_len == 0) {
546 		LOG_WRN("No Gateway Address");
547 		return;
548 	}
549 
550 	response.params.gwinfo.gw_id = gw->gw_id;
551 	addr.data = gw->addr;
552 	addr.size = gw->addr_len;
553 	response.params.gwinfo.gw_add = addr;
554 
555 	encode_and_send(client, &response, client->radius_gwinfo);
556 }
557 
mqtt_sn_do_ping(struct mqtt_sn_client * client)558 static void mqtt_sn_do_ping(struct mqtt_sn_client *client)
559 {
560 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_PINGREQ};
561 
562 	switch (client->state) {
563 	case MQTT_SN_CLIENT_ASLEEP:
564 		/*
565 		 * From the spec regarding PINGREQ:
566 		 * ClientId: contains the client id; this field is optional
567 		 * and is included by a “sleeping” client when it goes to the
568 		 * “awake” state and is waiting for messages sent by the
569 		 * server/gateway, see Section 6.14 for further details.
570 		 */
571 		p.params.pingreq.client_id.data = client->client_id.data;
572 		p.params.pingreq.client_id.size = client->client_id.size;
573 	case MQTT_SN_CLIENT_ACTIVE:
574 		encode_and_send(client, &p, 0);
575 		break;
576 	default:
577 		LOG_WRN("Can't ping in state %d", client->state);
578 		break;
579 	}
580 }
581 
process_pubs(struct mqtt_sn_client * client,int64_t * next_cycle)582 static int process_pubs(struct mqtt_sn_client *client, int64_t *next_cycle)
583 {
584 	struct mqtt_sn_publish *pub, *pubs;
585 	const int64_t now = k_uptime_get();
586 	int64_t next_attempt;
587 	bool dup;
588 
589 	SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&client->publish, pub, pubs, next) {
590 		LOG_HEXDUMP_DBG(pub->topic->name, pub->topic->namelen,
591 				"Processing publish for topic");
592 		LOG_HEXDUMP_DBG(pub->pubdata, pub->datalen, "Processing publish data");
593 
594 		if (pub->con.last_attempt == 0) {
595 			next_attempt = 0;
596 			dup = false;
597 		} else {
598 			next_attempt = pub->con.last_attempt + T_RETRY_MSEC;
599 			dup = true;
600 		}
601 
602 		if (next_attempt <= now) {
603 			switch (pub->topic->state) {
604 			case MQTT_SN_TOPIC_STATE_REGISTERING:
605 			case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
606 			case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
607 				LOG_INF("Can't publish; topic is not ready");
608 				break;
609 			case MQTT_SN_TOPIC_STATE_REGISTERED:
610 			case MQTT_SN_TOPIC_STATE_SUBSCRIBED:
611 				if (!pub->con.retries--) {
612 					LOG_WRN("PUB ran out of retries, disconnecting");
613 					mqtt_sn_disconnect_internal(client);
614 					return -ETIMEDOUT;
615 				}
616 				mqtt_sn_do_publish(client, pub, dup);
617 				if (pub->qos == MQTT_SN_QOS_0 || pub->qos == MQTT_SN_QOS_M1) {
618 					/* We are done, remove this */
619 					mqtt_sn_publish_destroy(client, pub);
620 					continue;
621 				} else {
622 					/* Wait for ack */
623 					pub->con.last_attempt = now;
624 					next_attempt = now + T_RETRY_MSEC;
625 				}
626 				break;
627 			}
628 		}
629 
630 		if (next_attempt > now && (*next_cycle == 0 || next_attempt < *next_cycle)) {
631 			*next_cycle = next_attempt;
632 		}
633 	}
634 
635 	LOG_DBG("next_cycle: %lld", *next_cycle);
636 
637 	return 0;
638 }
639 
process_topics(struct mqtt_sn_client * client,int64_t * next_cycle)640 static int process_topics(struct mqtt_sn_client *client, int64_t *next_cycle)
641 {
642 	struct mqtt_sn_topic *topic;
643 	const int64_t now = k_uptime_get();
644 	int64_t next_attempt;
645 	bool dup;
646 
647 	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
648 		LOG_HEXDUMP_DBG(topic->name, topic->namelen, "Processing topic");
649 
650 		if (topic->con.last_attempt == 0) {
651 			next_attempt = 0;
652 			dup = false;
653 		} else {
654 			next_attempt = topic->con.last_attempt + T_RETRY_MSEC;
655 			dup = true;
656 		}
657 
658 		if (next_attempt <= now) {
659 			switch (topic->state) {
660 			case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
661 				if (!topic->con.retries--) {
662 					LOG_WRN("Topic ran out of retries, disconnecting");
663 					mqtt_sn_disconnect_internal(client);
664 					return -ETIMEDOUT;
665 				}
666 
667 				mqtt_sn_do_subscribe(client, topic, dup);
668 				topic->con.last_attempt = now;
669 				next_attempt = now + T_RETRY_MSEC;
670 				break;
671 			case MQTT_SN_TOPIC_STATE_REGISTERING:
672 				if (!topic->con.retries--) {
673 					LOG_WRN("Topic ran out of retries, disconnecting");
674 					mqtt_sn_disconnect_internal(client);
675 					return -ETIMEDOUT;
676 				}
677 
678 				mqtt_sn_do_register(client, topic);
679 				topic->con.last_attempt = now;
680 				next_attempt = now + T_RETRY_MSEC;
681 				break;
682 			case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
683 				if (!topic->con.retries--) {
684 					LOG_WRN("Topic ran out of retries, disconnecting");
685 					mqtt_sn_disconnect_internal(client);
686 					return -ETIMEDOUT;
687 				}
688 				mqtt_sn_do_unsubscribe(client, topic);
689 				topic->con.last_attempt = now;
690 				next_attempt = now + T_RETRY_MSEC;
691 				break;
692 			case MQTT_SN_TOPIC_STATE_REGISTERED:
693 			case MQTT_SN_TOPIC_STATE_SUBSCRIBED:
694 				break;
695 			}
696 		}
697 
698 		if (next_attempt > now && (*next_cycle == 0 || next_attempt < *next_cycle)) {
699 			*next_cycle = next_attempt;
700 		}
701 	}
702 
703 	LOG_DBG("next_cycle: %lld", *next_cycle);
704 
705 	return 0;
706 }
707 
process_ping(struct mqtt_sn_client * client,int64_t * next_cycle)708 static int process_ping(struct mqtt_sn_client *client, int64_t *next_cycle)
709 {
710 	const int64_t now = k_uptime_get();
711 	struct mqtt_sn_gateway *gw = NULL;
712 	int64_t next_ping;
713 
714 	if (client->ping_retries == N_RETRY) {
715 		/* Last ping was acked */
716 		next_ping = client->last_ping + T_KEEPALIVE_MSEC;
717 	} else {
718 		next_ping = client->last_ping + T_RETRY_MSEC;
719 	}
720 
721 	if (next_ping < now) {
722 		if (!client->ping_retries--) {
723 			LOG_WRN("Ping ran out of retries");
724 			mqtt_sn_disconnect_internal(client);
725 			SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
726 			LOG_DBG("Removing non-responsive GW 0x%08x", gw->gw_id);
727 			mqtt_sn_gw_destroy(client, gw);
728 			return -ETIMEDOUT;
729 		}
730 
731 		LOG_DBG("Sending PINGREQ");
732 		mqtt_sn_do_ping(client);
733 		client->last_ping = now;
734 		next_ping = now + T_RETRY_MSEC;
735 	}
736 
737 	if (*next_cycle == 0 || next_ping < *next_cycle) {
738 		*next_cycle = next_ping;
739 	}
740 
741 	LOG_DBG("next_cycle: %lld", *next_cycle);
742 
743 	return 0;
744 }
745 
process_search(struct mqtt_sn_client * client,int64_t * next_cycle)746 static int process_search(struct mqtt_sn_client *client, int64_t *next_cycle)
747 {
748 	const int64_t now = k_uptime_get();
749 
750 	LOG_DBG("ts_searchgw: %lld", client->ts_searchgw);
751 	LOG_DBG("ts_gwinfo: %lld", client->ts_gwinfo);
752 
753 	if (client->ts_searchgw != 0 && client->ts_searchgw <= now) {
754 		LOG_DBG("Sending SEARCHGW");
755 		mqtt_sn_do_searchgw(client);
756 		client->ts_searchgw = 0;
757 	}
758 
759 	if (client->ts_gwinfo != 0 && client->ts_gwinfo <= now) {
760 		LOG_DBG("Sending GWINFO");
761 		mqtt_sn_do_gwinfo(client);
762 		client->ts_gwinfo = 0;
763 	}
764 
765 	if (*next_cycle == 0 || (client->ts_searchgw != 0 && client->ts_searchgw < *next_cycle)) {
766 		*next_cycle = client->ts_searchgw;
767 	}
768 	if (*next_cycle == 0 || (client->ts_gwinfo != 0 && client->ts_gwinfo < *next_cycle)) {
769 		*next_cycle = client->ts_gwinfo;
770 	}
771 
772 	LOG_DBG("next_cycle: %lld", *next_cycle);
773 
774 	return 0;
775 }
776 
process_advertise(struct mqtt_sn_client * client,int64_t * next_cycle)777 static int process_advertise(struct mqtt_sn_client *client, int64_t *next_cycle)
778 {
779 	const int64_t now = k_uptime_get();
780 	struct mqtt_sn_gateway *gw;
781 	struct mqtt_sn_gateway *gw_next;
782 
783 	SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&client->gateway, gw, gw_next, next) {
784 		LOG_DBG("Checking if GW 0x%02x is old", gw->gw_id);
785 		if (gw->adv_timer != -1 && gw->adv_timer <= now) {
786 			LOG_DBG("Removing non-responsive GW 0x%08x", gw->gw_id);
787 			if (client->gateway.head == &gw->next) {
788 				mqtt_sn_disconnect(client);
789 			}
790 			mqtt_sn_gw_destroy(client, gw);
791 		}
792 		if (gw->adv_timer != -1 && (*next_cycle == 0 || gw->adv_timer < *next_cycle)) {
793 			*next_cycle = gw->adv_timer;
794 		}
795 	}
796 	LOG_DBG("next_cycle: %lld", *next_cycle);
797 
798 	return 0;
799 }
800 
process_work(struct k_work * wrk)801 static void process_work(struct k_work *wrk)
802 {
803 	struct mqtt_sn_client *client;
804 	struct k_work_delayable *dwork;
805 	int64_t next_cycle = 0;
806 	int err;
807 
808 	dwork = k_work_delayable_from_work(wrk);
809 	client = CONTAINER_OF(dwork, struct mqtt_sn_client, process_work);
810 
811 	LOG_DBG("Executing work of client %p in state %d at time %lld", client, client->state,
812 		k_uptime_get());
813 
814 	/* Clean up old advertised gateways from list */
815 	err = process_advertise(client, &next_cycle);
816 	if (err) {
817 		return;
818 	}
819 
820 	/* Handle GW search process timers */
821 	err = process_search(client, &next_cycle);
822 	if (err) {
823 		return;
824 	}
825 
826 	if (client->state == MQTT_SN_CLIENT_ACTIVE) {
827 		err = process_topics(client, &next_cycle);
828 		if (err) {
829 			return;
830 		}
831 
832 		err = process_pubs(client, &next_cycle);
833 		if (err) {
834 			return;
835 		}
836 
837 		err = process_ping(client, &next_cycle);
838 		if (err) {
839 			return;
840 		}
841 	}
842 
843 	if (next_cycle > 0) {
844 		LOG_DBG("next_cycle: %lld", next_cycle);
845 		k_work_schedule(dwork, K_MSEC(next_cycle - k_uptime_get()));
846 	}
847 }
848 
mqtt_sn_client_init(struct mqtt_sn_client * client,const struct mqtt_sn_data * client_id,struct mqtt_sn_transport * transport,mqtt_sn_evt_cb_t evt_cb,void * tx,size_t txsz,void * rx,size_t rxsz)849 int mqtt_sn_client_init(struct mqtt_sn_client *client, const struct mqtt_sn_data *client_id,
850 			struct mqtt_sn_transport *transport, mqtt_sn_evt_cb_t evt_cb, void *tx,
851 			size_t txsz, void *rx, size_t rxsz)
852 {
853 	if (!client || !client_id || !transport || !evt_cb || !tx || !rx) {
854 		return -EINVAL;
855 	}
856 
857 	memset(client, 0, sizeof(*client));
858 
859 	client->client_id.data = client_id->data;
860 	client->client_id.size = client_id->size;
861 	client->transport = transport;
862 	client->evt_cb = evt_cb;
863 
864 	net_buf_simple_init_with_data(&client->tx, tx, txsz);
865 	net_buf_simple_reset(&client->tx);
866 
867 	net_buf_simple_init_with_data(&client->rx, rx, rxsz);
868 	net_buf_simple_reset(&client->rx);
869 
870 	k_work_init_delayable(&client->process_work, process_work);
871 
872 	if (transport->init) {
873 		transport->init(transport);
874 	}
875 
876 	return 0;
877 }
878 
mqtt_sn_client_deinit(struct mqtt_sn_client * client)879 void mqtt_sn_client_deinit(struct mqtt_sn_client *client)
880 {
881 	if (!client) {
882 		return;
883 	}
884 
885 	mqtt_sn_publish_destroy_all(client);
886 	mqtt_sn_topic_destroy_all(client);
887 	mqtt_sn_gw_destroy_all(client);
888 
889 	if (client->transport && client->transport->deinit) {
890 		client->transport->deinit(client->transport);
891 	}
892 
893 	k_work_cancel_delayable(&client->process_work);
894 }
895 
mqtt_sn_add_gw(struct mqtt_sn_client * client,uint8_t gw_id,struct mqtt_sn_data gw_addr)896 int mqtt_sn_add_gw(struct mqtt_sn_client *client, uint8_t gw_id, struct mqtt_sn_data gw_addr)
897 {
898 	struct mqtt_sn_gateway *gw;
899 
900 	gw = mqtt_sn_gw_find_id(client, gw_id);
901 
902 	if (gw != NULL) {
903 		mqtt_sn_gw_destroy(client, gw);
904 	}
905 
906 	gw = mqtt_sn_gw_create(gw_id, -1, gw_addr);
907 	if (!gw) {
908 		return -ENOMEM;
909 	}
910 
911 	sys_slist_append(&client->gateway, &gw->next);
912 
913 	return 0;
914 }
915 
mqtt_sn_search(struct mqtt_sn_client * client,uint8_t radius)916 int mqtt_sn_search(struct mqtt_sn_client *client, uint8_t radius)
917 {
918 	if (!client) {
919 		return -EINVAL;
920 	}
921 	/* Set SEARCHGW transmission timer */
922 	client->ts_searchgw = k_uptime_get() + (T_SEARCHGW_MSEC * sys_rand8_get() / 255);
923 	k_work_schedule(&client->process_work, K_NO_WAIT);
924 	LOG_DBG("Requested SEARCHGW for time %lld at time %lld", client->ts_searchgw,
925 		k_uptime_get());
926 
927 	return 0;
928 }
929 
mqtt_sn_connect(struct mqtt_sn_client * client,bool will,bool clean_session)930 int mqtt_sn_connect(struct mqtt_sn_client *client, bool will, bool clean_session)
931 {
932 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_CONNECT};
933 
934 	if (!client) {
935 		return -EINVAL;
936 	}
937 
938 	if (will && (!client->will_msg.data || !client->will_topic.data)) {
939 		LOG_ERR("will set to true, but no will data in client");
940 		return -EINVAL;
941 	}
942 
943 	if (clean_session) {
944 		mqtt_sn_topic_destroy_all(client);
945 	}
946 
947 	p.params.connect.clean_session = clean_session;
948 	p.params.connect.will = will;
949 	p.params.connect.duration = CONFIG_MQTT_SN_KEEPALIVE;
950 	p.params.connect.client_id.data = client->client_id.data;
951 	p.params.connect.client_id.size = client->client_id.size;
952 
953 	client->last_ping = k_uptime_get();
954 
955 	return encode_and_send(client, &p, 0);
956 }
957 
mqtt_sn_disconnect(struct mqtt_sn_client * client)958 int mqtt_sn_disconnect(struct mqtt_sn_client *client)
959 {
960 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_DISCONNECT};
961 	int err;
962 
963 	if (!client) {
964 		return -EINVAL;
965 	}
966 
967 	p.params.disconnect.duration = 0;
968 
969 	err = encode_and_send(client, &p, 0);
970 	mqtt_sn_disconnect_internal(client);
971 
972 	return err;
973 }
974 
mqtt_sn_sleep(struct mqtt_sn_client * client,uint16_t duration)975 int mqtt_sn_sleep(struct mqtt_sn_client *client, uint16_t duration)
976 {
977 	struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_DISCONNECT};
978 	int err;
979 
980 	if (!client || !duration) {
981 		return -EINVAL;
982 	}
983 
984 	p.params.disconnect.duration = duration;
985 
986 	err = encode_and_send(client, &p, 0);
987 	mqtt_sn_sleep_internal(client);
988 
989 	return err;
990 }
991 
mqtt_sn_subscribe(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name)992 int mqtt_sn_subscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
993 		      struct mqtt_sn_data *topic_name)
994 {
995 	struct mqtt_sn_topic *topic;
996 	int err;
997 
998 	if (!client || !topic_name || !topic_name->data || !topic_name->size) {
999 		return -EINVAL;
1000 	}
1001 
1002 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
1003 		LOG_ERR("Cannot subscribe: not connected");
1004 		return -ENOTCONN;
1005 	}
1006 
1007 	topic = mqtt_sn_topic_find_name(client, topic_name);
1008 	if (!topic) {
1009 		topic = mqtt_sn_topic_create(topic_name);
1010 		if (!topic) {
1011 			return -ENOMEM;
1012 		}
1013 
1014 		topic->qos = qos;
1015 		topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBING;
1016 		sys_slist_append(&client->topic, &topic->next);
1017 	}
1018 
1019 	err = k_work_reschedule(&client->process_work, K_NO_WAIT);
1020 	if (err < 0) {
1021 		return err;
1022 	}
1023 
1024 	return 0;
1025 }
1026 
mqtt_sn_unsubscribe(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name)1027 int mqtt_sn_unsubscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
1028 			struct mqtt_sn_data *topic_name)
1029 {
1030 	struct mqtt_sn_topic *topic;
1031 	int err;
1032 
1033 	if (!client || !topic_name) {
1034 		return -EINVAL;
1035 	}
1036 
1037 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
1038 		LOG_ERR("Cannot unsubscribe: not connected");
1039 		return -ENOTCONN;
1040 	}
1041 
1042 	topic = mqtt_sn_topic_find_name(client, topic_name);
1043 	if (!topic) {
1044 		LOG_HEXDUMP_ERR(topic_name->data, topic_name->size, "Topic not found");
1045 		return -ENOENT;
1046 	}
1047 
1048 	topic->state = MQTT_SN_TOPIC_STATE_UNSUBSCRIBING;
1049 	mqtt_sn_con_init(&topic->con);
1050 
1051 	err = k_work_reschedule(&client->process_work, K_NO_WAIT);
1052 	if (err < 0) {
1053 		return err;
1054 	}
1055 
1056 	return 0;
1057 }
1058 
mqtt_sn_publish(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name,bool retain,struct mqtt_sn_data * data)1059 int mqtt_sn_publish(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
1060 		    struct mqtt_sn_data *topic_name, bool retain, struct mqtt_sn_data *data)
1061 {
1062 	struct mqtt_sn_publish *pub;
1063 	struct mqtt_sn_topic *topic;
1064 	int err;
1065 
1066 	if (!client || !topic_name) {
1067 		return -EINVAL;
1068 	}
1069 
1070 	if (qos == MQTT_SN_QOS_M1) {
1071 		LOG_ERR("QoS -1 not supported");
1072 		return -ENOTSUP;
1073 	}
1074 
1075 	if (client->state != MQTT_SN_CLIENT_ACTIVE) {
1076 		LOG_ERR("Cannot publish: disconnected");
1077 		return -ENOTCONN;
1078 	}
1079 
1080 	topic = mqtt_sn_topic_find_name(client, topic_name);
1081 	if (!topic) {
1082 		topic = mqtt_sn_topic_create(topic_name);
1083 		if (!topic) {
1084 			return -ENOMEM;
1085 		}
1086 
1087 		topic->qos = qos;
1088 		topic->state = MQTT_SN_TOPIC_STATE_REGISTERING;
1089 		sys_slist_append(&client->topic, &topic->next);
1090 	}
1091 
1092 	pub = mqtt_sn_publish_create(data);
1093 	if (!pub) {
1094 		k_work_reschedule(&client->process_work, K_NO_WAIT);
1095 		return -ENOMEM;
1096 	}
1097 
1098 	pub->qos = qos;
1099 	pub->retain = retain;
1100 	pub->topic = topic;
1101 
1102 	sys_slist_append(&client->publish, &pub->next);
1103 
1104 	err = k_work_reschedule(&client->process_work, K_NO_WAIT);
1105 	if (err < 0) {
1106 		return err;
1107 	}
1108 
1109 	return 0;
1110 }
1111 
handle_advertise(struct mqtt_sn_client * client,struct mqtt_sn_param_advertise * p,struct mqtt_sn_data rx_addr)1112 static void handle_advertise(struct mqtt_sn_client *client, struct mqtt_sn_param_advertise *p,
1113 			     struct mqtt_sn_data rx_addr)
1114 {
1115 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_ADVERTISE};
1116 	struct mqtt_sn_gateway *gw;
1117 
1118 	gw = mqtt_sn_gw_find_id(client, p->gw_id);
1119 
1120 	if (gw == NULL) {
1121 		LOG_DBG("Creating GW 0x%02x with duration %d", p->gw_id, p->duration);
1122 		gw = mqtt_sn_gw_create(p->gw_id, p->duration, rx_addr);
1123 		if (!gw) {
1124 			return;
1125 		}
1126 		sys_slist_append(&client->gateway, &gw->next);
1127 	} else {
1128 		LOG_DBG("Updating timer for GW 0x%02x with duration %d", p->gw_id, p->duration);
1129 		gw->adv_timer =
1130 			k_uptime_get() + (p->duration * CONFIG_MQTT_SN_LIB_N_ADV * MSEC_PER_SEC);
1131 	}
1132 
1133 	k_work_schedule(&client->process_work, K_NO_WAIT);
1134 	if (client->evt_cb) {
1135 		client->evt_cb(client, &evt);
1136 	}
1137 }
1138 
handle_searchgw(struct mqtt_sn_client * client,struct mqtt_sn_param_searchgw * p)1139 static void handle_searchgw(struct mqtt_sn_client *client, struct mqtt_sn_param_searchgw *p)
1140 {
1141 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_SEARCHGW};
1142 
1143 	/* Increment SEARCHGW transmission timestamp if waiting */
1144 	if (client->ts_searchgw != 0) {
1145 		client->ts_searchgw = k_uptime_get() + (T_SEARCHGW_MSEC * sys_rand8_get() / 255);
1146 	}
1147 
1148 	/* Set transmission timestamp to respond to SEARCHGW if we have a GW */
1149 	if (sys_slist_len(&client->gateway) > 0) {
1150 		client->ts_gwinfo = k_uptime_get() + (T_GWINFO_MSEC * sys_rand8_get() / 255);
1151 	}
1152 	client->radius_gwinfo = p->radius;
1153 	k_work_schedule(&client->process_work, K_NO_WAIT);
1154 
1155 	if (client->evt_cb) {
1156 		client->evt_cb(client, &evt);
1157 	}
1158 }
1159 
handle_gwinfo(struct mqtt_sn_client * client,struct mqtt_sn_param_gwinfo * p,struct mqtt_sn_data rx_addr)1160 static void handle_gwinfo(struct mqtt_sn_client *client, struct mqtt_sn_param_gwinfo *p,
1161 			  struct mqtt_sn_data rx_addr)
1162 {
1163 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_GWINFO};
1164 	struct mqtt_sn_gateway *gw;
1165 
1166 	/* Clear SEARCHGW and GWINFO transmission if waiting */
1167 	client->ts_searchgw = 0;
1168 	client->ts_gwinfo = 0;
1169 	k_work_schedule(&client->process_work, K_NO_WAIT);
1170 
1171 	/* Extract GW info and store */
1172 	if (p->gw_add.size > 0) {
1173 		rx_addr.data = p->gw_add.data;
1174 		rx_addr.size = p->gw_add.size;
1175 	} else {
1176 	}
1177 	gw = mqtt_sn_gw_create(p->gw_id, -1, rx_addr);
1178 
1179 	if (!gw) {
1180 		return;
1181 	}
1182 
1183 	sys_slist_append(&client->gateway, &gw->next);
1184 
1185 	if (client->evt_cb) {
1186 		client->evt_cb(client, &evt);
1187 	}
1188 }
1189 
handle_connack(struct mqtt_sn_client * client,struct mqtt_sn_param_connack * p)1190 static void handle_connack(struct mqtt_sn_client *client, struct mqtt_sn_param_connack *p)
1191 {
1192 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_CONNECTED};
1193 
1194 	if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1195 		LOG_INF("MQTT_SN client connected");
1196 		switch (client->state) {
1197 		case MQTT_SN_CLIENT_DISCONNECTED:
1198 		case MQTT_SN_CLIENT_ASLEEP:
1199 		case MQTT_SN_CLIENT_AWAKE:
1200 			mqtt_sn_set_state(client, MQTT_SN_CLIENT_ACTIVE);
1201 			if (client->evt_cb) {
1202 				client->evt_cb(client, &evt);
1203 			}
1204 			client->ping_retries = N_RETRY;
1205 			break;
1206 		default:
1207 			LOG_ERR("Client received CONNACK but was in state %d", client->state);
1208 			return;
1209 		}
1210 	} else {
1211 		LOG_WRN("CONNACK ret code %d", p->ret_code);
1212 		mqtt_sn_disconnect_internal(client);
1213 	}
1214 
1215 	k_work_schedule(&client->process_work, K_NO_WAIT);
1216 }
1217 
handle_willtopicreq(struct mqtt_sn_client * client)1218 static void handle_willtopicreq(struct mqtt_sn_client *client)
1219 {
1220 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_WILLTOPIC};
1221 
1222 	response.params.willtopic.qos = client->will_qos;
1223 	response.params.willtopic.retain = client->will_retain;
1224 	response.params.willtopic.topic.data = client->will_topic.data;
1225 	response.params.willtopic.topic.size = client->will_topic.size;
1226 
1227 	encode_and_send(client, &response, 0);
1228 }
1229 
handle_willmsgreq(struct mqtt_sn_client * client)1230 static void handle_willmsgreq(struct mqtt_sn_client *client)
1231 {
1232 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_WILLMSG};
1233 
1234 	response.params.willmsg.msg.data = client->will_msg.data;
1235 	response.params.willmsg.msg.size = client->will_msg.size;
1236 
1237 	encode_and_send(client, &response, 0);
1238 }
1239 
handle_register(struct mqtt_sn_client * client,struct mqtt_sn_param_register * p)1240 static void handle_register(struct mqtt_sn_client *client, struct mqtt_sn_param_register *p)
1241 {
1242 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_REGACK};
1243 	struct mqtt_sn_topic *topic;
1244 
1245 	topic = mqtt_sn_topic_create(&p->topic);
1246 	if (!topic) {
1247 		return;
1248 	}
1249 
1250 	topic->state = MQTT_SN_TOPIC_STATE_REGISTERED;
1251 	topic->topic_id = p->topic_id;
1252 	topic->type = MQTT_SN_TOPIC_TYPE_NORMAL;
1253 
1254 	sys_slist_append(&client->topic, &topic->next);
1255 
1256 	response.params.regack.ret_code = MQTT_SN_CODE_ACCEPTED;
1257 	response.params.regack.topic_id = p->topic_id;
1258 	response.params.regack.msg_id = p->msg_id;
1259 
1260 	encode_and_send(client, &response, 0);
1261 }
1262 
handle_regack(struct mqtt_sn_client * client,struct mqtt_sn_param_regack * p)1263 static void handle_regack(struct mqtt_sn_client *client, struct mqtt_sn_param_regack *p)
1264 {
1265 	struct mqtt_sn_topic *topic = mqtt_sn_topic_find_msg_id(client, p->msg_id);
1266 
1267 	if (!topic) {
1268 		LOG_ERR("Can't REGACK, no topic found");
1269 		return;
1270 	}
1271 
1272 	if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1273 		topic->state = MQTT_SN_TOPIC_STATE_REGISTERED;
1274 		topic->topic_id = p->topic_id;
1275 	} else {
1276 		LOG_WRN("Gateway could not register topic ID %u, code %d", p->topic_id,
1277 			p->ret_code);
1278 	}
1279 }
1280 
handle_publish(struct mqtt_sn_client * client,struct mqtt_sn_param_publish * p)1281 static void handle_publish(struct mqtt_sn_client *client, struct mqtt_sn_param_publish *p)
1282 {
1283 	struct mqtt_sn_param response;
1284 	struct mqtt_sn_evt evt = {.param.publish = {.data = p->data,
1285 						    .topic_id = p->topic_id,
1286 						    .topic_type = p->topic_type},
1287 				  .type = MQTT_SN_EVT_PUBLISH};
1288 
1289 	if (p->qos == MQTT_SN_QOS_1) {
1290 		response.type = MQTT_SN_MSG_TYPE_PUBACK;
1291 		response.params.puback.topic_id = p->topic_id;
1292 		response.params.puback.msg_id = p->msg_id;
1293 		response.params.puback.ret_code = MQTT_SN_CODE_ACCEPTED;
1294 
1295 		encode_and_send(client, &response, 0);
1296 	} else if (p->qos == MQTT_SN_QOS_2) {
1297 		response.type = MQTT_SN_MSG_TYPE_PUBREC;
1298 		response.params.pubrec.msg_id = p->msg_id;
1299 
1300 		encode_and_send(client, &response, 0);
1301 	}
1302 
1303 	if (client->evt_cb) {
1304 		client->evt_cb(client, &evt);
1305 	}
1306 }
1307 
handle_puback(struct mqtt_sn_client * client,struct mqtt_sn_param_puback * p)1308 static void handle_puback(struct mqtt_sn_client *client, struct mqtt_sn_param_puback *p)
1309 {
1310 	struct mqtt_sn_publish *pub = mqtt_sn_publish_find_msg_id(client, p->msg_id);
1311 
1312 	if (!pub) {
1313 		LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1314 		return;
1315 	}
1316 
1317 	mqtt_sn_publish_destroy(client, pub);
1318 }
1319 
handle_pubrec(struct mqtt_sn_client * client,struct mqtt_sn_param_pubrec * p)1320 static void handle_pubrec(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrec *p)
1321 {
1322 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PUBREL};
1323 	struct mqtt_sn_publish *pub = mqtt_sn_publish_find_msg_id(client, p->msg_id);
1324 
1325 	if (!pub) {
1326 		LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1327 		return;
1328 	}
1329 
1330 	pub->con.last_attempt = k_uptime_get();
1331 	pub->con.retries = N_RETRY;
1332 
1333 	response.params.pubrel.msg_id = p->msg_id;
1334 
1335 	encode_and_send(client, &response, 0);
1336 }
1337 
handle_pubrel(struct mqtt_sn_client * client,struct mqtt_sn_param_pubrel * p)1338 static void handle_pubrel(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrel *p)
1339 {
1340 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PUBCOMP};
1341 
1342 	response.params.pubcomp.msg_id = p->msg_id;
1343 
1344 	encode_and_send(client, &response, 0);
1345 }
1346 
handle_pubcomp(struct mqtt_sn_client * client,struct mqtt_sn_param_pubcomp * p)1347 static void handle_pubcomp(struct mqtt_sn_client *client, struct mqtt_sn_param_pubcomp *p)
1348 {
1349 	struct mqtt_sn_publish *pub = mqtt_sn_publish_find_msg_id(client, p->msg_id);
1350 
1351 	if (!pub) {
1352 		LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1353 		return;
1354 	}
1355 
1356 	mqtt_sn_publish_destroy(client, pub);
1357 }
1358 
handle_suback(struct mqtt_sn_client * client,struct mqtt_sn_param_suback * p)1359 static void handle_suback(struct mqtt_sn_client *client, struct mqtt_sn_param_suback *p)
1360 {
1361 	struct mqtt_sn_topic *topic = mqtt_sn_topic_find_msg_id(client, p->msg_id);
1362 
1363 	if (!topic) {
1364 		LOG_ERR("No matching SUBSCRIBE found for msg id %u", p->msg_id);
1365 		return;
1366 	}
1367 
1368 	if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1369 		topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBED;
1370 		topic->topic_id = p->topic_id;
1371 		topic->qos = p->qos;
1372 	} else {
1373 		LOG_WRN("SUBACK with ret code %d", p->ret_code);
1374 	}
1375 }
1376 
handle_unsuback(struct mqtt_sn_client * client,struct mqtt_sn_param_unsuback * p)1377 static void handle_unsuback(struct mqtt_sn_client *client, struct mqtt_sn_param_unsuback *p)
1378 {
1379 	struct mqtt_sn_topic *topic = mqtt_sn_topic_find_msg_id(client, p->msg_id);
1380 
1381 	if (!topic || topic->state != MQTT_SN_TOPIC_STATE_UNSUBSCRIBING) {
1382 		LOG_ERR("No matching UNSUBSCRIBE found for msg id %u", p->msg_id);
1383 		return;
1384 	}
1385 
1386 	mqtt_sn_topic_destroy(client, topic);
1387 }
1388 
handle_pingreq(struct mqtt_sn_client * client)1389 static void handle_pingreq(struct mqtt_sn_client *client)
1390 {
1391 	struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PINGRESP};
1392 
1393 	encode_and_send(client, &response, 0);
1394 }
1395 
handle_pingresp(struct mqtt_sn_client * client)1396 static void handle_pingresp(struct mqtt_sn_client *client)
1397 {
1398 	struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_PINGRESP};
1399 
1400 	if (client->evt_cb) {
1401 		client->evt_cb(client, &evt);
1402 	}
1403 
1404 	if (client->state == MQTT_SN_CLIENT_AWAKE) {
1405 		mqtt_sn_set_state(client, MQTT_SN_CLIENT_ASLEEP);
1406 	}
1407 
1408 	client->ping_retries = N_RETRY;
1409 }
1410 
handle_disconnect(struct mqtt_sn_client * client,struct mqtt_sn_param_disconnect * p)1411 static void handle_disconnect(struct mqtt_sn_client *client, struct mqtt_sn_param_disconnect *p)
1412 {
1413 	LOG_INF("Received DISCONNECT");
1414 	mqtt_sn_disconnect_internal(client);
1415 }
1416 
handle_msg(struct mqtt_sn_client * client,struct mqtt_sn_data rx_addr)1417 static int handle_msg(struct mqtt_sn_client *client, struct mqtt_sn_data rx_addr)
1418 {
1419 	int err;
1420 	struct mqtt_sn_param p;
1421 
1422 	err = mqtt_sn_decode_msg(&client->rx, &p);
1423 	if (err) {
1424 		return err;
1425 	}
1426 
1427 	LOG_INF("Got message of type %d", p.type);
1428 
1429 	switch (p.type) {
1430 	case MQTT_SN_MSG_TYPE_ADVERTISE:
1431 		handle_advertise(client, &p.params.advertise, rx_addr);
1432 		break;
1433 	case MQTT_SN_MSG_TYPE_SEARCHGW:
1434 		handle_searchgw(client, &p.params.searchgw);
1435 		break;
1436 	case MQTT_SN_MSG_TYPE_GWINFO:
1437 		handle_gwinfo(client, &p.params.gwinfo, rx_addr);
1438 		break;
1439 	case MQTT_SN_MSG_TYPE_CONNACK:
1440 		handle_connack(client, &p.params.connack);
1441 		break;
1442 	case MQTT_SN_MSG_TYPE_WILLTOPICREQ:
1443 		handle_willtopicreq(client);
1444 		break;
1445 	case MQTT_SN_MSG_TYPE_WILLMSGREQ:
1446 		handle_willmsgreq(client);
1447 		break;
1448 	case MQTT_SN_MSG_TYPE_REGISTER:
1449 		handle_register(client, &p.params.reg);
1450 		break;
1451 	case MQTT_SN_MSG_TYPE_REGACK:
1452 		handle_regack(client, &p.params.regack);
1453 		break;
1454 	case MQTT_SN_MSG_TYPE_PUBLISH:
1455 		handle_publish(client, &p.params.publish);
1456 		break;
1457 	case MQTT_SN_MSG_TYPE_PUBACK:
1458 		handle_puback(client, &p.params.puback);
1459 		break;
1460 	case MQTT_SN_MSG_TYPE_PUBREC:
1461 		handle_pubrec(client, &p.params.pubrec);
1462 		break;
1463 	case MQTT_SN_MSG_TYPE_PUBREL:
1464 		handle_pubrel(client, &p.params.pubrel);
1465 		break;
1466 	case MQTT_SN_MSG_TYPE_PUBCOMP:
1467 		handle_pubcomp(client, &p.params.pubcomp);
1468 		break;
1469 	case MQTT_SN_MSG_TYPE_SUBACK:
1470 		handle_suback(client, &p.params.suback);
1471 		break;
1472 	case MQTT_SN_MSG_TYPE_UNSUBACK:
1473 		handle_unsuback(client, &p.params.unsuback);
1474 		break;
1475 	case MQTT_SN_MSG_TYPE_PINGREQ:
1476 		handle_pingreq(client);
1477 		break;
1478 	case MQTT_SN_MSG_TYPE_PINGRESP:
1479 		handle_pingresp(client);
1480 		break;
1481 	case MQTT_SN_MSG_TYPE_DISCONNECT:
1482 		handle_disconnect(client, &p.params.disconnect);
1483 		break;
1484 	case MQTT_SN_MSG_TYPE_WILLTOPICRESP:
1485 		break;
1486 	case MQTT_SN_MSG_TYPE_WILLMSGRESP:
1487 		break;
1488 	default:
1489 		LOG_ERR("Unexpected message type %d", p.type);
1490 		break;
1491 	}
1492 
1493 	k_work_reschedule(&client->process_work, K_NO_WAIT);
1494 
1495 	return 0;
1496 }
1497 
mqtt_sn_input(struct mqtt_sn_client * client)1498 int mqtt_sn_input(struct mqtt_sn_client *client)
1499 {
1500 	ssize_t next_frame_size;
1501 	char addr[CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE];
1502 	struct mqtt_sn_data rx_addr = {.data = addr, .size = CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE};
1503 	int err;
1504 
1505 	if (!client || !client->transport || !client->transport->recvfrom) {
1506 		return -EINVAL;
1507 	}
1508 
1509 	if (client->transport->poll) {
1510 		next_frame_size = client->transport->poll(client);
1511 		if (next_frame_size <= 0) {
1512 			return next_frame_size;
1513 		}
1514 	}
1515 
1516 	net_buf_simple_reset(&client->rx);
1517 
1518 	next_frame_size = client->transport->recvfrom(client, client->rx.data, client->rx.size,
1519 						      (void *)rx_addr.data, &rx_addr.size);
1520 	if (next_frame_size <= 0) {
1521 		return next_frame_size;
1522 	}
1523 
1524 	if (next_frame_size > client->rx.size) {
1525 		return -ENOBUFS;
1526 	}
1527 
1528 	client->rx.len = next_frame_size;
1529 
1530 	LOG_HEXDUMP_DBG(client->rx.data, client->rx.len, "Received data");
1531 
1532 	err = handle_msg(client, rx_addr);
1533 
1534 	if (err) {
1535 		return err;
1536 	}
1537 
1538 	/* Should be zero */
1539 	return -client->rx.len;
1540 }
1541 
mqtt_sn_get_topic_name(struct mqtt_sn_client * client,uint16_t id,struct mqtt_sn_data * topic_name)1542 int mqtt_sn_get_topic_name(struct mqtt_sn_client *client, uint16_t id,
1543 			   struct mqtt_sn_data *topic_name)
1544 {
1545 	struct mqtt_sn_topic *topic;
1546 
1547 	if (!client || !topic_name) {
1548 		return -EINVAL;
1549 	}
1550 
1551 	SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
1552 		if (topic->topic_id == id) {
1553 			topic_name->data = (const uint8_t *)topic->name;
1554 			topic_name->size = topic->namelen;
1555 			return 0;
1556 		}
1557 	}
1558 	return -ENOENT;
1559 }
1560