1 /*
2 * Copyright (c) 2022 grandcentrix GmbH
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 /** @file mqtt_sn.c
8 *
9 * @brief MQTT-SN Client API Implementation.
10 */
11
12 #include "mqtt_sn_msg.h"
13
14 #include <zephyr/logging/log.h>
15 #include <zephyr/random/random.h>
16 #include <zephyr/net/mqtt_sn.h>
17 LOG_MODULE_REGISTER(net_mqtt_sn, CONFIG_MQTT_SN_LOG_LEVEL);
18
19 #define MQTT_SN_NET_BUFS (CONFIG_MQTT_SN_LIB_MAX_PUBLISH)
20
21 NET_BUF_POOL_FIXED_DEFINE(mqtt_sn_messages, MQTT_SN_NET_BUFS, CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE,
22 0, NULL);
23
24 struct mqtt_sn_confirmable {
25 int64_t last_attempt;
26 uint16_t msg_id;
27 uint8_t retries;
28 };
29
30 struct mqtt_sn_publish {
31 struct mqtt_sn_confirmable con;
32 sys_snode_t next;
33 struct mqtt_sn_topic *topic;
34 uint8_t pubdata[CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE];
35 size_t datalen;
36 enum mqtt_sn_qos qos;
37 bool retain;
38 };
39
40 enum mqtt_sn_topic_state {
41 MQTT_SN_TOPIC_STATE_REGISTERING,
42 MQTT_SN_TOPIC_STATE_REGISTERED,
43 MQTT_SN_TOPIC_STATE_SUBSCRIBING,
44 MQTT_SN_TOPIC_STATE_SUBSCRIBED,
45 MQTT_SN_TOPIC_STATE_UNSUBSCRIBING,
46 };
47
48 struct mqtt_sn_topic {
49 struct mqtt_sn_confirmable con;
50 sys_snode_t next;
51 char name[CONFIG_MQTT_SN_LIB_MAX_TOPIC_SIZE];
52 size_t namelen;
53 uint16_t topic_id;
54 enum mqtt_sn_qos qos;
55 enum mqtt_sn_topic_type type;
56 enum mqtt_sn_topic_state state;
57 };
58
59 struct mqtt_sn_gateway {
60 sys_snode_t next;
61 char gw_id;
62 int64_t adv_timer;
63 char addr[CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE];
64 size_t addr_len;
65 };
66
67 K_MEM_SLAB_DEFINE_STATIC(publishes, sizeof(struct mqtt_sn_publish), CONFIG_MQTT_SN_LIB_MAX_PUBLISH,
68 4);
69 K_MEM_SLAB_DEFINE_STATIC(topics, sizeof(struct mqtt_sn_topic), CONFIG_MQTT_SN_LIB_MAX_TOPICS, 4);
70 K_MEM_SLAB_DEFINE_STATIC(gateways, sizeof(struct mqtt_sn_gateway), CONFIG_MQTT_SN_LIB_MAX_GATEWAYS,
71 4);
72
73 enum mqtt_sn_client_state {
74 MQTT_SN_CLIENT_DISCONNECTED,
75 MQTT_SN_CLIENT_ACTIVE,
76 MQTT_SN_CLIENT_ASLEEP,
77 MQTT_SN_CLIENT_AWAKE
78 };
79
mqtt_sn_set_state(struct mqtt_sn_client * client,enum mqtt_sn_client_state state)80 static void mqtt_sn_set_state(struct mqtt_sn_client *client, enum mqtt_sn_client_state state)
81 {
82 int prev_state = client->state;
83
84 client->state = state;
85 LOG_DBG("Client %p state (%d) -> (%d)", client, prev_state, state);
86 }
87
88 #define T_SEARCHGW_MSEC (CONFIG_MQTT_SN_LIB_T_SEARCHGW * MSEC_PER_SEC)
89 #define T_GWINFO_MSEC (CONFIG_MQTT_SN_LIB_T_GWINFO * MSEC_PER_SEC)
90 #define T_RETRY_MSEC (CONFIG_MQTT_SN_LIB_T_RETRY * MSEC_PER_SEC)
91 #define N_RETRY (CONFIG_MQTT_SN_LIB_N_RETRY)
92 #define T_KEEPALIVE_MSEC (CONFIG_MQTT_SN_KEEPALIVE * MSEC_PER_SEC)
93
next_msg_id(void)94 static uint16_t next_msg_id(void)
95 {
96 static uint16_t msg_id;
97
98 return ++msg_id;
99 }
100
encode_and_send(struct mqtt_sn_client * client,struct mqtt_sn_param * p,uint8_t broadcast_radius)101 static int encode_and_send(struct mqtt_sn_client *client, struct mqtt_sn_param *p,
102 uint8_t broadcast_radius)
103 {
104 int err;
105
106 err = mqtt_sn_encode_msg(&client->tx, p);
107 if (err) {
108 goto end;
109 }
110
111 LOG_HEXDUMP_DBG(client->tx.data, client->tx.len, "Send message");
112
113 if (!client->transport->sendto) {
114 LOG_ERR("Can't send: no callback");
115 err = -ENOTSUP;
116 goto end;
117 }
118
119 if (!client->tx.len) {
120 LOG_WRN("Can't send: empty");
121 err = -ENOMSG;
122 goto end;
123 }
124
125 if (broadcast_radius) {
126 err = client->transport->sendto(client, client->tx.data, client->tx.len, NULL,
127 broadcast_radius);
128 } else {
129 struct mqtt_sn_gateway *gw;
130
131 gw = SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
132 if (gw == NULL || gw->addr_len == 0) {
133 LOG_WRN("No Gateway Address");
134 err = -ENXIO;
135 goto end;
136 }
137 err = client->transport->sendto(client, client->tx.data, client->tx.len, gw->addr,
138 gw->addr_len);
139 }
140
141 end:
142 if (err) {
143 LOG_ERR("Error during send: %d", err);
144 }
145 net_buf_simple_reset(&client->tx);
146
147 return err;
148 }
149
mqtt_sn_con_init(struct mqtt_sn_confirmable * con)150 static void mqtt_sn_con_init(struct mqtt_sn_confirmable *con)
151 {
152 con->last_attempt = 0;
153 con->retries = N_RETRY;
154 con->msg_id = next_msg_id();
155 }
156
mqtt_sn_publish_destroy(struct mqtt_sn_client * client,struct mqtt_sn_publish * pub)157 static void mqtt_sn_publish_destroy(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub)
158 {
159 sys_slist_find_and_remove(&client->publish, &pub->next);
160 k_mem_slab_free(&publishes, (void *)pub);
161 }
162
mqtt_sn_publish_destroy_all(struct mqtt_sn_client * client)163 static void mqtt_sn_publish_destroy_all(struct mqtt_sn_client *client)
164 {
165 struct mqtt_sn_publish *pub;
166 sys_snode_t *next;
167
168 while ((next = sys_slist_get(&client->publish)) != NULL) {
169 pub = SYS_SLIST_CONTAINER(next, pub, next);
170 k_mem_slab_free(&publishes, (void *)pub);
171 }
172 }
173
mqtt_sn_publish_create(struct mqtt_sn_data * data)174 static struct mqtt_sn_publish *mqtt_sn_publish_create(struct mqtt_sn_data *data)
175 {
176 struct mqtt_sn_publish *pub;
177
178 if (k_mem_slab_alloc(&publishes, (void **)&pub, K_NO_WAIT)) {
179 LOG_ERR("Can't create PUB: no free slot");
180 return NULL;
181 }
182
183 memset(pub, 0, sizeof(*pub));
184
185 if (data && data->data && data->size) {
186 if (data->size > sizeof(pub->pubdata)) {
187 LOG_ERR("Can't create PUB: Too much data (%zu)", data->size);
188 return NULL;
189 }
190
191 memcpy(pub->pubdata, data->data, data->size);
192 pub->datalen = data->size;
193 }
194
195 mqtt_sn_con_init(&pub->con);
196
197 return pub;
198 }
199
mqtt_sn_publish_find_msg_id(struct mqtt_sn_client * client,uint16_t msg_id)200 static struct mqtt_sn_publish *mqtt_sn_publish_find_msg_id(struct mqtt_sn_client *client,
201 uint16_t msg_id)
202 {
203 struct mqtt_sn_publish *pub;
204
205 SYS_SLIST_FOR_EACH_CONTAINER(&client->publish, pub, next) {
206 if (pub->con.msg_id == msg_id) {
207 return pub;
208 }
209 }
210
211 return NULL;
212 }
213
mqtt_sn_publish_find_topic(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)214 static struct mqtt_sn_publish *mqtt_sn_publish_find_topic(struct mqtt_sn_client *client,
215 struct mqtt_sn_topic *topic)
216 {
217 struct mqtt_sn_publish *pub;
218
219 SYS_SLIST_FOR_EACH_CONTAINER(&client->publish, pub, next) {
220 if (pub->topic == topic) {
221 return pub;
222 }
223 }
224
225 return NULL;
226 }
227
mqtt_sn_topic_create(struct mqtt_sn_data * name)228 static struct mqtt_sn_topic *mqtt_sn_topic_create(struct mqtt_sn_data *name)
229 {
230 struct mqtt_sn_topic *topic;
231
232 if (k_mem_slab_alloc(&topics, (void **)&topic, K_NO_WAIT)) {
233 LOG_ERR("Can't create topic: no free slot");
234 return NULL;
235 }
236
237 memset(topic, 0, sizeof(*topic));
238
239 if (!name || !name->data || !name->size) {
240 LOG_ERR("Can't create topic with empty name");
241 return NULL;
242 }
243
244 if (name->size > sizeof(topic->name)) {
245 LOG_ERR("Can't create topic: name too long (%zu)", name->size);
246 return NULL;
247 }
248
249 memcpy(topic->name, name->data, name->size);
250 topic->namelen = name->size;
251
252 mqtt_sn_con_init(&topic->con);
253
254 return topic;
255 }
256
mqtt_sn_topic_find_name(struct mqtt_sn_client * client,struct mqtt_sn_data * topic_name)257 static struct mqtt_sn_topic *mqtt_sn_topic_find_name(struct mqtt_sn_client *client,
258 struct mqtt_sn_data *topic_name)
259 {
260 struct mqtt_sn_topic *topic;
261
262 SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
263 if (topic->namelen == topic_name->size &&
264 memcmp(topic->name, topic_name->data, topic_name->size) == 0) {
265 return topic;
266 }
267 }
268
269 return NULL;
270 }
271
mqtt_sn_topic_find_msg_id(struct mqtt_sn_client * client,uint16_t msg_id)272 static struct mqtt_sn_topic *mqtt_sn_topic_find_msg_id(struct mqtt_sn_client *client,
273 uint16_t msg_id)
274 {
275 struct mqtt_sn_topic *topic;
276
277 SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
278 if (topic->con.msg_id == msg_id) {
279 return topic;
280 }
281 }
282
283 return NULL;
284 }
285
mqtt_sn_topic_destroy(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)286 static void mqtt_sn_topic_destroy(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
287 {
288 struct mqtt_sn_publish *pub;
289
290 /* Destroy all pubs referencing this topic */
291 while ((pub = mqtt_sn_publish_find_topic(client, topic)) != NULL) {
292 LOG_WRN("Destroying publish msg_id %d", pub->con.msg_id);
293 mqtt_sn_publish_destroy(client, pub);
294 }
295
296 sys_slist_find_and_remove(&client->topic, &topic->next);
297 }
298
mqtt_sn_topic_destroy_all(struct mqtt_sn_client * client)299 static void mqtt_sn_topic_destroy_all(struct mqtt_sn_client *client)
300 {
301 struct mqtt_sn_topic *topic;
302 struct mqtt_sn_publish *pub;
303 sys_snode_t *next;
304
305 while ((next = sys_slist_get(&client->topic)) != NULL) {
306 topic = SYS_SLIST_CONTAINER(next, topic, next);
307 /* Destroy all pubs referencing this topic */
308 while ((pub = mqtt_sn_publish_find_topic(client, topic)) != NULL) {
309 LOG_WRN("Destroying publish msg_id %d", pub->con.msg_id);
310 mqtt_sn_publish_destroy(client, pub);
311 }
312
313 k_mem_slab_free(&topics, (void *)topic);
314 }
315 }
316
mqtt_sn_gw_destroy(struct mqtt_sn_client * client,struct mqtt_sn_gateway * gw)317 static void mqtt_sn_gw_destroy(struct mqtt_sn_client *client, struct mqtt_sn_gateway *gw)
318 {
319 LOG_DBG("Destroying gateway %d", gw->gw_id);
320 sys_slist_find_and_remove(&client->gateway, &gw->next);
321 k_mem_slab_free(&gateways, (void *)gw);
322 }
323
mqtt_sn_gw_destroy_all(struct mqtt_sn_client * client)324 static void mqtt_sn_gw_destroy_all(struct mqtt_sn_client *client)
325 {
326 struct mqtt_sn_gateway *gw;
327 sys_snode_t *next;
328
329 while ((next = sys_slist_get(&client->gateway)) != NULL) {
330 gw = SYS_SLIST_CONTAINER(next, gw, next);
331 sys_slist_find_and_remove(&client->gateway, next);
332 k_mem_slab_free(&gateways, (void *)gw);
333 }
334 }
335
mqtt_sn_gw_create(uint8_t gw_id,short duration,struct mqtt_sn_data gw_addr)336 static struct mqtt_sn_gateway *mqtt_sn_gw_create(uint8_t gw_id, short duration,
337 struct mqtt_sn_data gw_addr)
338 {
339 struct mqtt_sn_gateway *gw;
340
341 LOG_DBG("Free GW slots: %d", k_mem_slab_num_free_get(&gateways));
342 if (k_mem_slab_alloc(&gateways, (void **)&gw, K_NO_WAIT)) {
343 LOG_WRN("Can't create GW: no free slot");
344 return NULL;
345 }
346
347 __ASSERT(gw_addr.size < CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE,
348 "Gateway address is larger than allowed by CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE");
349
350 memset(gw, 0, sizeof(*gw));
351 memcpy(gw->addr, gw_addr.data, gw_addr.size);
352 gw->addr_len = gw_addr.size;
353 gw->gw_id = gw_id;
354 if (duration == -1) {
355 gw->adv_timer = duration;
356 } else {
357 gw->adv_timer =
358 k_uptime_get() + (duration * CONFIG_MQTT_SN_LIB_N_ADV * MSEC_PER_SEC);
359 }
360
361 return gw;
362 }
363
mqtt_sn_gw_find_id(struct mqtt_sn_client * client,uint16_t gw_id)364 static struct mqtt_sn_gateway *mqtt_sn_gw_find_id(struct mqtt_sn_client *client, uint16_t gw_id)
365 {
366 struct mqtt_sn_gateway *gw;
367
368 SYS_SLIST_FOR_EACH_CONTAINER(&client->gateway, gw, next) {
369 if (gw->gw_id == gw_id) {
370 return gw;
371 }
372 }
373
374 return NULL;
375 }
376
mqtt_sn_disconnect_internal(struct mqtt_sn_client * client)377 static void mqtt_sn_disconnect_internal(struct mqtt_sn_client *client)
378 {
379 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
380
381 mqtt_sn_set_state(client, MQTT_SN_CLIENT_DISCONNECTED);
382 if (client->evt_cb) {
383 client->evt_cb(client, &evt);
384 }
385
386 /*
387 * Remove all publishes, but keep topics
388 * Topics are removed on deinit or when connect is called with
389 * clean-session = true
390 */
391 mqtt_sn_publish_destroy_all(client);
392
393 k_work_cancel_delayable(&client->process_work);
394 }
395
mqtt_sn_sleep_internal(struct mqtt_sn_client * client)396 static void mqtt_sn_sleep_internal(struct mqtt_sn_client *client)
397 {
398 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
399
400 mqtt_sn_set_state(client, MQTT_SN_CLIENT_ASLEEP);
401 if (client->evt_cb) {
402 client->evt_cb(client, &evt);
403 }
404 }
405
mqtt_sn_do_subscribe(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic,bool dup)406 static void mqtt_sn_do_subscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic,
407 bool dup)
408 {
409 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_SUBSCRIBE};
410
411 if (!client || !topic) {
412 return;
413 }
414
415 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
416 LOG_ERR("Cannot subscribe: not connected");
417 return;
418 }
419
420 p.params.subscribe.msg_id = topic->con.msg_id;
421 p.params.subscribe.qos = topic->qos;
422 p.params.subscribe.topic_type = topic->type;
423 p.params.subscribe.dup = dup;
424 switch (topic->type) {
425 case MQTT_SN_TOPIC_TYPE_NORMAL:
426 p.params.subscribe.topic.topic_name.data = topic->name;
427 p.params.subscribe.topic.topic_name.size = topic->namelen;
428 break;
429 case MQTT_SN_TOPIC_TYPE_PREDEF:
430 case MQTT_SN_TOPIC_TYPE_SHORT:
431 p.params.subscribe.topic.topic_id = topic->topic_id;
432 break;
433 default:
434 LOG_ERR("Unexpected topic type %d", topic->type);
435 return;
436 }
437
438 encode_and_send(client, &p, 0);
439 }
440
mqtt_sn_do_unsubscribe(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)441 static void mqtt_sn_do_unsubscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
442 {
443 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_UNSUBSCRIBE};
444
445 if (!client || !topic) {
446 return;
447 }
448
449 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
450 LOG_ERR("Cannot unsubscribe: not connected");
451 return;
452 }
453
454 p.params.unsubscribe.msg_id = topic->con.msg_id;
455 p.params.unsubscribe.topic_type = topic->type;
456 switch (topic->type) {
457 case MQTT_SN_TOPIC_TYPE_NORMAL:
458 p.params.unsubscribe.topic.topic_name.data = topic->name;
459 p.params.unsubscribe.topic.topic_name.size = topic->namelen;
460 break;
461 case MQTT_SN_TOPIC_TYPE_PREDEF:
462 case MQTT_SN_TOPIC_TYPE_SHORT:
463 p.params.unsubscribe.topic.topic_id = topic->topic_id;
464 break;
465 default:
466 LOG_ERR("Unexpected topic type %d", topic->type);
467 return;
468 }
469
470 encode_and_send(client, &p, 0);
471 }
472
mqtt_sn_do_register(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)473 static void mqtt_sn_do_register(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
474 {
475 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_REGISTER};
476
477 if (!client || !topic) {
478 return;
479 }
480
481 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
482 LOG_ERR("Cannot register: not connected");
483 return;
484 }
485
486 p.params.reg.msg_id = topic->con.msg_id;
487 switch (topic->type) {
488 case MQTT_SN_TOPIC_TYPE_NORMAL:
489 LOG_HEXDUMP_INF(topic->name, topic->namelen, "Registering topic");
490 p.params.reg.topic.data = topic->name;
491 p.params.reg.topic.size = topic->namelen;
492 break;
493 default:
494 LOG_ERR("Unexpected topic type %d", topic->type);
495 return;
496 }
497
498 encode_and_send(client, &p, 0);
499 }
500
mqtt_sn_do_publish(struct mqtt_sn_client * client,struct mqtt_sn_publish * pub,bool dup)501 static void mqtt_sn_do_publish(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub, bool dup)
502 {
503 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_PUBLISH};
504
505 if (!client || !pub) {
506 return;
507 }
508
509 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
510 LOG_ERR("Cannot subscribe: not connected");
511 return;
512 }
513
514 LOG_INF("Publishing to topic ID %d", pub->topic->topic_id);
515
516 p.params.publish.data.data = pub->pubdata;
517 p.params.publish.data.size = pub->datalen;
518 p.params.publish.msg_id = pub->con.msg_id;
519 p.params.publish.retain = pub->retain;
520 p.params.publish.topic_id = pub->topic->topic_id;
521 p.params.publish.topic_type = pub->topic->type;
522 p.params.publish.qos = pub->qos;
523 p.params.publish.dup = dup;
524
525 encode_and_send(client, &p, 0);
526 }
527
mqtt_sn_do_searchgw(struct mqtt_sn_client * client)528 static void mqtt_sn_do_searchgw(struct mqtt_sn_client *client)
529 {
530 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_SEARCHGW};
531
532 p.params.searchgw.radius = CONFIG_MQTT_SN_LIB_BROADCAST_RADIUS;
533
534 encode_and_send(client, &p, CONFIG_MQTT_SN_LIB_BROADCAST_RADIUS);
535 }
536
mqtt_sn_do_gwinfo(struct mqtt_sn_client * client)537 static void mqtt_sn_do_gwinfo(struct mqtt_sn_client *client)
538 {
539 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_GWINFO};
540 struct mqtt_sn_gateway *gw;
541 struct mqtt_sn_data addr;
542
543 gw = SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
544
545 if (gw == NULL || gw->addr_len == 0) {
546 LOG_WRN("No Gateway Address");
547 return;
548 }
549
550 response.params.gwinfo.gw_id = gw->gw_id;
551 addr.data = gw->addr;
552 addr.size = gw->addr_len;
553 response.params.gwinfo.gw_add = addr;
554
555 encode_and_send(client, &response, client->radius_gwinfo);
556 }
557
mqtt_sn_do_ping(struct mqtt_sn_client * client)558 static void mqtt_sn_do_ping(struct mqtt_sn_client *client)
559 {
560 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_PINGREQ};
561
562 switch (client->state) {
563 case MQTT_SN_CLIENT_ASLEEP:
564 /*
565 * From the spec regarding PINGREQ:
566 * ClientId: contains the client id; this field is optional
567 * and is included by a “sleeping” client when it goes to the
568 * “awake” state and is waiting for messages sent by the
569 * server/gateway, see Section 6.14 for further details.
570 */
571 p.params.pingreq.client_id.data = client->client_id.data;
572 p.params.pingreq.client_id.size = client->client_id.size;
573 case MQTT_SN_CLIENT_ACTIVE:
574 encode_and_send(client, &p, 0);
575 break;
576 default:
577 LOG_WRN("Can't ping in state %d", client->state);
578 break;
579 }
580 }
581
process_pubs(struct mqtt_sn_client * client,int64_t * next_cycle)582 static int process_pubs(struct mqtt_sn_client *client, int64_t *next_cycle)
583 {
584 struct mqtt_sn_publish *pub, *pubs;
585 const int64_t now = k_uptime_get();
586 int64_t next_attempt;
587 bool dup;
588
589 SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&client->publish, pub, pubs, next) {
590 LOG_HEXDUMP_DBG(pub->topic->name, pub->topic->namelen,
591 "Processing publish for topic");
592 LOG_HEXDUMP_DBG(pub->pubdata, pub->datalen, "Processing publish data");
593
594 if (pub->con.last_attempt == 0) {
595 next_attempt = 0;
596 dup = false;
597 } else {
598 next_attempt = pub->con.last_attempt + T_RETRY_MSEC;
599 dup = true;
600 }
601
602 if (next_attempt <= now) {
603 switch (pub->topic->state) {
604 case MQTT_SN_TOPIC_STATE_REGISTERING:
605 case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
606 case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
607 LOG_INF("Can't publish; topic is not ready");
608 break;
609 case MQTT_SN_TOPIC_STATE_REGISTERED:
610 case MQTT_SN_TOPIC_STATE_SUBSCRIBED:
611 if (!pub->con.retries--) {
612 LOG_WRN("PUB ran out of retries, disconnecting");
613 mqtt_sn_disconnect_internal(client);
614 return -ETIMEDOUT;
615 }
616 mqtt_sn_do_publish(client, pub, dup);
617 if (pub->qos == MQTT_SN_QOS_0 || pub->qos == MQTT_SN_QOS_M1) {
618 /* We are done, remove this */
619 mqtt_sn_publish_destroy(client, pub);
620 continue;
621 } else {
622 /* Wait for ack */
623 pub->con.last_attempt = now;
624 next_attempt = now + T_RETRY_MSEC;
625 }
626 break;
627 }
628 }
629
630 if (next_attempt > now && (*next_cycle == 0 || next_attempt < *next_cycle)) {
631 *next_cycle = next_attempt;
632 }
633 }
634
635 LOG_DBG("next_cycle: %lld", *next_cycle);
636
637 return 0;
638 }
639
process_topics(struct mqtt_sn_client * client,int64_t * next_cycle)640 static int process_topics(struct mqtt_sn_client *client, int64_t *next_cycle)
641 {
642 struct mqtt_sn_topic *topic;
643 const int64_t now = k_uptime_get();
644 int64_t next_attempt;
645 bool dup;
646
647 SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
648 LOG_HEXDUMP_DBG(topic->name, topic->namelen, "Processing topic");
649
650 if (topic->con.last_attempt == 0) {
651 next_attempt = 0;
652 dup = false;
653 } else {
654 next_attempt = topic->con.last_attempt + T_RETRY_MSEC;
655 dup = true;
656 }
657
658 if (next_attempt <= now) {
659 switch (topic->state) {
660 case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
661 if (!topic->con.retries--) {
662 LOG_WRN("Topic ran out of retries, disconnecting");
663 mqtt_sn_disconnect_internal(client);
664 return -ETIMEDOUT;
665 }
666
667 mqtt_sn_do_subscribe(client, topic, dup);
668 topic->con.last_attempt = now;
669 next_attempt = now + T_RETRY_MSEC;
670 break;
671 case MQTT_SN_TOPIC_STATE_REGISTERING:
672 if (!topic->con.retries--) {
673 LOG_WRN("Topic ran out of retries, disconnecting");
674 mqtt_sn_disconnect_internal(client);
675 return -ETIMEDOUT;
676 }
677
678 mqtt_sn_do_register(client, topic);
679 topic->con.last_attempt = now;
680 next_attempt = now + T_RETRY_MSEC;
681 break;
682 case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
683 if (!topic->con.retries--) {
684 LOG_WRN("Topic ran out of retries, disconnecting");
685 mqtt_sn_disconnect_internal(client);
686 return -ETIMEDOUT;
687 }
688 mqtt_sn_do_unsubscribe(client, topic);
689 topic->con.last_attempt = now;
690 next_attempt = now + T_RETRY_MSEC;
691 break;
692 case MQTT_SN_TOPIC_STATE_REGISTERED:
693 case MQTT_SN_TOPIC_STATE_SUBSCRIBED:
694 break;
695 }
696 }
697
698 if (next_attempt > now && (*next_cycle == 0 || next_attempt < *next_cycle)) {
699 *next_cycle = next_attempt;
700 }
701 }
702
703 LOG_DBG("next_cycle: %lld", *next_cycle);
704
705 return 0;
706 }
707
process_ping(struct mqtt_sn_client * client,int64_t * next_cycle)708 static int process_ping(struct mqtt_sn_client *client, int64_t *next_cycle)
709 {
710 const int64_t now = k_uptime_get();
711 struct mqtt_sn_gateway *gw = NULL;
712 int64_t next_ping;
713
714 if (client->ping_retries == N_RETRY) {
715 /* Last ping was acked */
716 next_ping = client->last_ping + T_KEEPALIVE_MSEC;
717 } else {
718 next_ping = client->last_ping + T_RETRY_MSEC;
719 }
720
721 if (next_ping < now) {
722 if (!client->ping_retries--) {
723 LOG_WRN("Ping ran out of retries");
724 mqtt_sn_disconnect_internal(client);
725 SYS_SLIST_PEEK_HEAD_CONTAINER(&client->gateway, gw, next);
726 LOG_DBG("Removing non-responsive GW 0x%08x", gw->gw_id);
727 mqtt_sn_gw_destroy(client, gw);
728 return -ETIMEDOUT;
729 }
730
731 LOG_DBG("Sending PINGREQ");
732 mqtt_sn_do_ping(client);
733 client->last_ping = now;
734 next_ping = now + T_RETRY_MSEC;
735 }
736
737 if (*next_cycle == 0 || next_ping < *next_cycle) {
738 *next_cycle = next_ping;
739 }
740
741 LOG_DBG("next_cycle: %lld", *next_cycle);
742
743 return 0;
744 }
745
process_search(struct mqtt_sn_client * client,int64_t * next_cycle)746 static int process_search(struct mqtt_sn_client *client, int64_t *next_cycle)
747 {
748 const int64_t now = k_uptime_get();
749
750 LOG_DBG("ts_searchgw: %lld", client->ts_searchgw);
751 LOG_DBG("ts_gwinfo: %lld", client->ts_gwinfo);
752
753 if (client->ts_searchgw != 0 && client->ts_searchgw <= now) {
754 LOG_DBG("Sending SEARCHGW");
755 mqtt_sn_do_searchgw(client);
756 client->ts_searchgw = 0;
757 }
758
759 if (client->ts_gwinfo != 0 && client->ts_gwinfo <= now) {
760 LOG_DBG("Sending GWINFO");
761 mqtt_sn_do_gwinfo(client);
762 client->ts_gwinfo = 0;
763 }
764
765 if (*next_cycle == 0 || (client->ts_searchgw != 0 && client->ts_searchgw < *next_cycle)) {
766 *next_cycle = client->ts_searchgw;
767 }
768 if (*next_cycle == 0 || (client->ts_gwinfo != 0 && client->ts_gwinfo < *next_cycle)) {
769 *next_cycle = client->ts_gwinfo;
770 }
771
772 LOG_DBG("next_cycle: %lld", *next_cycle);
773
774 return 0;
775 }
776
process_advertise(struct mqtt_sn_client * client,int64_t * next_cycle)777 static int process_advertise(struct mqtt_sn_client *client, int64_t *next_cycle)
778 {
779 const int64_t now = k_uptime_get();
780 struct mqtt_sn_gateway *gw;
781 struct mqtt_sn_gateway *gw_next;
782
783 SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&client->gateway, gw, gw_next, next) {
784 LOG_DBG("Checking if GW 0x%02x is old", gw->gw_id);
785 if (gw->adv_timer != -1 && gw->adv_timer <= now) {
786 LOG_DBG("Removing non-responsive GW 0x%08x", gw->gw_id);
787 if (client->gateway.head == &gw->next) {
788 mqtt_sn_disconnect(client);
789 }
790 mqtt_sn_gw_destroy(client, gw);
791 }
792 if (gw->adv_timer != -1 && (*next_cycle == 0 || gw->adv_timer < *next_cycle)) {
793 *next_cycle = gw->adv_timer;
794 }
795 }
796 LOG_DBG("next_cycle: %lld", *next_cycle);
797
798 return 0;
799 }
800
process_work(struct k_work * wrk)801 static void process_work(struct k_work *wrk)
802 {
803 struct mqtt_sn_client *client;
804 struct k_work_delayable *dwork;
805 int64_t next_cycle = 0;
806 int err;
807
808 dwork = k_work_delayable_from_work(wrk);
809 client = CONTAINER_OF(dwork, struct mqtt_sn_client, process_work);
810
811 LOG_DBG("Executing work of client %p in state %d at time %lld", client, client->state,
812 k_uptime_get());
813
814 /* Clean up old advertised gateways from list */
815 err = process_advertise(client, &next_cycle);
816 if (err) {
817 return;
818 }
819
820 /* Handle GW search process timers */
821 err = process_search(client, &next_cycle);
822 if (err) {
823 return;
824 }
825
826 if (client->state == MQTT_SN_CLIENT_ACTIVE) {
827 err = process_topics(client, &next_cycle);
828 if (err) {
829 return;
830 }
831
832 err = process_pubs(client, &next_cycle);
833 if (err) {
834 return;
835 }
836
837 err = process_ping(client, &next_cycle);
838 if (err) {
839 return;
840 }
841 }
842
843 if (next_cycle > 0) {
844 LOG_DBG("next_cycle: %lld", next_cycle);
845 k_work_schedule(dwork, K_MSEC(next_cycle - k_uptime_get()));
846 }
847 }
848
mqtt_sn_client_init(struct mqtt_sn_client * client,const struct mqtt_sn_data * client_id,struct mqtt_sn_transport * transport,mqtt_sn_evt_cb_t evt_cb,void * tx,size_t txsz,void * rx,size_t rxsz)849 int mqtt_sn_client_init(struct mqtt_sn_client *client, const struct mqtt_sn_data *client_id,
850 struct mqtt_sn_transport *transport, mqtt_sn_evt_cb_t evt_cb, void *tx,
851 size_t txsz, void *rx, size_t rxsz)
852 {
853 if (!client || !client_id || !transport || !evt_cb || !tx || !rx) {
854 return -EINVAL;
855 }
856
857 memset(client, 0, sizeof(*client));
858
859 client->client_id.data = client_id->data;
860 client->client_id.size = client_id->size;
861 client->transport = transport;
862 client->evt_cb = evt_cb;
863
864 net_buf_simple_init_with_data(&client->tx, tx, txsz);
865 net_buf_simple_reset(&client->tx);
866
867 net_buf_simple_init_with_data(&client->rx, rx, rxsz);
868 net_buf_simple_reset(&client->rx);
869
870 k_work_init_delayable(&client->process_work, process_work);
871
872 if (transport->init) {
873 transport->init(transport);
874 }
875
876 return 0;
877 }
878
mqtt_sn_client_deinit(struct mqtt_sn_client * client)879 void mqtt_sn_client_deinit(struct mqtt_sn_client *client)
880 {
881 if (!client) {
882 return;
883 }
884
885 mqtt_sn_publish_destroy_all(client);
886 mqtt_sn_topic_destroy_all(client);
887 mqtt_sn_gw_destroy_all(client);
888
889 if (client->transport && client->transport->deinit) {
890 client->transport->deinit(client->transport);
891 }
892
893 k_work_cancel_delayable(&client->process_work);
894 }
895
mqtt_sn_add_gw(struct mqtt_sn_client * client,uint8_t gw_id,struct mqtt_sn_data gw_addr)896 int mqtt_sn_add_gw(struct mqtt_sn_client *client, uint8_t gw_id, struct mqtt_sn_data gw_addr)
897 {
898 struct mqtt_sn_gateway *gw;
899
900 gw = mqtt_sn_gw_find_id(client, gw_id);
901
902 if (gw != NULL) {
903 mqtt_sn_gw_destroy(client, gw);
904 }
905
906 gw = mqtt_sn_gw_create(gw_id, -1, gw_addr);
907 if (!gw) {
908 return -ENOMEM;
909 }
910
911 sys_slist_append(&client->gateway, &gw->next);
912
913 return 0;
914 }
915
mqtt_sn_search(struct mqtt_sn_client * client,uint8_t radius)916 int mqtt_sn_search(struct mqtt_sn_client *client, uint8_t radius)
917 {
918 if (!client) {
919 return -EINVAL;
920 }
921 /* Set SEARCHGW transmission timer */
922 client->ts_searchgw = k_uptime_get() + (T_SEARCHGW_MSEC * sys_rand8_get() / 255);
923 k_work_schedule(&client->process_work, K_NO_WAIT);
924 LOG_DBG("Requested SEARCHGW for time %lld at time %lld", client->ts_searchgw,
925 k_uptime_get());
926
927 return 0;
928 }
929
mqtt_sn_connect(struct mqtt_sn_client * client,bool will,bool clean_session)930 int mqtt_sn_connect(struct mqtt_sn_client *client, bool will, bool clean_session)
931 {
932 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_CONNECT};
933
934 if (!client) {
935 return -EINVAL;
936 }
937
938 if (will && (!client->will_msg.data || !client->will_topic.data)) {
939 LOG_ERR("will set to true, but no will data in client");
940 return -EINVAL;
941 }
942
943 if (clean_session) {
944 mqtt_sn_topic_destroy_all(client);
945 }
946
947 p.params.connect.clean_session = clean_session;
948 p.params.connect.will = will;
949 p.params.connect.duration = CONFIG_MQTT_SN_KEEPALIVE;
950 p.params.connect.client_id.data = client->client_id.data;
951 p.params.connect.client_id.size = client->client_id.size;
952
953 client->last_ping = k_uptime_get();
954
955 return encode_and_send(client, &p, 0);
956 }
957
mqtt_sn_disconnect(struct mqtt_sn_client * client)958 int mqtt_sn_disconnect(struct mqtt_sn_client *client)
959 {
960 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_DISCONNECT};
961 int err;
962
963 if (!client) {
964 return -EINVAL;
965 }
966
967 p.params.disconnect.duration = 0;
968
969 err = encode_and_send(client, &p, 0);
970 mqtt_sn_disconnect_internal(client);
971
972 return err;
973 }
974
mqtt_sn_sleep(struct mqtt_sn_client * client,uint16_t duration)975 int mqtt_sn_sleep(struct mqtt_sn_client *client, uint16_t duration)
976 {
977 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_DISCONNECT};
978 int err;
979
980 if (!client || !duration) {
981 return -EINVAL;
982 }
983
984 p.params.disconnect.duration = duration;
985
986 err = encode_and_send(client, &p, 0);
987 mqtt_sn_sleep_internal(client);
988
989 return err;
990 }
991
mqtt_sn_subscribe(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name)992 int mqtt_sn_subscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
993 struct mqtt_sn_data *topic_name)
994 {
995 struct mqtt_sn_topic *topic;
996 int err;
997
998 if (!client || !topic_name || !topic_name->data || !topic_name->size) {
999 return -EINVAL;
1000 }
1001
1002 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
1003 LOG_ERR("Cannot subscribe: not connected");
1004 return -ENOTCONN;
1005 }
1006
1007 topic = mqtt_sn_topic_find_name(client, topic_name);
1008 if (!topic) {
1009 topic = mqtt_sn_topic_create(topic_name);
1010 if (!topic) {
1011 return -ENOMEM;
1012 }
1013
1014 topic->qos = qos;
1015 topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBING;
1016 sys_slist_append(&client->topic, &topic->next);
1017 }
1018
1019 err = k_work_reschedule(&client->process_work, K_NO_WAIT);
1020 if (err < 0) {
1021 return err;
1022 }
1023
1024 return 0;
1025 }
1026
mqtt_sn_unsubscribe(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name)1027 int mqtt_sn_unsubscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
1028 struct mqtt_sn_data *topic_name)
1029 {
1030 struct mqtt_sn_topic *topic;
1031 int err;
1032
1033 if (!client || !topic_name) {
1034 return -EINVAL;
1035 }
1036
1037 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
1038 LOG_ERR("Cannot unsubscribe: not connected");
1039 return -ENOTCONN;
1040 }
1041
1042 topic = mqtt_sn_topic_find_name(client, topic_name);
1043 if (!topic) {
1044 LOG_HEXDUMP_ERR(topic_name->data, topic_name->size, "Topic not found");
1045 return -ENOENT;
1046 }
1047
1048 topic->state = MQTT_SN_TOPIC_STATE_UNSUBSCRIBING;
1049 mqtt_sn_con_init(&topic->con);
1050
1051 err = k_work_reschedule(&client->process_work, K_NO_WAIT);
1052 if (err < 0) {
1053 return err;
1054 }
1055
1056 return 0;
1057 }
1058
mqtt_sn_publish(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name,bool retain,struct mqtt_sn_data * data)1059 int mqtt_sn_publish(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
1060 struct mqtt_sn_data *topic_name, bool retain, struct mqtt_sn_data *data)
1061 {
1062 struct mqtt_sn_publish *pub;
1063 struct mqtt_sn_topic *topic;
1064 int err;
1065
1066 if (!client || !topic_name) {
1067 return -EINVAL;
1068 }
1069
1070 if (qos == MQTT_SN_QOS_M1) {
1071 LOG_ERR("QoS -1 not supported");
1072 return -ENOTSUP;
1073 }
1074
1075 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
1076 LOG_ERR("Cannot publish: disconnected");
1077 return -ENOTCONN;
1078 }
1079
1080 topic = mqtt_sn_topic_find_name(client, topic_name);
1081 if (!topic) {
1082 topic = mqtt_sn_topic_create(topic_name);
1083 if (!topic) {
1084 return -ENOMEM;
1085 }
1086
1087 topic->qos = qos;
1088 topic->state = MQTT_SN_TOPIC_STATE_REGISTERING;
1089 sys_slist_append(&client->topic, &topic->next);
1090 }
1091
1092 pub = mqtt_sn_publish_create(data);
1093 if (!pub) {
1094 k_work_reschedule(&client->process_work, K_NO_WAIT);
1095 return -ENOMEM;
1096 }
1097
1098 pub->qos = qos;
1099 pub->retain = retain;
1100 pub->topic = topic;
1101
1102 sys_slist_append(&client->publish, &pub->next);
1103
1104 err = k_work_reschedule(&client->process_work, K_NO_WAIT);
1105 if (err < 0) {
1106 return err;
1107 }
1108
1109 return 0;
1110 }
1111
handle_advertise(struct mqtt_sn_client * client,struct mqtt_sn_param_advertise * p,struct mqtt_sn_data rx_addr)1112 static void handle_advertise(struct mqtt_sn_client *client, struct mqtt_sn_param_advertise *p,
1113 struct mqtt_sn_data rx_addr)
1114 {
1115 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_ADVERTISE};
1116 struct mqtt_sn_gateway *gw;
1117
1118 gw = mqtt_sn_gw_find_id(client, p->gw_id);
1119
1120 if (gw == NULL) {
1121 LOG_DBG("Creating GW 0x%02x with duration %d", p->gw_id, p->duration);
1122 gw = mqtt_sn_gw_create(p->gw_id, p->duration, rx_addr);
1123 if (!gw) {
1124 return;
1125 }
1126 sys_slist_append(&client->gateway, &gw->next);
1127 } else {
1128 LOG_DBG("Updating timer for GW 0x%02x with duration %d", p->gw_id, p->duration);
1129 gw->adv_timer =
1130 k_uptime_get() + (p->duration * CONFIG_MQTT_SN_LIB_N_ADV * MSEC_PER_SEC);
1131 }
1132
1133 k_work_schedule(&client->process_work, K_NO_WAIT);
1134 if (client->evt_cb) {
1135 client->evt_cb(client, &evt);
1136 }
1137 }
1138
handle_searchgw(struct mqtt_sn_client * client,struct mqtt_sn_param_searchgw * p)1139 static void handle_searchgw(struct mqtt_sn_client *client, struct mqtt_sn_param_searchgw *p)
1140 {
1141 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_SEARCHGW};
1142
1143 /* Increment SEARCHGW transmission timestamp if waiting */
1144 if (client->ts_searchgw != 0) {
1145 client->ts_searchgw = k_uptime_get() + (T_SEARCHGW_MSEC * sys_rand8_get() / 255);
1146 }
1147
1148 /* Set transmission timestamp to respond to SEARCHGW if we have a GW */
1149 if (sys_slist_len(&client->gateway) > 0) {
1150 client->ts_gwinfo = k_uptime_get() + (T_GWINFO_MSEC * sys_rand8_get() / 255);
1151 }
1152 client->radius_gwinfo = p->radius;
1153 k_work_schedule(&client->process_work, K_NO_WAIT);
1154
1155 if (client->evt_cb) {
1156 client->evt_cb(client, &evt);
1157 }
1158 }
1159
handle_gwinfo(struct mqtt_sn_client * client,struct mqtt_sn_param_gwinfo * p,struct mqtt_sn_data rx_addr)1160 static void handle_gwinfo(struct mqtt_sn_client *client, struct mqtt_sn_param_gwinfo *p,
1161 struct mqtt_sn_data rx_addr)
1162 {
1163 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_GWINFO};
1164 struct mqtt_sn_gateway *gw;
1165
1166 /* Clear SEARCHGW and GWINFO transmission if waiting */
1167 client->ts_searchgw = 0;
1168 client->ts_gwinfo = 0;
1169 k_work_schedule(&client->process_work, K_NO_WAIT);
1170
1171 /* Extract GW info and store */
1172 if (p->gw_add.size > 0) {
1173 rx_addr.data = p->gw_add.data;
1174 rx_addr.size = p->gw_add.size;
1175 } else {
1176 }
1177 gw = mqtt_sn_gw_create(p->gw_id, -1, rx_addr);
1178
1179 if (!gw) {
1180 return;
1181 }
1182
1183 sys_slist_append(&client->gateway, &gw->next);
1184
1185 if (client->evt_cb) {
1186 client->evt_cb(client, &evt);
1187 }
1188 }
1189
handle_connack(struct mqtt_sn_client * client,struct mqtt_sn_param_connack * p)1190 static void handle_connack(struct mqtt_sn_client *client, struct mqtt_sn_param_connack *p)
1191 {
1192 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_CONNECTED};
1193
1194 if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1195 LOG_INF("MQTT_SN client connected");
1196 switch (client->state) {
1197 case MQTT_SN_CLIENT_DISCONNECTED:
1198 case MQTT_SN_CLIENT_ASLEEP:
1199 case MQTT_SN_CLIENT_AWAKE:
1200 mqtt_sn_set_state(client, MQTT_SN_CLIENT_ACTIVE);
1201 if (client->evt_cb) {
1202 client->evt_cb(client, &evt);
1203 }
1204 client->ping_retries = N_RETRY;
1205 break;
1206 default:
1207 LOG_ERR("Client received CONNACK but was in state %d", client->state);
1208 return;
1209 }
1210 } else {
1211 LOG_WRN("CONNACK ret code %d", p->ret_code);
1212 mqtt_sn_disconnect_internal(client);
1213 }
1214
1215 k_work_schedule(&client->process_work, K_NO_WAIT);
1216 }
1217
handle_willtopicreq(struct mqtt_sn_client * client)1218 static void handle_willtopicreq(struct mqtt_sn_client *client)
1219 {
1220 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_WILLTOPIC};
1221
1222 response.params.willtopic.qos = client->will_qos;
1223 response.params.willtopic.retain = client->will_retain;
1224 response.params.willtopic.topic.data = client->will_topic.data;
1225 response.params.willtopic.topic.size = client->will_topic.size;
1226
1227 encode_and_send(client, &response, 0);
1228 }
1229
handle_willmsgreq(struct mqtt_sn_client * client)1230 static void handle_willmsgreq(struct mqtt_sn_client *client)
1231 {
1232 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_WILLMSG};
1233
1234 response.params.willmsg.msg.data = client->will_msg.data;
1235 response.params.willmsg.msg.size = client->will_msg.size;
1236
1237 encode_and_send(client, &response, 0);
1238 }
1239
handle_register(struct mqtt_sn_client * client,struct mqtt_sn_param_register * p)1240 static void handle_register(struct mqtt_sn_client *client, struct mqtt_sn_param_register *p)
1241 {
1242 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_REGACK};
1243 struct mqtt_sn_topic *topic;
1244
1245 topic = mqtt_sn_topic_create(&p->topic);
1246 if (!topic) {
1247 return;
1248 }
1249
1250 topic->state = MQTT_SN_TOPIC_STATE_REGISTERED;
1251 topic->topic_id = p->topic_id;
1252 topic->type = MQTT_SN_TOPIC_TYPE_NORMAL;
1253
1254 sys_slist_append(&client->topic, &topic->next);
1255
1256 response.params.regack.ret_code = MQTT_SN_CODE_ACCEPTED;
1257 response.params.regack.topic_id = p->topic_id;
1258 response.params.regack.msg_id = p->msg_id;
1259
1260 encode_and_send(client, &response, 0);
1261 }
1262
handle_regack(struct mqtt_sn_client * client,struct mqtt_sn_param_regack * p)1263 static void handle_regack(struct mqtt_sn_client *client, struct mqtt_sn_param_regack *p)
1264 {
1265 struct mqtt_sn_topic *topic = mqtt_sn_topic_find_msg_id(client, p->msg_id);
1266
1267 if (!topic) {
1268 LOG_ERR("Can't REGACK, no topic found");
1269 return;
1270 }
1271
1272 if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1273 topic->state = MQTT_SN_TOPIC_STATE_REGISTERED;
1274 topic->topic_id = p->topic_id;
1275 } else {
1276 LOG_WRN("Gateway could not register topic ID %u, code %d", p->topic_id,
1277 p->ret_code);
1278 }
1279 }
1280
handle_publish(struct mqtt_sn_client * client,struct mqtt_sn_param_publish * p)1281 static void handle_publish(struct mqtt_sn_client *client, struct mqtt_sn_param_publish *p)
1282 {
1283 struct mqtt_sn_param response;
1284 struct mqtt_sn_evt evt = {.param.publish = {.data = p->data,
1285 .topic_id = p->topic_id,
1286 .topic_type = p->topic_type},
1287 .type = MQTT_SN_EVT_PUBLISH};
1288
1289 if (p->qos == MQTT_SN_QOS_1) {
1290 response.type = MQTT_SN_MSG_TYPE_PUBACK;
1291 response.params.puback.topic_id = p->topic_id;
1292 response.params.puback.msg_id = p->msg_id;
1293 response.params.puback.ret_code = MQTT_SN_CODE_ACCEPTED;
1294
1295 encode_and_send(client, &response, 0);
1296 } else if (p->qos == MQTT_SN_QOS_2) {
1297 response.type = MQTT_SN_MSG_TYPE_PUBREC;
1298 response.params.pubrec.msg_id = p->msg_id;
1299
1300 encode_and_send(client, &response, 0);
1301 }
1302
1303 if (client->evt_cb) {
1304 client->evt_cb(client, &evt);
1305 }
1306 }
1307
handle_puback(struct mqtt_sn_client * client,struct mqtt_sn_param_puback * p)1308 static void handle_puback(struct mqtt_sn_client *client, struct mqtt_sn_param_puback *p)
1309 {
1310 struct mqtt_sn_publish *pub = mqtt_sn_publish_find_msg_id(client, p->msg_id);
1311
1312 if (!pub) {
1313 LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1314 return;
1315 }
1316
1317 mqtt_sn_publish_destroy(client, pub);
1318 }
1319
handle_pubrec(struct mqtt_sn_client * client,struct mqtt_sn_param_pubrec * p)1320 static void handle_pubrec(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrec *p)
1321 {
1322 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PUBREL};
1323 struct mqtt_sn_publish *pub = mqtt_sn_publish_find_msg_id(client, p->msg_id);
1324
1325 if (!pub) {
1326 LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1327 return;
1328 }
1329
1330 pub->con.last_attempt = k_uptime_get();
1331 pub->con.retries = N_RETRY;
1332
1333 response.params.pubrel.msg_id = p->msg_id;
1334
1335 encode_and_send(client, &response, 0);
1336 }
1337
handle_pubrel(struct mqtt_sn_client * client,struct mqtt_sn_param_pubrel * p)1338 static void handle_pubrel(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrel *p)
1339 {
1340 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PUBCOMP};
1341
1342 response.params.pubcomp.msg_id = p->msg_id;
1343
1344 encode_and_send(client, &response, 0);
1345 }
1346
handle_pubcomp(struct mqtt_sn_client * client,struct mqtt_sn_param_pubcomp * p)1347 static void handle_pubcomp(struct mqtt_sn_client *client, struct mqtt_sn_param_pubcomp *p)
1348 {
1349 struct mqtt_sn_publish *pub = mqtt_sn_publish_find_msg_id(client, p->msg_id);
1350
1351 if (!pub) {
1352 LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1353 return;
1354 }
1355
1356 mqtt_sn_publish_destroy(client, pub);
1357 }
1358
handle_suback(struct mqtt_sn_client * client,struct mqtt_sn_param_suback * p)1359 static void handle_suback(struct mqtt_sn_client *client, struct mqtt_sn_param_suback *p)
1360 {
1361 struct mqtt_sn_topic *topic = mqtt_sn_topic_find_msg_id(client, p->msg_id);
1362
1363 if (!topic) {
1364 LOG_ERR("No matching SUBSCRIBE found for msg id %u", p->msg_id);
1365 return;
1366 }
1367
1368 if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1369 topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBED;
1370 topic->topic_id = p->topic_id;
1371 topic->qos = p->qos;
1372 } else {
1373 LOG_WRN("SUBACK with ret code %d", p->ret_code);
1374 }
1375 }
1376
handle_unsuback(struct mqtt_sn_client * client,struct mqtt_sn_param_unsuback * p)1377 static void handle_unsuback(struct mqtt_sn_client *client, struct mqtt_sn_param_unsuback *p)
1378 {
1379 struct mqtt_sn_topic *topic = mqtt_sn_topic_find_msg_id(client, p->msg_id);
1380
1381 if (!topic || topic->state != MQTT_SN_TOPIC_STATE_UNSUBSCRIBING) {
1382 LOG_ERR("No matching UNSUBSCRIBE found for msg id %u", p->msg_id);
1383 return;
1384 }
1385
1386 mqtt_sn_topic_destroy(client, topic);
1387 }
1388
handle_pingreq(struct mqtt_sn_client * client)1389 static void handle_pingreq(struct mqtt_sn_client *client)
1390 {
1391 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PINGRESP};
1392
1393 encode_and_send(client, &response, 0);
1394 }
1395
handle_pingresp(struct mqtt_sn_client * client)1396 static void handle_pingresp(struct mqtt_sn_client *client)
1397 {
1398 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_PINGRESP};
1399
1400 if (client->evt_cb) {
1401 client->evt_cb(client, &evt);
1402 }
1403
1404 if (client->state == MQTT_SN_CLIENT_AWAKE) {
1405 mqtt_sn_set_state(client, MQTT_SN_CLIENT_ASLEEP);
1406 }
1407
1408 client->ping_retries = N_RETRY;
1409 }
1410
handle_disconnect(struct mqtt_sn_client * client,struct mqtt_sn_param_disconnect * p)1411 static void handle_disconnect(struct mqtt_sn_client *client, struct mqtt_sn_param_disconnect *p)
1412 {
1413 LOG_INF("Received DISCONNECT");
1414 mqtt_sn_disconnect_internal(client);
1415 }
1416
handle_msg(struct mqtt_sn_client * client,struct mqtt_sn_data rx_addr)1417 static int handle_msg(struct mqtt_sn_client *client, struct mqtt_sn_data rx_addr)
1418 {
1419 int err;
1420 struct mqtt_sn_param p;
1421
1422 err = mqtt_sn_decode_msg(&client->rx, &p);
1423 if (err) {
1424 return err;
1425 }
1426
1427 LOG_INF("Got message of type %d", p.type);
1428
1429 switch (p.type) {
1430 case MQTT_SN_MSG_TYPE_ADVERTISE:
1431 handle_advertise(client, &p.params.advertise, rx_addr);
1432 break;
1433 case MQTT_SN_MSG_TYPE_SEARCHGW:
1434 handle_searchgw(client, &p.params.searchgw);
1435 break;
1436 case MQTT_SN_MSG_TYPE_GWINFO:
1437 handle_gwinfo(client, &p.params.gwinfo, rx_addr);
1438 break;
1439 case MQTT_SN_MSG_TYPE_CONNACK:
1440 handle_connack(client, &p.params.connack);
1441 break;
1442 case MQTT_SN_MSG_TYPE_WILLTOPICREQ:
1443 handle_willtopicreq(client);
1444 break;
1445 case MQTT_SN_MSG_TYPE_WILLMSGREQ:
1446 handle_willmsgreq(client);
1447 break;
1448 case MQTT_SN_MSG_TYPE_REGISTER:
1449 handle_register(client, &p.params.reg);
1450 break;
1451 case MQTT_SN_MSG_TYPE_REGACK:
1452 handle_regack(client, &p.params.regack);
1453 break;
1454 case MQTT_SN_MSG_TYPE_PUBLISH:
1455 handle_publish(client, &p.params.publish);
1456 break;
1457 case MQTT_SN_MSG_TYPE_PUBACK:
1458 handle_puback(client, &p.params.puback);
1459 break;
1460 case MQTT_SN_MSG_TYPE_PUBREC:
1461 handle_pubrec(client, &p.params.pubrec);
1462 break;
1463 case MQTT_SN_MSG_TYPE_PUBREL:
1464 handle_pubrel(client, &p.params.pubrel);
1465 break;
1466 case MQTT_SN_MSG_TYPE_PUBCOMP:
1467 handle_pubcomp(client, &p.params.pubcomp);
1468 break;
1469 case MQTT_SN_MSG_TYPE_SUBACK:
1470 handle_suback(client, &p.params.suback);
1471 break;
1472 case MQTT_SN_MSG_TYPE_UNSUBACK:
1473 handle_unsuback(client, &p.params.unsuback);
1474 break;
1475 case MQTT_SN_MSG_TYPE_PINGREQ:
1476 handle_pingreq(client);
1477 break;
1478 case MQTT_SN_MSG_TYPE_PINGRESP:
1479 handle_pingresp(client);
1480 break;
1481 case MQTT_SN_MSG_TYPE_DISCONNECT:
1482 handle_disconnect(client, &p.params.disconnect);
1483 break;
1484 case MQTT_SN_MSG_TYPE_WILLTOPICRESP:
1485 break;
1486 case MQTT_SN_MSG_TYPE_WILLMSGRESP:
1487 break;
1488 default:
1489 LOG_ERR("Unexpected message type %d", p.type);
1490 break;
1491 }
1492
1493 k_work_reschedule(&client->process_work, K_NO_WAIT);
1494
1495 return 0;
1496 }
1497
mqtt_sn_input(struct mqtt_sn_client * client)1498 int mqtt_sn_input(struct mqtt_sn_client *client)
1499 {
1500 ssize_t next_frame_size;
1501 char addr[CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE];
1502 struct mqtt_sn_data rx_addr = {.data = addr, .size = CONFIG_MQTT_SN_LIB_MAX_ADDR_SIZE};
1503 int err;
1504
1505 if (!client || !client->transport || !client->transport->recvfrom) {
1506 return -EINVAL;
1507 }
1508
1509 if (client->transport->poll) {
1510 next_frame_size = client->transport->poll(client);
1511 if (next_frame_size <= 0) {
1512 return next_frame_size;
1513 }
1514 }
1515
1516 net_buf_simple_reset(&client->rx);
1517
1518 next_frame_size = client->transport->recvfrom(client, client->rx.data, client->rx.size,
1519 (void *)rx_addr.data, &rx_addr.size);
1520 if (next_frame_size <= 0) {
1521 return next_frame_size;
1522 }
1523
1524 if (next_frame_size > client->rx.size) {
1525 return -ENOBUFS;
1526 }
1527
1528 client->rx.len = next_frame_size;
1529
1530 LOG_HEXDUMP_DBG(client->rx.data, client->rx.len, "Received data");
1531
1532 err = handle_msg(client, rx_addr);
1533
1534 if (err) {
1535 return err;
1536 }
1537
1538 /* Should be zero */
1539 return -client->rx.len;
1540 }
1541
mqtt_sn_get_topic_name(struct mqtt_sn_client * client,uint16_t id,struct mqtt_sn_data * topic_name)1542 int mqtt_sn_get_topic_name(struct mqtt_sn_client *client, uint16_t id,
1543 struct mqtt_sn_data *topic_name)
1544 {
1545 struct mqtt_sn_topic *topic;
1546
1547 if (!client || !topic_name) {
1548 return -EINVAL;
1549 }
1550
1551 SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
1552 if (topic->topic_id == id) {
1553 topic_name->data = (const uint8_t *)topic->name;
1554 topic_name->size = topic->namelen;
1555 return 0;
1556 }
1557 }
1558 return -ENOENT;
1559 }
1560