1 /*
2 * Copyright (c) 2022 grandcentrix GmbH
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 /** @file mqtt_sn.c
8 *
9 * @brief MQTT-SN Client API Implementation.
10 */
11
12 #include "mqtt_sn_msg.h"
13
14 #include <zephyr/logging/log.h>
15 #include <zephyr/random/random.h>
16 #include <zephyr/net/mqtt_sn.h>
17 LOG_MODULE_REGISTER(net_mqtt_sn, CONFIG_MQTT_SN_LOG_LEVEL);
18
19 #define MQTT_SN_NET_BUFS (CONFIG_MQTT_SN_LIB_MAX_PUBLISH)
20
21 NET_BUF_POOL_FIXED_DEFINE(mqtt_sn_messages, MQTT_SN_NET_BUFS, CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE,
22 0, NULL);
23
/**
 * Retry bookkeeping shared by every action that has to be acknowledged,
 * i.e. topic registering, subscribing, or unsubscribing. Publishes embed
 * the same structure.
 */
struct mqtt_sn_confirmable {
	/** Uptime in ms of the latest transmission; 0 means "never sent". */
	int64_t last_attempt;
	/** Message ID used to look the action up again. */
	uint16_t msg_id;
	/** Transmission attempts left; initialised to N_RETRY. */
	uint8_t retries;
};
33
34 struct mqtt_sn_publish {
35 struct mqtt_sn_confirmable con;
36 sys_snode_t next;
37 struct mqtt_sn_topic *topic;
38 uint8_t pubdata[CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE];
39 size_t datalen;
40 enum mqtt_sn_qos qos;
41 bool retain;
42 };
43
/**
 * Life-cycle state of a topic. The "requested" states are promoted to the
 * matching "in progress" state by the housekeeping work item.
 */
enum mqtt_sn_topic_state {
	/** Topic requested to be registered */
	MQTT_SN_TOPIC_STATE_REGISTER,
	/** Topic in progress of registering */
	MQTT_SN_TOPIC_STATE_REGISTERING,
	/** Topic registered */
	MQTT_SN_TOPIC_STATE_REGISTERED,
	/** Topic requested to subscribe */
	MQTT_SN_TOPIC_STATE_SUBSCRIBE,
	/** Topic in progress of subscribing */
	MQTT_SN_TOPIC_STATE_SUBSCRIBING,
	/** Topic subscribed */
	MQTT_SN_TOPIC_STATE_SUBSCRIBED,
	/** Topic requested to unsubscribe */
	MQTT_SN_TOPIC_STATE_UNSUBSCRIBE,
	/** Topic in progress of unsubscribing */
	MQTT_SN_TOPIC_STATE_UNSUBSCRIBING,
};
54
55 struct mqtt_sn_topic {
56 struct mqtt_sn_confirmable con;
57 sys_snode_t next;
58 char name[CONFIG_MQTT_SN_LIB_MAX_TOPIC_SIZE];
59 size_t namelen;
60 uint16_t topic_id;
61 enum mqtt_sn_qos qos;
62 enum mqtt_sn_topic_type type;
63 enum mqtt_sn_topic_state state;
64 };
65
66 struct mqtt_sn_gateway {
67 sys_snode_t next;
68 char gw_id;
69 int64_t adv_timer;
70 char addr[CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE];
71 size_t addr_len;
72 };
73
74 K_MEM_SLAB_DEFINE_STATIC(publishes, sizeof(struct mqtt_sn_publish), CONFIG_MQTT_SN_LIB_MAX_PUBLISH,
75 4);
76 K_MEM_SLAB_DEFINE_STATIC(topics, sizeof(struct mqtt_sn_topic), CONFIG_MQTT_SN_LIB_MAX_TOPICS, 4);
77 K_MEM_SLAB_DEFINE_STATIC(gateways, sizeof(struct mqtt_sn_gateway), CONFIG_MQTT_SN_LIB_MAX_GATEWAYS,
78 4);
79
/** Connection state of a client. */
enum mqtt_sn_client_state {
	/** Not connected to a gateway. */
	MQTT_SN_CLIENT_DISCONNECTED,
	/** Connected; topics, publishes and pings are processed in this state. */
	MQTT_SN_CLIENT_ACTIVE,
	/** Sleeping; a PINGREQ sent in this state carries the client ID. */
	MQTT_SN_CLIENT_ASLEEP,
	/** Awake state of a sleeping client. */
	MQTT_SN_CLIENT_AWAKE
};
86
mqtt_sn_set_state(struct mqtt_sn_client * client,enum mqtt_sn_client_state state)87 static void mqtt_sn_set_state(struct mqtt_sn_client *client, enum mqtt_sn_client_state state)
88 {
89 int prev_state = client->state;
90
91 client->state = state;
92 LOG_DBG("Client %p state (%d) -> (%d)", client, prev_state, state);
93 }
94
/* Upper bound, in ms, of the random delay before a SEARCHGW is sent. */
#define T_SEARCHGW_MSEC  (CONFIG_MQTT_SN_LIB_T_SEARCHGW * MSEC_PER_SEC)
/* CONFIG_MQTT_SN_LIB_T_GWINFO converted to ms. */
#define T_GWINFO_MSEC    (CONFIG_MQTT_SN_LIB_T_GWINFO * MSEC_PER_SEC)
/* Time, in ms, to wait for an acknowledgment before retransmitting. */
#define T_RETRY_MSEC     (CONFIG_MQTT_SN_LIB_T_RETRY * MSEC_PER_SEC)
/* Number of transmission attempts granted to a confirmable action. */
#define N_RETRY          (CONFIG_MQTT_SN_LIB_N_RETRY)
/* Keep-alive period in ms; also the interval between acknowledged pings. */
#define T_KEEPALIVE_MSEC (CONFIG_MQTT_SN_KEEPALIVE * MSEC_PER_SEC)
100
/**
 * @brief Hand out the next message ID.
 *
 * IDs are taken from a file-wide 16-bit counter that starts at 1. The value
 * 0 is never returned: MQTT-SN uses MsgId 0x0000 for messages where the ID
 * is not relevant, so the counter skips it when it wraps around.
 *
 * @return A message ID in the range 1..65535
 */
static uint16_t next_msg_id(void)
{
	static uint16_t msg_id;

	msg_id++;
	if (msg_id == 0) {
		/* Wrapped around: skip the reserved value */
		msg_id = 1;
	}

	return msg_id;
}
107
encode_and_send(struct mqtt_sn_client * client,struct mqtt_sn_param * p,uint8_t broadcast_radius)108 static int encode_and_send(struct mqtt_sn_client *client, struct mqtt_sn_param *p,
109 uint8_t broadcast_radius)
110 {
111 int err;
112
113 err = mqtt_sn_encode_msg(&client->tx, p);
114 if (err) {
115 goto end;
116 }
117
118 LOG_HEXDUMP_DBG(client->tx.data, client->tx.len, "Send message");
119
120 if (!client->transport->sendto) {
121 LOG_ERR("Can't send: no callback");
122 err = -ENOTSUP;
123 goto end;
124 }
125
126 if (!client->tx.len) {
127 LOG_WRN("Can't send: empty");
128 err = -ENOMSG;
129 goto end;
130 }
131
132 if (broadcast_radius) {
133 err = client->transport->sendto(client, client->tx.data, client->tx.len, NULL,
134 broadcast_radius);
135 } else {
136 struct mqtt_sn_gateway *gw;
137
138 gw = SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
139 if (gw == NULL || gw->addr_len == 0) {
140 LOG_WRN("No Gateway Address");
141 err = -ENXIO;
142 goto end;
143 }
144 err = client->transport->sendto(client, client->tx.data, client->tx.len, gw->addr,
145 gw->addr_len);
146 }
147
148 end:
149 if (err) {
150 LOG_ERR("Error during send: %d", err);
151 }
152 net_buf_simple_reset(&client->tx);
153
154 return err;
155 }
156
mqtt_sn_con_init(struct mqtt_sn_confirmable * con)157 static void mqtt_sn_con_init(struct mqtt_sn_confirmable *con)
158 {
159 con->last_attempt = 0;
160 con->retries = N_RETRY;
161 con->msg_id = next_msg_id();
162 }
163
mqtt_sn_publish_destroy(struct mqtt_sn_client * client,struct mqtt_sn_publish * pub)164 static void mqtt_sn_publish_destroy(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub)
165 {
166 sys_slist_find_and_remove(&client->publish, &pub->next);
167 k_mem_slab_free(&publishes, (void *)pub);
168 }
169
mqtt_sn_publish_destroy_all(struct mqtt_sn_client * client)170 static void mqtt_sn_publish_destroy_all(struct mqtt_sn_client *client)
171 {
172 struct mqtt_sn_publish *pub;
173 sys_snode_t *next;
174
175 while ((next = sys_slist_get(&client->publish)) != NULL) {
176 pub = SYS_SLIST_CONTAINER(next, pub, next);
177 k_mem_slab_free(&publishes, (void *)pub);
178 }
179 }
180
mqtt_sn_publish_create(struct mqtt_sn_data * data)181 static struct mqtt_sn_publish *mqtt_sn_publish_create(struct mqtt_sn_data *data)
182 {
183 struct mqtt_sn_publish *pub;
184
185 if (k_mem_slab_alloc(&publishes, (void **)&pub, K_NO_WAIT)) {
186 LOG_ERR("Can't create PUB: no free slot");
187 return NULL;
188 }
189
190 memset(pub, 0, sizeof(*pub));
191
192 if (data && data->data && data->size) {
193 if (data->size > sizeof(pub->pubdata)) {
194 LOG_ERR("Can't create PUB: Too much data (%zu)", data->size);
195 return NULL;
196 }
197
198 memcpy(pub->pubdata, data->data, data->size);
199 pub->datalen = data->size;
200 }
201
202 mqtt_sn_con_init(&pub->con);
203
204 return pub;
205 }
206
mqtt_sn_publish_find_by_msg_id(struct mqtt_sn_client * client,uint16_t msg_id)207 static struct mqtt_sn_publish *mqtt_sn_publish_find_by_msg_id(struct mqtt_sn_client *client,
208 uint16_t msg_id)
209 {
210 struct mqtt_sn_publish *pub;
211
212 SYS_SLIST_FOR_EACH_CONTAINER(&client->publish, pub, next) {
213 if (pub->con.msg_id == msg_id) {
214 return pub;
215 }
216 }
217
218 return NULL;
219 }
220
mqtt_sn_publish_find_by_topic(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)221 static struct mqtt_sn_publish *mqtt_sn_publish_find_by_topic(struct mqtt_sn_client *client,
222 struct mqtt_sn_topic *topic)
223 {
224 struct mqtt_sn_publish *pub;
225
226 SYS_SLIST_FOR_EACH_CONTAINER(&client->publish, pub, next) {
227 if (pub->topic == topic) {
228 return pub;
229 }
230 }
231
232 return NULL;
233 }
234
mqtt_sn_topic_create(struct mqtt_sn_data * name)235 static struct mqtt_sn_topic *mqtt_sn_topic_create(struct mqtt_sn_data *name)
236 {
237 struct mqtt_sn_topic *topic;
238
239 if (k_mem_slab_alloc(&topics, (void **)&topic, K_NO_WAIT)) {
240 LOG_ERR("Can't create topic: no free slot");
241 return NULL;
242 }
243
244 memset(topic, 0, sizeof(*topic));
245
246 if (!name || !name->data || !name->size) {
247 LOG_ERR("Can't create topic with empty name");
248 return NULL;
249 }
250
251 if (name->size > sizeof(topic->name)) {
252 LOG_ERR("Can't create topic: name too long (%zu)", name->size);
253 return NULL;
254 }
255
256 memcpy(topic->name, name->data, name->size);
257 topic->namelen = name->size;
258
259 mqtt_sn_con_init(&topic->con);
260
261 return topic;
262 }
263
mqtt_sn_topic_find_by_name(struct mqtt_sn_client * client,struct mqtt_sn_data * topic_name)264 static struct mqtt_sn_topic *mqtt_sn_topic_find_by_name(struct mqtt_sn_client *client,
265 struct mqtt_sn_data *topic_name)
266 {
267 struct mqtt_sn_topic *topic;
268
269 SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
270 if (topic->namelen == topic_name->size &&
271 memcmp(topic->name, topic_name->data, topic_name->size) == 0) {
272 return topic;
273 }
274 }
275
276 return NULL;
277 }
278
mqtt_sn_topic_find_by_msg_id(struct mqtt_sn_client * client,uint16_t msg_id)279 static struct mqtt_sn_topic *mqtt_sn_topic_find_by_msg_id(struct mqtt_sn_client *client,
280 uint16_t msg_id)
281 {
282 struct mqtt_sn_topic *topic;
283
284 SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
285 if (topic->con.msg_id == msg_id) {
286 return topic;
287 }
288 }
289
290 return NULL;
291 }
292
mqtt_sn_topic_destroy(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)293 static void mqtt_sn_topic_destroy(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
294 {
295 struct mqtt_sn_publish *pub;
296
297 /* Destroy all pubs referencing this topic */
298 while ((pub = mqtt_sn_publish_find_by_topic(client, topic)) != NULL) {
299 LOG_WRN("Destroying publish msg_id %d", pub->con.msg_id);
300 mqtt_sn_publish_destroy(client, pub);
301 }
302
303 sys_slist_find_and_remove(&client->topic, &topic->next);
304 }
305
mqtt_sn_topic_destroy_all(struct mqtt_sn_client * client)306 static void mqtt_sn_topic_destroy_all(struct mqtt_sn_client *client)
307 {
308 struct mqtt_sn_topic *topic;
309 struct mqtt_sn_publish *pub;
310 sys_snode_t *next;
311
312 while ((next = sys_slist_get(&client->topic)) != NULL) {
313 topic = SYS_SLIST_CONTAINER(next, topic, next);
314 /* Destroy all pubs referencing this topic */
315 while ((pub = mqtt_sn_publish_find_by_topic(client, topic)) != NULL) {
316 LOG_WRN("Destroying publish msg_id %d", pub->con.msg_id);
317 mqtt_sn_publish_destroy(client, pub);
318 }
319
320 k_mem_slab_free(&topics, (void *)topic);
321 }
322 }
323
mqtt_sn_gw_destroy(struct mqtt_sn_client * client,struct mqtt_sn_gateway * gw)324 static void mqtt_sn_gw_destroy(struct mqtt_sn_client *client, struct mqtt_sn_gateway *gw)
325 {
326 LOG_DBG("Destroying gateway %d", gw->gw_id);
327 sys_slist_find_and_remove(&client->gateway, &gw->next);
328 k_mem_slab_free(&gateways, (void *)gw);
329 }
330
mqtt_sn_gw_destroy_all(struct mqtt_sn_client * client)331 static void mqtt_sn_gw_destroy_all(struct mqtt_sn_client *client)
332 {
333 struct mqtt_sn_gateway *gw;
334 sys_snode_t *next;
335
336 while ((next = sys_slist_get(&client->gateway)) != NULL) {
337 gw = SYS_SLIST_CONTAINER(next, gw, next);
338 sys_slist_find_and_remove(&client->gateway, next);
339 k_mem_slab_free(&gateways, (void *)gw);
340 }
341 }
342
mqtt_sn_gw_create(uint8_t gw_id,short duration,struct mqtt_sn_data gw_addr)343 static struct mqtt_sn_gateway *mqtt_sn_gw_create(uint8_t gw_id, short duration,
344 struct mqtt_sn_data gw_addr)
345 {
346 struct mqtt_sn_gateway *gw;
347
348 LOG_DBG("Free GW slots: %d", k_mem_slab_num_free_get(&gateways));
349 if (k_mem_slab_alloc(&gateways, (void **)&gw, K_NO_WAIT)) {
350 LOG_WRN("Can't create GW: no free slot");
351 return NULL;
352 }
353
354 __ASSERT(gw_addr.size < CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE,
355 "Gateway address is larger than allowed by CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE");
356
357 memset(gw, 0, sizeof(*gw));
358 memcpy(gw->addr, gw_addr.data, gw_addr.size);
359 gw->addr_len = gw_addr.size;
360 gw->gw_id = gw_id;
361 if (duration == -1) {
362 gw->adv_timer = duration;
363 } else {
364 gw->adv_timer =
365 k_uptime_get() + (duration * CONFIG_MQTT_SN_LIB_N_ADV * MSEC_PER_SEC);
366 }
367
368 return gw;
369 }
370
mqtt_sn_gw_find_by_id(struct mqtt_sn_client * client,uint16_t gw_id)371 static struct mqtt_sn_gateway *mqtt_sn_gw_find_by_id(struct mqtt_sn_client *client, uint16_t gw_id)
372 {
373 struct mqtt_sn_gateway *gw;
374
375 SYS_SLIST_FOR_EACH_CONTAINER(&client->gateway, gw, next) {
376 if (gw->gw_id == gw_id) {
377 return gw;
378 }
379 }
380
381 return NULL;
382 }
383
mqtt_sn_disconnect_internal(struct mqtt_sn_client * client)384 static void mqtt_sn_disconnect_internal(struct mqtt_sn_client *client)
385 {
386 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
387
388 mqtt_sn_set_state(client, MQTT_SN_CLIENT_DISCONNECTED);
389 if (client->evt_cb) {
390 client->evt_cb(client, &evt);
391 }
392
393 /*
394 * Remove all publishes, but keep topics
395 * Topics are removed on deinit or when connect is called with
396 * clean-session = true
397 */
398 mqtt_sn_publish_destroy_all(client);
399
400 k_work_cancel_delayable(&client->process_work);
401 }
402
mqtt_sn_sleep_internal(struct mqtt_sn_client * client)403 static void mqtt_sn_sleep_internal(struct mqtt_sn_client *client)
404 {
405 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
406
407 mqtt_sn_set_state(client, MQTT_SN_CLIENT_ASLEEP);
408 if (client->evt_cb) {
409 client->evt_cb(client, &evt);
410 }
411 }
412
413 /**
414 * @brief Internal function to send a SUBSCRIBE message for a topic.
415 *
416 * @param client
417 * @param topic
418 * @param dup DUP flag - see MQTT-SN spec
419 */
mqtt_sn_do_subscribe(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic,bool dup)420 static void mqtt_sn_do_subscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic,
421 bool dup)
422 {
423 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_SUBSCRIBE};
424
425 if (!client || !topic) {
426 return;
427 }
428
429 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
430 LOG_ERR("Cannot subscribe: not connected");
431 return;
432 }
433
434 p.params.subscribe.msg_id = topic->con.msg_id;
435 p.params.subscribe.qos = topic->qos;
436 p.params.subscribe.topic_type = topic->type;
437 p.params.subscribe.dup = dup;
438 switch (topic->type) {
439 case MQTT_SN_TOPIC_TYPE_NORMAL:
440 p.params.subscribe.topic.topic_name.data = topic->name;
441 p.params.subscribe.topic.topic_name.size = topic->namelen;
442 break;
443 case MQTT_SN_TOPIC_TYPE_PREDEF:
444 case MQTT_SN_TOPIC_TYPE_SHORT:
445 p.params.subscribe.topic.topic_id = topic->topic_id;
446 break;
447 default:
448 LOG_ERR("Unexpected topic type %d", topic->type);
449 return;
450 }
451
452 encode_and_send(client, &p, 0);
453 }
454
455 /**
456 * @brief Internal function to send an UNSUBSCRIBE message for a topic.
457 *
458 * @param client
459 * @param topic
460 */
mqtt_sn_do_unsubscribe(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)461 static void mqtt_sn_do_unsubscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
462 {
463 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_UNSUBSCRIBE};
464
465 if (!client || !topic) {
466 return;
467 }
468
469 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
470 LOG_ERR("Cannot unsubscribe: not connected");
471 return;
472 }
473
474 p.params.unsubscribe.msg_id = topic->con.msg_id;
475 p.params.unsubscribe.topic_type = topic->type;
476 switch (topic->type) {
477 case MQTT_SN_TOPIC_TYPE_NORMAL:
478 p.params.unsubscribe.topic.topic_name.data = topic->name;
479 p.params.unsubscribe.topic.topic_name.size = topic->namelen;
480 break;
481 case MQTT_SN_TOPIC_TYPE_PREDEF:
482 case MQTT_SN_TOPIC_TYPE_SHORT:
483 p.params.unsubscribe.topic.topic_id = topic->topic_id;
484 break;
485 default:
486 LOG_ERR("Unexpected topic type %d", topic->type);
487 return;
488 }
489
490 encode_and_send(client, &p, 0);
491 }
492
493 /**
494 * @brief Internal function to a register a topic with the MQTT-SN gateway.
495 *
496 * @param client
497 * @param topic
498 */
mqtt_sn_do_register(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)499 static void mqtt_sn_do_register(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
500 {
501 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_REGISTER};
502
503 if (!client || !topic) {
504 return;
505 }
506
507 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
508 LOG_ERR("Cannot register: not connected");
509 return;
510 }
511
512 p.params.reg.msg_id = topic->con.msg_id;
513 switch (topic->type) {
514 case MQTT_SN_TOPIC_TYPE_NORMAL:
515 LOG_HEXDUMP_INF(topic->name, topic->namelen, "Registering topic");
516 p.params.reg.topic.data = topic->name;
517 p.params.reg.topic.size = topic->namelen;
518 break;
519 default:
520 LOG_ERR("Unexpected topic type %d", topic->type);
521 return;
522 }
523
524 encode_and_send(client, &p, 0);
525 }
526
527 /**
528 * @brief Internal function to send a PUBLISH message.
529 *
530 * Note that this function does not do sanity checks regarding the pub's topic.
531 *
532 * @param client
533 * @param pub
534 * @param dup DUP flag - see MQTT-SN spec.
535 */
mqtt_sn_do_publish(struct mqtt_sn_client * client,struct mqtt_sn_publish * pub,bool dup)536 static void mqtt_sn_do_publish(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub, bool dup)
537 {
538 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_PUBLISH};
539
540 if (!client || !pub) {
541 return;
542 }
543
544 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
545 LOG_ERR("Cannot subscribe: not connected");
546 return;
547 }
548
549 LOG_INF("Publishing to topic ID %d", pub->topic->topic_id);
550
551 p.params.publish.data.data = pub->pubdata;
552 p.params.publish.data.size = pub->datalen;
553 p.params.publish.msg_id = pub->con.msg_id;
554 p.params.publish.retain = pub->retain;
555 p.params.publish.topic_id = pub->topic->topic_id;
556 p.params.publish.topic_type = pub->topic->type;
557 p.params.publish.qos = pub->qos;
558 p.params.publish.dup = dup;
559
560 encode_and_send(client, &p, 0);
561 }
562
563 /**
564 * @brief Internal function to send a SEARCHGW message.
565 *
566 * @param client
567 */
mqtt_sn_do_searchgw(struct mqtt_sn_client * client)568 static void mqtt_sn_do_searchgw(struct mqtt_sn_client *client)
569 {
570 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_SEARCHGW};
571
572 p.params.searchgw.radius = CONFIG_MQTT_SN_LIB_BROADCAST_RADIUS;
573
574 encode_and_send(client, &p, CONFIG_MQTT_SN_LIB_BROADCAST_RADIUS);
575 }
576
577 /**
578 * @brief Internal function to send a GWINFO message.
579 *
580 * @param client
581 */
mqtt_sn_do_gwinfo(struct mqtt_sn_client * client)582 static void mqtt_sn_do_gwinfo(struct mqtt_sn_client *client)
583 {
584 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_GWINFO};
585 struct mqtt_sn_gateway *gw;
586 struct mqtt_sn_data addr;
587
588 gw = SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
589
590 if (gw == NULL || gw->addr_len == 0) {
591 LOG_WRN("No Gateway Address");
592 return;
593 }
594
595 response.params.gwinfo.gw_id = gw->gw_id;
596 addr.data = gw->addr;
597 addr.size = gw->addr_len;
598 response.params.gwinfo.gw_add = addr;
599
600 encode_and_send(client, &response, client->radius_gwinfo);
601 }
602
603 /**
604 * @brief Internal function to send a PINGREQ message.
605 *
606 * @param client
607 */
mqtt_sn_do_ping(struct mqtt_sn_client * client)608 static void mqtt_sn_do_ping(struct mqtt_sn_client *client)
609 {
610 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_PINGREQ};
611
612 switch (client->state) {
613 case MQTT_SN_CLIENT_ASLEEP:
614 /*
615 * From the spec regarding PINGREQ:
616 * ClientId: contains the client id; this field is optional
617 * and is included by a “sleeping” client when it goes to the
618 * “awake” state and is waiting for messages sent by the
619 * server/gateway, see Section 6.14 for further details.
620 */
621 p.params.pingreq.client_id.data = client->client_id.data;
622 p.params.pingreq.client_id.size = client->client_id.size;
623 case MQTT_SN_CLIENT_ACTIVE:
624 encode_and_send(client, &p, 0);
625 break;
626 default:
627 LOG_WRN("Can't ping in state %d", client->state);
628 break;
629 }
630 }
631
632 /**
633 * @brief Process all publish tasks in the queue.
634 *
635 * @param client
636 * @param next_cycle will be set to the time when the next action is required
637 *
638 * @retval 0 on success
639 * @retval -ETIMEDOUT when a publish task ran out of retries
640 */
process_pubs(struct mqtt_sn_client * client,int64_t * next_cycle)641 static int process_pubs(struct mqtt_sn_client *client, int64_t *next_cycle)
642 {
643 struct mqtt_sn_publish *pub, *pubs;
644 const int64_t now = k_uptime_get();
645 int64_t next_attempt;
646 bool dup; /* dup flag if message is resent */
647
648 SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&client->publish, pub, pubs, next) {
649 LOG_HEXDUMP_DBG(pub->topic->name, pub->topic->namelen,
650 "Processing publish for topic");
651 LOG_HEXDUMP_DBG(pub->pubdata, pub->datalen, "Processing publish data");
652
653 if (pub->con.last_attempt == 0) {
654 next_attempt = 0;
655 dup = false;
656 } else {
657 next_attempt = pub->con.last_attempt + T_RETRY_MSEC;
658 dup = true;
659 }
660
661 /* Check if action is due */
662 if (next_attempt <= now) {
663 switch (pub->topic->state) {
664 case MQTT_SN_TOPIC_STATE_REGISTERED:
665 case MQTT_SN_TOPIC_STATE_SUBSCRIBED:
666 if (!pub->con.retries--) {
667 LOG_WRN("PUB ran out of retries, disconnecting");
668 mqtt_sn_disconnect_internal(client);
669 return -ETIMEDOUT;
670 }
671 mqtt_sn_do_publish(client, pub, dup);
672 if (pub->qos == MQTT_SN_QOS_0 || pub->qos == MQTT_SN_QOS_M1) {
673 /* We are done, remove this */
674 mqtt_sn_publish_destroy(client, pub);
675 continue;
676 } else {
677 /* Wait for ack */
678 pub->con.last_attempt = now;
679 next_attempt = now + T_RETRY_MSEC;
680 }
681 break;
682 default:
683 LOG_INF("Can't publish; topic is not ready");
684 break;
685 }
686 }
687
688 /* Remember time when next action is due */
689 if (next_attempt > now && (*next_cycle == 0 || next_attempt < *next_cycle)) {
690 *next_cycle = next_attempt;
691 }
692 }
693
694 LOG_DBG("next_cycle: %lld", *next_cycle);
695
696 return 0;
697 }
698
699 /**
700 * @brief Process all topic tasks in the queue.
701 *
702 * @param client
703 * @param next_cycle will be set to the time when the next action is required
704 *
705 * @retval 0 on success
706 * @retval -ETIMEDOUT when a publish task ran out of retries
707 */
process_topics(struct mqtt_sn_client * client,int64_t * next_cycle)708 static int process_topics(struct mqtt_sn_client *client, int64_t *next_cycle)
709 {
710 struct mqtt_sn_topic *topic;
711 const int64_t now = k_uptime_get();
712 int64_t next_attempt;
713
714 bool subscribing = false;
715 bool registering = false;
716
717 bool dup; /* dup flag if message is resent */
718
719 /* First pass to check for REGISTERING, SUBSCRIBING, UNSUBSCRIBING */
720 SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
721 switch (topic->state) {
722 case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
723 case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
724 subscribing = true;
725 break;
726 case MQTT_SN_TOPIC_STATE_REGISTERING:
727 registering = true;
728 break;
729 default:
730 break;
731 }
732 }
733
734 /* Second pass */
735 SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
736 LOG_HEXDUMP_DBG(topic->name, topic->namelen, "Processing topic");
737
738 if (topic->con.last_attempt == 0) {
739 next_attempt = 0;
740 dup = false;
741 } else {
742 next_attempt = topic->con.last_attempt + T_RETRY_MSEC;
743 dup = true;
744 }
745
746 /* Check if action is due */
747 if (next_attempt <= now) {
748 switch (topic->state) {
749 case MQTT_SN_TOPIC_STATE_SUBSCRIBE:
750 if (subscribing) {
751 /*
752 * Only one topic can be subscribing or unsubscribing
753 * at the same time
754 */
755 break;
756 }
757 topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBING;
758 LOG_INF("Topic subscription now in progress");
759 __fallthrough;
760 case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
761 subscribing = true;
762 if (!topic->con.retries--) {
763 LOG_WRN("Topic ran out of retries, disconnecting");
764 mqtt_sn_disconnect_internal(client);
765 return -ETIMEDOUT;
766 }
767
768 mqtt_sn_do_subscribe(client, topic, dup);
769 topic->con.last_attempt = now;
770 next_attempt = now + T_RETRY_MSEC;
771 break;
772 case MQTT_SN_TOPIC_STATE_REGISTER:
773 if (registering) {
774 /* Only one topic can be registering at the same time */
775 break;
776 }
777 topic->state = MQTT_SN_TOPIC_STATE_REGISTERING;
778 LOG_INF("Topic registration now in progress");
779 __fallthrough;
780 case MQTT_SN_TOPIC_STATE_REGISTERING:
781 registering = true;
782 if (!topic->con.retries--) {
783 LOG_WRN("Topic ran out of retries, disconnecting");
784 mqtt_sn_disconnect_internal(client);
785 return -ETIMEDOUT;
786 }
787
788 mqtt_sn_do_register(client, topic);
789 topic->con.last_attempt = now;
790 next_attempt = now + T_RETRY_MSEC;
791 break;
792 case MQTT_SN_TOPIC_STATE_UNSUBSCRIBE:
793 if (subscribing) {
794 /*
795 * Only one topic can be subscribing or unsubscribing
796 * at the same time
797 */
798 break;
799 }
800 topic->state = MQTT_SN_TOPIC_STATE_UNSUBSCRIBING;
801 LOG_INF("Topic unsubscription now in progress");
802 __fallthrough;
803 case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
804 subscribing = true;
805 if (!topic->con.retries--) {
806 LOG_WRN("Topic ran out of retries, disconnecting");
807 mqtt_sn_disconnect_internal(client);
808 return -ETIMEDOUT;
809 }
810 mqtt_sn_do_unsubscribe(client, topic);
811 topic->con.last_attempt = now;
812 next_attempt = now + T_RETRY_MSEC;
813 break;
814 case MQTT_SN_TOPIC_STATE_REGISTERED:
815 case MQTT_SN_TOPIC_STATE_SUBSCRIBED:
816 break;
817 }
818 }
819
820 /* Remember time when next action is due */
821 if (next_attempt > now && (*next_cycle == 0 || next_attempt < *next_cycle)) {
822 *next_cycle = next_attempt;
823 }
824 }
825
826 LOG_DBG("next_cycle: %lld", *next_cycle);
827
828 return 0;
829 }
830
831 /**
832 * @brief Housekeeping task for the client ping
833 *
834 * @param client
835 * @param next_cycle will be set to the time when the next action is required
836 * @retval 0 on success
837 * @retval -ETIMEDOUT if client ran out of ping retries
838 */
process_ping(struct mqtt_sn_client * client,int64_t * next_cycle)839 static int process_ping(struct mqtt_sn_client *client, int64_t *next_cycle)
840 {
841 const int64_t now = k_uptime_get();
842 struct mqtt_sn_gateway *gw = NULL;
843 int64_t next_ping;
844
845 if (client->ping_retries == N_RETRY) {
846 /* Last ping was acked */
847 next_ping = client->last_ping + T_KEEPALIVE_MSEC;
848 } else {
849 next_ping = client->last_ping + T_RETRY_MSEC;
850 }
851
852 if (next_ping < now) {
853 if (!client->ping_retries--) {
854 LOG_WRN("Ping ran out of retries");
855 mqtt_sn_disconnect_internal(client);
856 SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
857 LOG_DBG("Removing non-responsive GW 0x%08x", gw->gw_id);
858 mqtt_sn_gw_destroy(client, gw);
859 return -ETIMEDOUT;
860 }
861
862 LOG_DBG("Sending PINGREQ");
863 mqtt_sn_do_ping(client);
864 client->last_ping = now;
865 next_ping = now + T_RETRY_MSEC;
866 }
867
868 if (*next_cycle == 0 || next_ping < *next_cycle) {
869 *next_cycle = next_ping;
870 }
871
872 LOG_DBG("next_cycle: %lld", *next_cycle);
873
874 return 0;
875 }
876
877 /**
878 * @brief Housekeeping task for the gateway search
879 *
880 * @param client
881 * @param next_cycle will be set to the time when the next action is required
882 * @retval 0 on success
883 */
process_search(struct mqtt_sn_client * client,int64_t * next_cycle)884 static int process_search(struct mqtt_sn_client *client, int64_t *next_cycle)
885 {
886 const int64_t now = k_uptime_get();
887
888 LOG_DBG("ts_searchgw: %lld", client->ts_searchgw);
889 LOG_DBG("ts_gwinfo: %lld", client->ts_gwinfo);
890
891 if (client->ts_searchgw != 0 && client->ts_searchgw <= now) {
892 LOG_DBG("Sending SEARCHGW");
893 mqtt_sn_do_searchgw(client);
894 client->ts_searchgw = 0;
895 }
896
897 if (client->ts_gwinfo != 0 && client->ts_gwinfo <= now) {
898 LOG_DBG("Sending GWINFO");
899 mqtt_sn_do_gwinfo(client);
900 client->ts_gwinfo = 0;
901 }
902
903 if (*next_cycle == 0 || (client->ts_searchgw != 0 && client->ts_searchgw < *next_cycle)) {
904 *next_cycle = client->ts_searchgw;
905 }
906 if (*next_cycle == 0 || (client->ts_gwinfo != 0 && client->ts_gwinfo < *next_cycle)) {
907 *next_cycle = client->ts_gwinfo;
908 }
909
910 LOG_DBG("next_cycle: %lld", *next_cycle);
911
912 return 0;
913 }
914
915 /**
916 * @brief Housekeeping task for gateway advertisements
917 *
918 * @param client
919 * @param next_cycle will be set to the time when the next action is required
920 * @return int
921 */
process_advertise(struct mqtt_sn_client * client,int64_t * next_cycle)922 static int process_advertise(struct mqtt_sn_client *client, int64_t *next_cycle)
923 {
924 const int64_t now = k_uptime_get();
925 struct mqtt_sn_gateway *gw;
926 struct mqtt_sn_gateway *gw_next;
927
928 SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&client->gateway, gw, gw_next, next) {
929 LOG_DBG("Checking if GW 0x%02x is old", gw->gw_id);
930 if (gw->adv_timer != -1 && gw->adv_timer <= now) {
931 LOG_DBG("Removing non-responsive GW 0x%08x", gw->gw_id);
932 if (client->gateway.head == &gw->next) {
933 mqtt_sn_disconnect(client);
934 }
935 mqtt_sn_gw_destroy(client, gw);
936 }
937 if (gw->adv_timer != -1 && (*next_cycle == 0 || gw->adv_timer < *next_cycle)) {
938 *next_cycle = gw->adv_timer;
939 }
940 }
941 LOG_DBG("next_cycle: %lld", *next_cycle);
942
943 return 0;
944 }
945
946 /**
947 * @brief Housekeeping task that is called by workqueue item
948 *
949 * @param wrk The work item
950 */
process_work(struct k_work * wrk)951 static void process_work(struct k_work *wrk)
952 {
953 struct mqtt_sn_client *client;
954 struct k_work_delayable *dwork;
955 int64_t next_cycle = 0;
956 int err;
957
958 dwork = k_work_delayable_from_work(wrk);
959 client = CONTAINER_OF(dwork, struct mqtt_sn_client, process_work);
960
961 LOG_DBG("Executing work of client %p in state %d at time %lld", client, client->state,
962 k_uptime_get());
963
964 /* Clean up old advertised gateways from list */
965 err = process_advertise(client, &next_cycle);
966 if (err) {
967 return;
968 }
969
970 /* Handle GW search process timers */
971 err = process_search(client, &next_cycle);
972 if (err) {
973 return;
974 }
975
976 if (client->state == MQTT_SN_CLIENT_ACTIVE) {
977 err = process_topics(client, &next_cycle);
978 if (err) {
979 return;
980 }
981
982 err = process_pubs(client, &next_cycle);
983 if (err) {
984 return;
985 }
986
987 err = process_ping(client, &next_cycle);
988 if (err) {
989 return;
990 }
991 }
992
993 if (next_cycle > 0) {
994 LOG_DBG("next_cycle: %lld", next_cycle);
995 k_work_schedule(dwork, K_MSEC(next_cycle - k_uptime_get()));
996 }
997 }
998
/*
 * Initialise a client structure.
 *
 * The structure is zeroed, then wired up with the client ID, transport and
 * event callback. The caller-provided tx/rx memory becomes the client's
 * (initially empty) transmit and receive buffers. The housekeeping work item
 * is prepared but not scheduled. Finally the transport's init hook is called
 * if it has one; its result is not evaluated.
 *
 * Returns 0 on success or -EINVAL if any pointer argument is NULL.
 */
int mqtt_sn_client_init(struct mqtt_sn_client *client, const struct mqtt_sn_data *client_id,
			struct mqtt_sn_transport *transport, mqtt_sn_evt_cb_t evt_cb, void *tx,
			size_t txsz, void *rx, size_t rxsz)
{
	const bool args_ok = client && client_id && transport && evt_cb && tx && rx;

	if (!args_ok) {
		return -EINVAL;
	}

	memset(client, 0, sizeof(*client));

	client->evt_cb = evt_cb;
	client->transport = transport;
	client->client_id.size = client_id->size;
	client->client_id.data = client_id->data;

	net_buf_simple_init_with_data(&client->tx, tx, txsz);
	net_buf_simple_reset(&client->tx);

	net_buf_simple_init_with_data(&client->rx, rx, rxsz);
	net_buf_simple_reset(&client->rx);

	k_work_init_delayable(&client->process_work, process_work);

	if (transport->init != NULL) {
		transport->init(transport);
	}

	return 0;
}
1028
mqtt_sn_client_deinit(struct mqtt_sn_client * client)1029 void mqtt_sn_client_deinit(struct mqtt_sn_client *client)
1030 {
1031 if (!client) {
1032 return;
1033 }
1034
1035 mqtt_sn_publish_destroy_all(client);
1036 mqtt_sn_topic_destroy_all(client);
1037 mqtt_sn_gw_destroy_all(client);
1038
1039 if (client->transport && client->transport->deinit) {
1040 client->transport->deinit(client->transport);
1041 }
1042
1043 k_work_cancel_delayable(&client->process_work);
1044 }
1045
mqtt_sn_add_gw(struct mqtt_sn_client * client,uint8_t gw_id,struct mqtt_sn_data gw_addr)1046 int mqtt_sn_add_gw(struct mqtt_sn_client *client, uint8_t gw_id, struct mqtt_sn_data gw_addr)
1047 {
1048 struct mqtt_sn_gateway *gw;
1049
1050 gw = mqtt_sn_gw_find_by_id(client, gw_id);
1051
1052 if (gw != NULL) {
1053 mqtt_sn_gw_destroy(client, gw);
1054 }
1055
1056 gw = mqtt_sn_gw_create(gw_id, -1, gw_addr);
1057 if (!gw) {
1058 return -ENOMEM;
1059 }
1060
1061 sys_slist_append(&client->gateway, &gw->next);
1062
1063 return 0;
1064 }
1065
mqtt_sn_search(struct mqtt_sn_client * client,uint8_t radius)1066 int mqtt_sn_search(struct mqtt_sn_client *client, uint8_t radius)
1067 {
1068 if (!client) {
1069 return -EINVAL;
1070 }
1071 /* Set SEARCHGW transmission timer */
1072 client->ts_searchgw = k_uptime_get() + (T_SEARCHGW_MSEC * sys_rand8_get() / 255);
1073 k_work_schedule(&client->process_work, K_NO_WAIT);
1074 LOG_DBG("Requested SEARCHGW for time %lld at time %lld", client->ts_searchgw,
1075 k_uptime_get());
1076
1077 return 0;
1078 }
1079
mqtt_sn_connect(struct mqtt_sn_client * client,bool will,bool clean_session)1080 int mqtt_sn_connect(struct mqtt_sn_client *client, bool will, bool clean_session)
1081 {
1082 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_CONNECT};
1083
1084 if (!client) {
1085 return -EINVAL;
1086 }
1087
1088 if (will && (!client->will_msg.data || !client->will_topic.data)) {
1089 LOG_ERR("will set to true, but no will data in client");
1090 return -EINVAL;
1091 }
1092
1093 if (clean_session) {
1094 mqtt_sn_topic_destroy_all(client);
1095 }
1096
1097 p.params.connect.clean_session = clean_session;
1098 p.params.connect.will = will;
1099 p.params.connect.duration = CONFIG_MQTT_SN_KEEPALIVE;
1100 p.params.connect.client_id.data = client->client_id.data;
1101 p.params.connect.client_id.size = client->client_id.size;
1102
1103 client->last_ping = k_uptime_get();
1104
1105 return encode_and_send(client, &p, 0);
1106 }
1107
mqtt_sn_disconnect(struct mqtt_sn_client * client)1108 int mqtt_sn_disconnect(struct mqtt_sn_client *client)
1109 {
1110 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_DISCONNECT};
1111 int err;
1112
1113 if (!client) {
1114 return -EINVAL;
1115 }
1116
1117 p.params.disconnect.duration = 0;
1118
1119 err = encode_and_send(client, &p, 0);
1120 mqtt_sn_disconnect_internal(client);
1121
1122 return err;
1123 }
1124
mqtt_sn_sleep(struct mqtt_sn_client * client,uint16_t duration)1125 int mqtt_sn_sleep(struct mqtt_sn_client *client, uint16_t duration)
1126 {
1127 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_DISCONNECT};
1128 int err;
1129
1130 if (!client || !duration) {
1131 return -EINVAL;
1132 }
1133
1134 p.params.disconnect.duration = duration;
1135
1136 err = encode_and_send(client, &p, 0);
1137 mqtt_sn_sleep_internal(client);
1138
1139 return err;
1140 }
1141
mqtt_sn_subscribe(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name)1142 int mqtt_sn_subscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
1143 struct mqtt_sn_data *topic_name)
1144 {
1145 struct mqtt_sn_topic *topic;
1146 int err;
1147
1148 if (!client || !topic_name || !topic_name->data || !topic_name->size) {
1149 return -EINVAL;
1150 }
1151
1152 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
1153 LOG_ERR("Cannot subscribe: not connected");
1154 return -ENOTCONN;
1155 }
1156
1157 topic = mqtt_sn_topic_find_by_name(client, topic_name);
1158 if (!topic) {
1159 topic = mqtt_sn_topic_create(topic_name);
1160 if (!topic) {
1161 return -ENOMEM;
1162 }
1163
1164 topic->qos = qos;
1165 topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBE;
1166 sys_slist_append(&client->topic, &topic->next);
1167 }
1168
1169 err = k_work_reschedule(&client->process_work, K_NO_WAIT);
1170 if (err < 0) {
1171 return err;
1172 }
1173
1174 return 0;
1175 }
1176
mqtt_sn_unsubscribe(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name)1177 int mqtt_sn_unsubscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
1178 struct mqtt_sn_data *topic_name)
1179 {
1180 struct mqtt_sn_topic *topic;
1181 int err;
1182
1183 if (!client || !topic_name) {
1184 return -EINVAL;
1185 }
1186
1187 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
1188 LOG_ERR("Cannot unsubscribe: not connected");
1189 return -ENOTCONN;
1190 }
1191
1192 topic = mqtt_sn_topic_find_by_name(client, topic_name);
1193 if (!topic) {
1194 LOG_HEXDUMP_ERR(topic_name->data, topic_name->size, "Topic not found");
1195 return -ENOENT;
1196 }
1197
1198 if (topic->state != MQTT_SN_TOPIC_STATE_SUBSCRIBED) {
1199 LOG_ERR("Cannot unsubscribe: not subscribed");
1200 return -EAGAIN;
1201 }
1202
1203 topic->state = MQTT_SN_TOPIC_STATE_UNSUBSCRIBE;
1204 mqtt_sn_con_init(&topic->con);
1205
1206 err = k_work_reschedule(&client->process_work, K_NO_WAIT);
1207 if (err < 0) {
1208 return err;
1209 }
1210
1211 return 0;
1212 }
1213
mqtt_sn_publish(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name,bool retain,struct mqtt_sn_data * data)1214 int mqtt_sn_publish(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
1215 struct mqtt_sn_data *topic_name, bool retain, struct mqtt_sn_data *data)
1216 {
1217 struct mqtt_sn_publish *pub;
1218 struct mqtt_sn_topic *topic;
1219 int err;
1220
1221 if (!client || !topic_name) {
1222 return -EINVAL;
1223 }
1224
1225 if (qos == MQTT_SN_QOS_M1) {
1226 LOG_ERR("QoS -1 not supported");
1227 return -ENOTSUP;
1228 }
1229
1230 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
1231 LOG_ERR("Cannot publish: disconnected");
1232 return -ENOTCONN;
1233 }
1234
1235 topic = mqtt_sn_topic_find_by_name(client, topic_name);
1236 if (!topic) {
1237 topic = mqtt_sn_topic_create(topic_name);
1238 if (!topic) {
1239 return -ENOMEM;
1240 }
1241
1242 topic->qos = qos;
1243 topic->state = MQTT_SN_TOPIC_STATE_REGISTER;
1244 sys_slist_append(&client->topic, &topic->next);
1245 }
1246
1247 pub = mqtt_sn_publish_create(data);
1248 if (!pub) {
1249 k_work_reschedule(&client->process_work, K_NO_WAIT);
1250 return -ENOMEM;
1251 }
1252
1253 pub->qos = qos;
1254 pub->retain = retain;
1255 pub->topic = topic;
1256
1257 sys_slist_append(&client->publish, &pub->next);
1258
1259 err = k_work_reschedule(&client->process_work, K_NO_WAIT);
1260 if (err < 0) {
1261 return err;
1262 }
1263
1264 return 0;
1265 }
1266
handle_advertise(struct mqtt_sn_client * client,struct mqtt_sn_param_advertise * p,struct mqtt_sn_data rx_addr)1267 static void handle_advertise(struct mqtt_sn_client *client, struct mqtt_sn_param_advertise *p,
1268 struct mqtt_sn_data rx_addr)
1269 {
1270 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_ADVERTISE};
1271 struct mqtt_sn_gateway *gw;
1272
1273 gw = mqtt_sn_gw_find_by_id(client, p->gw_id);
1274
1275 if (gw == NULL) {
1276 LOG_DBG("Creating GW 0x%02x with duration %d", p->gw_id, p->duration);
1277 gw = mqtt_sn_gw_create(p->gw_id, p->duration, rx_addr);
1278 if (!gw) {
1279 return;
1280 }
1281 sys_slist_append(&client->gateway, &gw->next);
1282 } else {
1283 LOG_DBG("Updating timer for GW 0x%02x with duration %d", p->gw_id, p->duration);
1284 gw->adv_timer =
1285 k_uptime_get() + (p->duration * CONFIG_MQTT_SN_LIB_N_ADV * MSEC_PER_SEC);
1286 }
1287
1288 k_work_schedule(&client->process_work, K_NO_WAIT);
1289 if (client->evt_cb) {
1290 client->evt_cb(client, &evt);
1291 }
1292 }
1293
handle_searchgw(struct mqtt_sn_client * client,struct mqtt_sn_param_searchgw * p)1294 static void handle_searchgw(struct mqtt_sn_client *client, struct mqtt_sn_param_searchgw *p)
1295 {
1296 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_SEARCHGW};
1297
1298 /* Increment SEARCHGW transmission timestamp if waiting */
1299 if (client->ts_searchgw != 0) {
1300 client->ts_searchgw = k_uptime_get() + (T_SEARCHGW_MSEC * sys_rand8_get() / 255);
1301 }
1302
1303 /* Set transmission timestamp to respond to SEARCHGW if we have a GW */
1304 if (sys_slist_len(&client->gateway) > 0) {
1305 client->ts_gwinfo = k_uptime_get() + (T_GWINFO_MSEC * sys_rand8_get() / 255);
1306 }
1307 client->radius_gwinfo = p->radius;
1308 k_work_schedule(&client->process_work, K_NO_WAIT);
1309
1310 if (client->evt_cb) {
1311 client->evt_cb(client, &evt);
1312 }
1313 }
1314
handle_gwinfo(struct mqtt_sn_client * client,struct mqtt_sn_param_gwinfo * p,struct mqtt_sn_data rx_addr)1315 static void handle_gwinfo(struct mqtt_sn_client *client, struct mqtt_sn_param_gwinfo *p,
1316 struct mqtt_sn_data rx_addr)
1317 {
1318 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_GWINFO};
1319 struct mqtt_sn_gateway *gw;
1320
1321 /* Clear SEARCHGW and GWINFO transmission if waiting */
1322 client->ts_searchgw = 0;
1323 client->ts_gwinfo = 0;
1324 k_work_schedule(&client->process_work, K_NO_WAIT);
1325
1326 /* Extract GW info and store */
1327 if (p->gw_add.size > 0) {
1328 rx_addr.data = p->gw_add.data;
1329 rx_addr.size = p->gw_add.size;
1330 } else {
1331 }
1332 gw = mqtt_sn_gw_create(p->gw_id, -1, rx_addr);
1333
1334 if (!gw) {
1335 return;
1336 }
1337
1338 sys_slist_append(&client->gateway, &gw->next);
1339
1340 if (client->evt_cb) {
1341 client->evt_cb(client, &evt);
1342 }
1343 }
1344
handle_connack(struct mqtt_sn_client * client,struct mqtt_sn_param_connack * p)1345 static void handle_connack(struct mqtt_sn_client *client, struct mqtt_sn_param_connack *p)
1346 {
1347 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_CONNECTED};
1348
1349 if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1350 LOG_INF("MQTT_SN client connected");
1351 switch (client->state) {
1352 case MQTT_SN_CLIENT_DISCONNECTED:
1353 case MQTT_SN_CLIENT_ASLEEP:
1354 case MQTT_SN_CLIENT_AWAKE:
1355 mqtt_sn_set_state(client, MQTT_SN_CLIENT_ACTIVE);
1356 if (client->evt_cb) {
1357 client->evt_cb(client, &evt);
1358 }
1359 client->ping_retries = N_RETRY;
1360 break;
1361 default:
1362 LOG_ERR("Client received CONNACK but was in state %d", client->state);
1363 return;
1364 }
1365 } else {
1366 LOG_WRN("CONNACK ret code %d", p->ret_code);
1367 mqtt_sn_disconnect_internal(client);
1368 }
1369
1370 k_work_schedule(&client->process_work, K_NO_WAIT);
1371 }
1372
handle_willtopicreq(struct mqtt_sn_client * client)1373 static void handle_willtopicreq(struct mqtt_sn_client *client)
1374 {
1375 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_WILLTOPIC};
1376
1377 response.params.willtopic.qos = client->will_qos;
1378 response.params.willtopic.retain = client->will_retain;
1379 response.params.willtopic.topic.data = client->will_topic.data;
1380 response.params.willtopic.topic.size = client->will_topic.size;
1381
1382 encode_and_send(client, &response, 0);
1383 }
1384
handle_willmsgreq(struct mqtt_sn_client * client)1385 static void handle_willmsgreq(struct mqtt_sn_client *client)
1386 {
1387 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_WILLMSG};
1388
1389 response.params.willmsg.msg.data = client->will_msg.data;
1390 response.params.willmsg.msg.size = client->will_msg.size;
1391
1392 encode_and_send(client, &response, 0);
1393 }
1394
handle_register(struct mqtt_sn_client * client,struct mqtt_sn_param_register * p)1395 static void handle_register(struct mqtt_sn_client *client, struct mqtt_sn_param_register *p)
1396 {
1397 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_REGACK};
1398 struct mqtt_sn_topic *topic;
1399
1400 topic = mqtt_sn_topic_create(&p->topic);
1401 if (!topic) {
1402 return;
1403 }
1404
1405 topic->state = MQTT_SN_TOPIC_STATE_REGISTERED;
1406 topic->topic_id = p->topic_id;
1407 topic->type = MQTT_SN_TOPIC_TYPE_NORMAL;
1408
1409 sys_slist_append(&client->topic, &topic->next);
1410
1411 response.params.regack.ret_code = MQTT_SN_CODE_ACCEPTED;
1412 response.params.regack.topic_id = p->topic_id;
1413 response.params.regack.msg_id = p->msg_id;
1414
1415 encode_and_send(client, &response, 0);
1416 }
1417
handle_regack(struct mqtt_sn_client * client,struct mqtt_sn_param_regack * p)1418 static void handle_regack(struct mqtt_sn_client *client, struct mqtt_sn_param_regack *p)
1419 {
1420 struct mqtt_sn_topic *topic = mqtt_sn_topic_find_by_msg_id(client, p->msg_id);
1421
1422 if (!topic) {
1423 LOG_ERR("Can't REGACK, no topic found");
1424 return;
1425 }
1426
1427 if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1428 topic->state = MQTT_SN_TOPIC_STATE_REGISTERED;
1429 topic->topic_id = p->topic_id;
1430 } else {
1431 LOG_WRN("Gateway could not register topic ID %u, code %d", p->topic_id,
1432 p->ret_code);
1433 }
1434 }
1435
handle_publish(struct mqtt_sn_client * client,struct mqtt_sn_param_publish * p)1436 static void handle_publish(struct mqtt_sn_client *client, struct mqtt_sn_param_publish *p)
1437 {
1438 struct mqtt_sn_param response;
1439 struct mqtt_sn_evt evt = {.param.publish = {.data = p->data,
1440 .topic_id = p->topic_id,
1441 .topic_type = p->topic_type},
1442 .type = MQTT_SN_EVT_PUBLISH};
1443
1444 if (p->qos == MQTT_SN_QOS_1) {
1445 response.type = MQTT_SN_MSG_TYPE_PUBACK;
1446 response.params.puback.topic_id = p->topic_id;
1447 response.params.puback.msg_id = p->msg_id;
1448 response.params.puback.ret_code = MQTT_SN_CODE_ACCEPTED;
1449
1450 encode_and_send(client, &response, 0);
1451 } else if (p->qos == MQTT_SN_QOS_2) {
1452 response.type = MQTT_SN_MSG_TYPE_PUBREC;
1453 response.params.pubrec.msg_id = p->msg_id;
1454
1455 encode_and_send(client, &response, 0);
1456 }
1457
1458 if (client->evt_cb) {
1459 client->evt_cb(client, &evt);
1460 }
1461 }
1462
handle_puback(struct mqtt_sn_client * client,struct mqtt_sn_param_puback * p)1463 static void handle_puback(struct mqtt_sn_client *client, struct mqtt_sn_param_puback *p)
1464 {
1465 struct mqtt_sn_publish *pub = mqtt_sn_publish_find_by_msg_id(client, p->msg_id);
1466
1467 if (!pub) {
1468 LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1469 return;
1470 }
1471
1472 mqtt_sn_publish_destroy(client, pub);
1473 }
1474
handle_pubrec(struct mqtt_sn_client * client,struct mqtt_sn_param_pubrec * p)1475 static void handle_pubrec(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrec *p)
1476 {
1477 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PUBREL};
1478 struct mqtt_sn_publish *pub = mqtt_sn_publish_find_by_msg_id(client, p->msg_id);
1479
1480 if (!pub) {
1481 LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1482 return;
1483 }
1484
1485 pub->con.last_attempt = k_uptime_get();
1486 pub->con.retries = N_RETRY;
1487
1488 response.params.pubrel.msg_id = p->msg_id;
1489
1490 encode_and_send(client, &response, 0);
1491 }
1492
handle_pubrel(struct mqtt_sn_client * client,struct mqtt_sn_param_pubrel * p)1493 static void handle_pubrel(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrel *p)
1494 {
1495 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PUBCOMP};
1496
1497 response.params.pubcomp.msg_id = p->msg_id;
1498
1499 encode_and_send(client, &response, 0);
1500 }
1501
handle_pubcomp(struct mqtt_sn_client * client,struct mqtt_sn_param_pubcomp * p)1502 static void handle_pubcomp(struct mqtt_sn_client *client, struct mqtt_sn_param_pubcomp *p)
1503 {
1504 struct mqtt_sn_publish *pub = mqtt_sn_publish_find_by_msg_id(client, p->msg_id);
1505
1506 if (!pub) {
1507 LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1508 return;
1509 }
1510
1511 mqtt_sn_publish_destroy(client, pub);
1512 }
1513
handle_suback(struct mqtt_sn_client * client,struct mqtt_sn_param_suback * p)1514 static void handle_suback(struct mqtt_sn_client *client, struct mqtt_sn_param_suback *p)
1515 {
1516 struct mqtt_sn_topic *topic = mqtt_sn_topic_find_by_msg_id(client, p->msg_id);
1517
1518 if (!topic) {
1519 LOG_ERR("No matching SUBSCRIBE found for msg id %u", p->msg_id);
1520 return;
1521 }
1522
1523 if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1524 topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBED;
1525 topic->topic_id = p->topic_id;
1526 topic->qos = p->qos;
1527 } else {
1528 LOG_WRN("SUBACK with ret code %d", p->ret_code);
1529 }
1530 }
1531
handle_unsuback(struct mqtt_sn_client * client,struct mqtt_sn_param_unsuback * p)1532 static void handle_unsuback(struct mqtt_sn_client *client, struct mqtt_sn_param_unsuback *p)
1533 {
1534 struct mqtt_sn_topic *topic = mqtt_sn_topic_find_by_msg_id(client, p->msg_id);
1535
1536 if (!topic || topic->state != MQTT_SN_TOPIC_STATE_UNSUBSCRIBING) {
1537 LOG_ERR("No matching UNSUBSCRIBE found for msg id %u", p->msg_id);
1538 return;
1539 }
1540
1541 mqtt_sn_topic_destroy(client, topic);
1542 }
1543
handle_pingreq(struct mqtt_sn_client * client)1544 static void handle_pingreq(struct mqtt_sn_client *client)
1545 {
1546 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PINGRESP};
1547
1548 encode_and_send(client, &response, 0);
1549 }
1550
handle_pingresp(struct mqtt_sn_client * client)1551 static void handle_pingresp(struct mqtt_sn_client *client)
1552 {
1553 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_PINGRESP};
1554
1555 if (client->evt_cb) {
1556 client->evt_cb(client, &evt);
1557 }
1558
1559 if (client->state == MQTT_SN_CLIENT_AWAKE) {
1560 mqtt_sn_set_state(client, MQTT_SN_CLIENT_ASLEEP);
1561 }
1562
1563 client->ping_retries = N_RETRY;
1564 }
1565
/**
 * @brief Handle a received DISCONNECT message.
 *
 * Logs the event and runs the local disconnect handling.
 *
 * @param client Client that received the message.
 * @param p      Decoded DISCONNECT parameters; not used.
 */
static void handle_disconnect(struct mqtt_sn_client *client, struct mqtt_sn_param_disconnect *p)
{
	(void)p;

	LOG_INF("Received DISCONNECT");
	mqtt_sn_disconnect_internal(client);
}
1571
handle_msg(struct mqtt_sn_client * client,struct mqtt_sn_data rx_addr)1572 static int handle_msg(struct mqtt_sn_client *client, struct mqtt_sn_data rx_addr)
1573 {
1574 int err;
1575 struct mqtt_sn_param p;
1576
1577 err = mqtt_sn_decode_msg(&client->rx, &p);
1578 if (err) {
1579 return err;
1580 }
1581
1582 LOG_INF("Got message of type %d", p.type);
1583
1584 switch (p.type) {
1585 case MQTT_SN_MSG_TYPE_ADVERTISE:
1586 handle_advertise(client, &p.params.advertise, rx_addr);
1587 break;
1588 case MQTT_SN_MSG_TYPE_SEARCHGW:
1589 handle_searchgw(client, &p.params.searchgw);
1590 break;
1591 case MQTT_SN_MSG_TYPE_GWINFO:
1592 handle_gwinfo(client, &p.params.gwinfo, rx_addr);
1593 break;
1594 case MQTT_SN_MSG_TYPE_CONNACK:
1595 handle_connack(client, &p.params.connack);
1596 break;
1597 case MQTT_SN_MSG_TYPE_WILLTOPICREQ:
1598 handle_willtopicreq(client);
1599 break;
1600 case MQTT_SN_MSG_TYPE_WILLMSGREQ:
1601 handle_willmsgreq(client);
1602 break;
1603 case MQTT_SN_MSG_TYPE_REGISTER:
1604 handle_register(client, &p.params.reg);
1605 break;
1606 case MQTT_SN_MSG_TYPE_REGACK:
1607 handle_regack(client, &p.params.regack);
1608 break;
1609 case MQTT_SN_MSG_TYPE_PUBLISH:
1610 handle_publish(client, &p.params.publish);
1611 break;
1612 case MQTT_SN_MSG_TYPE_PUBACK:
1613 handle_puback(client, &p.params.puback);
1614 break;
1615 case MQTT_SN_MSG_TYPE_PUBREC:
1616 handle_pubrec(client, &p.params.pubrec);
1617 break;
1618 case MQTT_SN_MSG_TYPE_PUBREL:
1619 handle_pubrel(client, &p.params.pubrel);
1620 break;
1621 case MQTT_SN_MSG_TYPE_PUBCOMP:
1622 handle_pubcomp(client, &p.params.pubcomp);
1623 break;
1624 case MQTT_SN_MSG_TYPE_SUBACK:
1625 handle_suback(client, &p.params.suback);
1626 break;
1627 case MQTT_SN_MSG_TYPE_UNSUBACK:
1628 handle_unsuback(client, &p.params.unsuback);
1629 break;
1630 case MQTT_SN_MSG_TYPE_PINGREQ:
1631 handle_pingreq(client);
1632 break;
1633 case MQTT_SN_MSG_TYPE_PINGRESP:
1634 handle_pingresp(client);
1635 break;
1636 case MQTT_SN_MSG_TYPE_DISCONNECT:
1637 handle_disconnect(client, &p.params.disconnect);
1638 break;
1639 case MQTT_SN_MSG_TYPE_WILLTOPICRESP:
1640 break;
1641 case MQTT_SN_MSG_TYPE_WILLMSGRESP:
1642 break;
1643 default:
1644 LOG_ERR("Unexpected message type %d", p.type);
1645 break;
1646 }
1647
1648 k_work_reschedule(&client->process_work, K_NO_WAIT);
1649
1650 return 0;
1651 }
1652
mqtt_sn_input(struct mqtt_sn_client * client)1653 int mqtt_sn_input(struct mqtt_sn_client *client)
1654 {
1655 ssize_t next_frame_size;
1656 char addr[CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE];
1657 struct mqtt_sn_data rx_addr = {.data = addr, .size = CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE};
1658 int err;
1659
1660 if (!client || !client->transport || !client->transport->recvfrom) {
1661 return -EINVAL;
1662 }
1663
1664 if (client->transport->poll) {
1665 next_frame_size = client->transport->poll(client);
1666 if (next_frame_size <= 0) {
1667 return next_frame_size;
1668 }
1669 }
1670
1671 net_buf_simple_reset(&client->rx);
1672
1673 next_frame_size = client->transport->recvfrom(client, client->rx.data, client->rx.size,
1674 (void *)rx_addr.data, &rx_addr.size);
1675 if (next_frame_size <= 0) {
1676 return next_frame_size;
1677 }
1678
1679 if (next_frame_size > client->rx.size) {
1680 return -ENOBUFS;
1681 }
1682
1683 client->rx.len = next_frame_size;
1684
1685 LOG_HEXDUMP_DBG(client->rx.data, client->rx.len, "Received data");
1686
1687 err = handle_msg(client, rx_addr);
1688
1689 if (err) {
1690 return err;
1691 }
1692
1693 /* Should be zero */
1694 return -client->rx.len;
1695 }
1696
mqtt_sn_get_topic_name(struct mqtt_sn_client * client,uint16_t id,struct mqtt_sn_data * topic_name)1697 int mqtt_sn_get_topic_name(struct mqtt_sn_client *client, uint16_t id,
1698 struct mqtt_sn_data *topic_name)
1699 {
1700 struct mqtt_sn_topic *topic;
1701
1702 if (!client || !topic_name) {
1703 return -EINVAL;
1704 }
1705
1706 SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
1707 if (topic->topic_id == id) {
1708 topic_name->data = (const uint8_t *)topic->name;
1709 topic_name->size = topic->namelen;
1710 return 0;
1711 }
1712 }
1713 return -ENOENT;
1714 }
1715