1 /*
2 * Copyright (c) 2022 grandcentrix GmbH
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 /** @file mqtt_sn.c
8 *
9 * @brief MQTT-SN Client API Implementation.
10 */
11
12 #include "mqtt_sn_msg.h"
13
14 #include <zephyr/logging/log.h>
15 #include <zephyr/net/mqtt_sn.h>
16 LOG_MODULE_REGISTER(net_mqtt_sn, CONFIG_MQTT_SN_LOG_LEVEL);
17
18 #define MQTT_SN_NET_BUFS (CONFIG_MQTT_SN_LIB_MAX_PUBLISH)
19
20 NET_BUF_POOL_FIXED_DEFINE(mqtt_sn_messages, MQTT_SN_NET_BUFS, CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE,
21 0, NULL);
22
23 struct mqtt_sn_confirmable {
24 int64_t last_attempt;
25 uint16_t msg_id;
26 uint8_t retries;
27 };
28
29 struct mqtt_sn_publish {
30 struct mqtt_sn_confirmable con;
31 sys_snode_t next;
32 struct mqtt_sn_topic *topic;
33 uint8_t pubdata[CONFIG_MQTT_SN_LIB_MAX_PAYLOAD_SIZE];
34 size_t datalen;
35 enum mqtt_sn_qos qos;
36 bool retain;
37 };
38
39 enum mqtt_sn_topic_state {
40 MQTT_SN_TOPIC_STATE_REGISTERING,
41 MQTT_SN_TOPIC_STATE_REGISTERED,
42 MQTT_SN_TOPIC_STATE_SUBSCRIBING,
43 MQTT_SN_TOPIC_STATE_SUBSCRIBED,
44 MQTT_SN_TOPIC_STATE_UNSUBSCRIBING,
45 };
46
47 struct mqtt_sn_topic {
48 struct mqtt_sn_confirmable con;
49 sys_snode_t next;
50 char name[CONFIG_MQTT_SN_LIB_MAX_TOPIC_SIZE];
51 size_t namelen;
52 uint16_t topic_id;
53 enum mqtt_sn_qos qos;
54 enum mqtt_sn_topic_type type;
55 enum mqtt_sn_topic_state state;
56 };
57
58 K_MEM_SLAB_DEFINE_STATIC(publishes, sizeof(struct mqtt_sn_publish),
59 CONFIG_MQTT_SN_LIB_MAX_PUBLISH, 4);
60 K_MEM_SLAB_DEFINE_STATIC(topics, sizeof(struct mqtt_sn_topic), CONFIG_MQTT_SN_LIB_MAX_TOPICS, 4);
61
62 enum mqtt_sn_client_state {
63 MQTT_SN_CLIENT_DISCONNECTED,
64 MQTT_SN_CLIENT_ACTIVE,
65 MQTT_SN_CLIENT_ASLEEP,
66 MQTT_SN_CLIENT_AWAKE
67 };
68
mqtt_sn_set_state(struct mqtt_sn_client * client,enum mqtt_sn_client_state state)69 static void mqtt_sn_set_state(struct mqtt_sn_client *client, enum mqtt_sn_client_state state)
70 {
71 int prev_state = client->state;
72
73 client->state = state;
74 LOG_DBG("Client %p state (%d) -> (%d)", client, prev_state, state);
75 }
76
77 #define T_RETRY_MSEC (CONFIG_MQTT_SN_LIB_T_RETRY * MSEC_PER_SEC)
78 #define N_RETRY (CONFIG_MQTT_SN_LIB_N_RETRY)
79 #define T_KEEPALIVE_MSEC (CONFIG_MQTT_SN_KEEPALIVE * MSEC_PER_SEC)
80
next_msg_id(void)81 static uint16_t next_msg_id(void)
82 {
83 static uint16_t msg_id;
84
85 return ++msg_id;
86 }
87
encode_and_send(struct mqtt_sn_client * client,struct mqtt_sn_param * p)88 static int encode_and_send(struct mqtt_sn_client *client, struct mqtt_sn_param *p)
89 {
90 int err;
91
92 err = mqtt_sn_encode_msg(&client->tx, p);
93 if (err) {
94 goto end;
95 }
96
97 LOG_HEXDUMP_DBG(client->tx.data, client->tx.len, "Send message");
98
99 if (!client->transport->msg_send) {
100 LOG_ERR("Can't send: no callback");
101 err = -ENOTSUP;
102 goto end;
103 }
104
105 if (!client->tx.len) {
106 LOG_WRN("Can't send: empty");
107 err = -ENOMSG;
108 goto end;
109 }
110
111 err = client->transport->msg_send(client, client->tx.data, client->tx.len);
112 if (err) {
113 LOG_ERR("Error during send: %d", err);
114 goto end;
115 }
116
117 end:
118 net_buf_simple_reset(&client->tx);
119
120 return err;
121 }
122
mqtt_sn_con_init(struct mqtt_sn_confirmable * con)123 static void mqtt_sn_con_init(struct mqtt_sn_confirmable *con)
124 {
125 con->last_attempt = 0;
126 con->retries = N_RETRY;
127 con->msg_id = next_msg_id();
128 }
129
mqtt_sn_publish_destroy(struct mqtt_sn_client * client,struct mqtt_sn_publish * pub)130 static void mqtt_sn_publish_destroy(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub)
131 {
132 sys_slist_find_and_remove(&client->publish, &pub->next);
133 k_mem_slab_free(&publishes, (void *)pub);
134 }
135
mqtt_sn_publish_destroy_all(struct mqtt_sn_client * client)136 static void mqtt_sn_publish_destroy_all(struct mqtt_sn_client *client)
137 {
138 struct mqtt_sn_publish *pub;
139 sys_snode_t *next;
140
141 while ((next = sys_slist_get(&client->publish)) != NULL) {
142 pub = SYS_SLIST_CONTAINER(next, pub, next);
143 k_mem_slab_free(&publishes, (void *)pub);
144 }
145 }
146
mqtt_sn_publish_create(struct mqtt_sn_data * data)147 static struct mqtt_sn_publish *mqtt_sn_publish_create(struct mqtt_sn_data *data)
148 {
149 struct mqtt_sn_publish *pub;
150
151 if (k_mem_slab_alloc(&publishes, (void **)&pub, K_NO_WAIT)) {
152 LOG_ERR("Can't create PUB: no free slot");
153 return NULL;
154 }
155
156 memset(pub, 0, sizeof(*pub));
157
158 if (data && data->data && data->size) {
159 if (data->size > sizeof(pub->pubdata)) {
160 LOG_ERR("Can't create PUB: Too much data (%" PRIu16 ")", data->size);
161 return NULL;
162 }
163
164 memcpy(pub->pubdata, data->data, data->size);
165 pub->datalen = data->size;
166 }
167
168 mqtt_sn_con_init(&pub->con);
169
170 return pub;
171 }
172
mqtt_sn_publish_find_msg_id(struct mqtt_sn_client * client,uint16_t msg_id)173 static struct mqtt_sn_publish *mqtt_sn_publish_find_msg_id(struct mqtt_sn_client *client,
174 uint16_t msg_id)
175 {
176 struct mqtt_sn_publish *pub;
177
178 SYS_SLIST_FOR_EACH_CONTAINER(&client->publish, pub, next) {
179 if (pub->con.msg_id == msg_id) {
180 return pub;
181 }
182 }
183
184 return NULL;
185 }
186
mqtt_sn_publish_find_topic(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)187 static struct mqtt_sn_publish *mqtt_sn_publish_find_topic(struct mqtt_sn_client *client,
188 struct mqtt_sn_topic *topic)
189 {
190 struct mqtt_sn_publish *pub;
191
192 SYS_SLIST_FOR_EACH_CONTAINER(&client->publish, pub, next) {
193 if (pub->topic == topic) {
194 return pub;
195 }
196 }
197
198 return NULL;
199 }
200
mqtt_sn_topic_create(struct mqtt_sn_data * name)201 static struct mqtt_sn_topic *mqtt_sn_topic_create(struct mqtt_sn_data *name)
202 {
203 struct mqtt_sn_topic *topic;
204
205 if (k_mem_slab_alloc(&topics, (void **)&topic, K_NO_WAIT)) {
206 LOG_ERR("Can't create topic: no free slot");
207 return NULL;
208 }
209
210 memset(topic, 0, sizeof(*topic));
211
212 if (!name || !name->data || !name->size) {
213 LOG_ERR("Can't create topic with empty name");
214 return NULL;
215 }
216
217 if (name->size > sizeof(topic->name)) {
218 LOG_ERR("Can't create topic: name too long (%" PRIu16 ")", name->size);
219 return NULL;
220 }
221
222 memcpy(topic->name, name->data, name->size);
223 topic->namelen = name->size;
224
225 mqtt_sn_con_init(&topic->con);
226
227 return topic;
228 }
229
mqtt_sn_topic_find_name(struct mqtt_sn_client * client,struct mqtt_sn_data * topic_name)230 static struct mqtt_sn_topic *mqtt_sn_topic_find_name(struct mqtt_sn_client *client,
231 struct mqtt_sn_data *topic_name)
232 {
233 struct mqtt_sn_topic *topic;
234
235 SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
236 if (topic->namelen == topic_name->size &&
237 memcmp(topic->name, topic_name->data, topic_name->size) == 0) {
238 return topic;
239 }
240 }
241
242 return NULL;
243 }
244
mqtt_sn_topic_find_msg_id(struct mqtt_sn_client * client,uint16_t msg_id)245 static struct mqtt_sn_topic *mqtt_sn_topic_find_msg_id(struct mqtt_sn_client *client,
246 uint16_t msg_id)
247 {
248 struct mqtt_sn_topic *topic;
249
250 SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
251 if (topic->con.msg_id == msg_id) {
252 return topic;
253 }
254 }
255
256 return NULL;
257 }
258
mqtt_sn_topic_destroy(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)259 static void mqtt_sn_topic_destroy(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
260 {
261 struct mqtt_sn_publish *pub;
262
263 /* Destroy all pubs referencing this topic */
264 while ((pub = mqtt_sn_publish_find_topic(client, topic)) != NULL) {
265 LOG_WRN("Destroying publish msg_id %d", pub->con.msg_id);
266 mqtt_sn_publish_destroy(client, pub);
267 }
268
269 sys_slist_find_and_remove(&client->topic, &topic->next);
270 }
271
mqtt_sn_topic_destroy_all(struct mqtt_sn_client * client)272 static void mqtt_sn_topic_destroy_all(struct mqtt_sn_client *client)
273 {
274 struct mqtt_sn_topic *topic;
275 struct mqtt_sn_publish *pub;
276 sys_snode_t *next;
277
278 while ((next = sys_slist_get(&client->topic)) != NULL) {
279 topic = SYS_SLIST_CONTAINER(next, topic, next);
280 /* Destroy all pubs referencing this topic */
281 while ((pub = mqtt_sn_publish_find_topic(client, topic)) != NULL) {
282 LOG_WRN("Destroying publish msg_id %d", pub->con.msg_id);
283 mqtt_sn_publish_destroy(client, pub);
284 }
285
286 k_mem_slab_free(&topics, (void *)topic);
287 }
288 }
289
mqtt_sn_disconnect_internal(struct mqtt_sn_client * client)290 static void mqtt_sn_disconnect_internal(struct mqtt_sn_client *client)
291 {
292 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
293
294 mqtt_sn_set_state(client, MQTT_SN_CLIENT_DISCONNECTED);
295 if (client->evt_cb) {
296 client->evt_cb(client, &evt);
297 }
298
299 /*
300 * Remove all publishes, but keep topics
301 * Topics are removed on deinit or when connect is called with
302 * clean-session = true
303 */
304 mqtt_sn_publish_destroy_all(client);
305
306 k_work_cancel_delayable(&client->process_work);
307 }
308
mqtt_sn_sleep_internal(struct mqtt_sn_client * client)309 static void mqtt_sn_sleep_internal(struct mqtt_sn_client *client)
310 {
311 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_DISCONNECTED};
312
313 mqtt_sn_set_state(client, MQTT_SN_CLIENT_ASLEEP);
314 if (client->evt_cb) {
315 client->evt_cb(client, &evt);
316 }
317 }
318
mqtt_sn_do_subscribe(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic,bool dup)319 static void mqtt_sn_do_subscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic,
320 bool dup)
321 {
322 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_SUBSCRIBE};
323
324 if (!client || !topic) {
325 return;
326 }
327
328 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
329 LOG_ERR("Cannot subscribe: not connected");
330 return;
331 }
332
333 p.params.subscribe.msg_id = topic->con.msg_id;
334 p.params.subscribe.qos = topic->qos;
335 p.params.subscribe.topic_type = topic->type;
336 p.params.subscribe.dup = dup;
337 switch (topic->type) {
338 case MQTT_SN_TOPIC_TYPE_NORMAL:
339 p.params.subscribe.topic.topic_name.data = topic->name;
340 p.params.subscribe.topic.topic_name.size = topic->namelen;
341 break;
342 case MQTT_SN_TOPIC_TYPE_PREDEF:
343 case MQTT_SN_TOPIC_TYPE_SHORT:
344 p.params.subscribe.topic.topic_id = topic->topic_id;
345 break;
346 default:
347 LOG_ERR("Unexpected topic type %d", topic->type);
348 return;
349 }
350
351 encode_and_send(client, &p);
352 }
353
mqtt_sn_do_unsubscribe(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)354 static void mqtt_sn_do_unsubscribe(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
355 {
356 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_UNSUBSCRIBE};
357
358 if (!client || !topic) {
359 return;
360 }
361
362 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
363 LOG_ERR("Cannot unsubscribe: not connected");
364 return;
365 }
366
367 p.params.unsubscribe.msg_id = topic->con.msg_id;
368 p.params.unsubscribe.topic_type = topic->type;
369 switch (topic->type) {
370 case MQTT_SN_TOPIC_TYPE_NORMAL:
371 p.params.unsubscribe.topic.topic_name.data = topic->name;
372 p.params.unsubscribe.topic.topic_name.size = topic->namelen;
373 break;
374 case MQTT_SN_TOPIC_TYPE_PREDEF:
375 case MQTT_SN_TOPIC_TYPE_SHORT:
376 p.params.unsubscribe.topic.topic_id = topic->topic_id;
377 break;
378 default:
379 LOG_ERR("Unexpected topic type %d", topic->type);
380 return;
381 }
382
383 encode_and_send(client, &p);
384 }
385
mqtt_sn_do_register(struct mqtt_sn_client * client,struct mqtt_sn_topic * topic)386 static void mqtt_sn_do_register(struct mqtt_sn_client *client, struct mqtt_sn_topic *topic)
387 {
388 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_REGISTER};
389
390 if (!client || !topic) {
391 return;
392 }
393
394 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
395 LOG_ERR("Cannot register: not connected");
396 return;
397 }
398
399 p.params.reg.msg_id = topic->con.msg_id;
400 switch (topic->type) {
401 case MQTT_SN_TOPIC_TYPE_NORMAL:
402 LOG_HEXDUMP_INF(topic->name, topic->namelen, "Registering topic");
403 p.params.reg.topic.data = topic->name;
404 p.params.reg.topic.size = topic->namelen;
405 break;
406 default:
407 LOG_ERR("Unexpected topic type %d", topic->type);
408 return;
409 }
410
411 encode_and_send(client, &p);
412 }
413
mqtt_sn_do_publish(struct mqtt_sn_client * client,struct mqtt_sn_publish * pub,bool dup)414 static void mqtt_sn_do_publish(struct mqtt_sn_client *client, struct mqtt_sn_publish *pub, bool dup)
415 {
416 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_PUBLISH};
417
418 if (!client || !pub) {
419 return;
420 }
421
422 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
423 LOG_ERR("Cannot subscribe: not connected");
424 return;
425 }
426
427 LOG_INF("Publishing to topic ID %d", pub->topic->topic_id);
428
429 p.params.publish.data.data = pub->pubdata;
430 p.params.publish.data.size = pub->datalen;
431 p.params.publish.msg_id = pub->con.msg_id;
432 p.params.publish.retain = pub->retain;
433 p.params.publish.topic_id = pub->topic->topic_id;
434 p.params.publish.topic_type = pub->topic->type;
435 p.params.publish.qos = pub->qos;
436 p.params.publish.dup = dup;
437
438 encode_and_send(client, &p);
439 }
440
mqtt_sn_do_ping(struct mqtt_sn_client * client)441 static void mqtt_sn_do_ping(struct mqtt_sn_client *client)
442 {
443 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_PINGREQ};
444
445 switch (client->state) {
446 case MQTT_SN_CLIENT_ASLEEP:
447 /*
448 * From the spec regarding PINGREQ:
449 * ClientId: contains the client id; this field is optional
450 * and is included by a “sleeping” client when it goes to the
451 * “awake” state and is waiting for messages sent by the
452 * server/gateway, see Section 6.14 for further details.
453 */
454 p.params.pingreq.client_id.data = client->client_id.data;
455 p.params.pingreq.client_id.size = client->client_id.size;
456 case MQTT_SN_CLIENT_ACTIVE:
457 encode_and_send(client, &p);
458 break;
459 default:
460 LOG_WRN("Can't ping in state %d", client->state);
461 break;
462 }
463 }
464
process_pubs(struct mqtt_sn_client * client,int64_t * next_cycle)465 static int process_pubs(struct mqtt_sn_client *client, int64_t *next_cycle)
466 {
467 struct mqtt_sn_publish *pub, *pubs;
468 const int64_t now = k_uptime_get();
469 int64_t next_attempt;
470 bool dup;
471
472 *next_cycle = 0;
473
474 SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&client->publish, pub, pubs, next) {
475 LOG_HEXDUMP_DBG(pub->topic->name, pub->topic->namelen,
476 "Processing publish for topic");
477 LOG_HEXDUMP_DBG(pub->pubdata, pub->datalen, "Processing publish data");
478
479 if (pub->con.last_attempt == 0) {
480 next_attempt = 0;
481 dup = false;
482 } else {
483 next_attempt = pub->con.last_attempt + T_RETRY_MSEC;
484 dup = true;
485 }
486
487 if (next_attempt <= now) {
488 switch (pub->topic->state) {
489 case MQTT_SN_TOPIC_STATE_REGISTERING:
490 case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
491 case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
492 LOG_INF("Can't publish; topic is not ready");
493 break;
494 case MQTT_SN_TOPIC_STATE_REGISTERED:
495 case MQTT_SN_TOPIC_STATE_SUBSCRIBED:
496 if (!pub->con.retries--) {
497 LOG_WRN("PUB ran out of retries, disconnecting");
498 mqtt_sn_disconnect_internal(client);
499 return -ETIMEDOUT;
500 }
501 mqtt_sn_do_publish(client, pub, dup);
502 if (pub->qos == MQTT_SN_QOS_0 || pub->qos == MQTT_SN_QOS_M1) {
503 /* We are done, remove this */
504 mqtt_sn_publish_destroy(client, pub);
505 continue;
506 } else {
507 /* Wait for ack */
508 pub->con.last_attempt = now;
509 next_attempt = now + T_RETRY_MSEC;
510 }
511 break;
512 }
513 }
514
515 if (next_attempt > now && (*next_cycle == 0 || next_attempt < *next_cycle)) {
516 *next_cycle = next_attempt;
517 }
518 }
519
520 return 0;
521 }
522
process_topics(struct mqtt_sn_client * client,int64_t * next_cycle)523 static int process_topics(struct mqtt_sn_client *client, int64_t *next_cycle)
524 {
525 struct mqtt_sn_topic *topic;
526 const int64_t now = k_uptime_get();
527 int64_t next_attempt;
528 bool dup;
529
530 SYS_SLIST_FOR_EACH_CONTAINER(&client->topic, topic, next) {
531 LOG_HEXDUMP_DBG(topic->name, topic->namelen, "Processing topic");
532
533 if (topic->con.last_attempt == 0) {
534 next_attempt = 0;
535 dup = false;
536 } else {
537 next_attempt = topic->con.last_attempt + T_RETRY_MSEC;
538 dup = true;
539 }
540
541 if (next_attempt <= now) {
542 switch (topic->state) {
543 case MQTT_SN_TOPIC_STATE_SUBSCRIBING:
544 if (!topic->con.retries--) {
545 LOG_WRN("Topic ran out of retries, disconnecting");
546 mqtt_sn_disconnect_internal(client);
547 return -ETIMEDOUT;
548 }
549
550 mqtt_sn_do_subscribe(client, topic, dup);
551 topic->con.last_attempt = now;
552 next_attempt = now + T_RETRY_MSEC;
553 break;
554 case MQTT_SN_TOPIC_STATE_REGISTERING:
555 if (!topic->con.retries--) {
556 LOG_WRN("Topic ran out of retries, disconnecting");
557 mqtt_sn_disconnect_internal(client);
558 return -ETIMEDOUT;
559 }
560
561 mqtt_sn_do_register(client, topic);
562 topic->con.last_attempt = now;
563 next_attempt = now + T_RETRY_MSEC;
564 break;
565 case MQTT_SN_TOPIC_STATE_UNSUBSCRIBING:
566 if (!topic->con.retries--) {
567 LOG_WRN("Topic ran out of retries, disconnecting");
568 mqtt_sn_disconnect_internal(client);
569 return -ETIMEDOUT;
570 }
571 mqtt_sn_do_unsubscribe(client, topic);
572 topic->con.last_attempt = now;
573 next_attempt = now + T_RETRY_MSEC;
574 break;
575 case MQTT_SN_TOPIC_STATE_REGISTERED:
576 case MQTT_SN_TOPIC_STATE_SUBSCRIBED:
577 break;
578 }
579 }
580
581 if (next_attempt > now && (*next_cycle == 0 || next_attempt < *next_cycle)) {
582 *next_cycle = next_attempt;
583 }
584 }
585
586 return 0;
587 }
588
process_ping(struct mqtt_sn_client * client,int64_t * next_cycle)589 static int process_ping(struct mqtt_sn_client *client, int64_t *next_cycle)
590 {
591 const int64_t now = k_uptime_get();
592 int64_t next_ping;
593
594 if (client->ping_retries == N_RETRY) {
595 /* Last ping was acked */
596 next_ping = client->last_ping + T_KEEPALIVE_MSEC;
597 } else {
598 next_ping = client->last_ping + T_RETRY_MSEC;
599 }
600
601 if (next_ping < now) {
602 if (!client->ping_retries--) {
603 LOG_WRN("Ping ran out of retries");
604 mqtt_sn_disconnect_internal(client);
605 return -ETIMEDOUT;
606 }
607
608 LOG_DBG("Sending PINGREQ");
609 mqtt_sn_do_ping(client);
610 client->last_ping = now;
611 next_ping = now + T_RETRY_MSEC;
612 }
613
614 if (*next_cycle == 0 || next_ping < *next_cycle) {
615 *next_cycle = next_ping;
616 }
617
618 return 0;
619 }
620
process_work(struct k_work * wrk)621 static void process_work(struct k_work *wrk)
622 {
623 struct mqtt_sn_client *client;
624 struct k_work_delayable *dwork;
625 int64_t next_cycle = 0;
626 int err;
627
628 dwork = k_work_delayable_from_work(wrk);
629 client = CONTAINER_OF(dwork, struct mqtt_sn_client, process_work);
630
631 LOG_DBG("Executing work of client %p in state %d", client, client->state);
632
633 if (client->state == MQTT_SN_CLIENT_DISCONNECTED) {
634 LOG_WRN("%s called while disconnected: Nothing to do", __func__);
635 return;
636 }
637
638 if (client->state == MQTT_SN_CLIENT_ACTIVE) {
639 err = process_topics(client, &next_cycle);
640 if (err) {
641 return;
642 }
643
644 err = process_pubs(client, &next_cycle);
645 if (err) {
646 return;
647 }
648
649 err = process_ping(client, &next_cycle);
650 if (err) {
651 return;
652 }
653 }
654
655 if (next_cycle > 0) {
656 k_work_schedule(dwork, K_MSEC(next_cycle - k_uptime_get()));
657 }
658 }
659
mqtt_sn_client_init(struct mqtt_sn_client * client,const struct mqtt_sn_data * client_id,struct mqtt_sn_transport * transport,mqtt_sn_evt_cb_t evt_cb,void * tx,size_t txsz,void * rx,size_t rxsz)660 int mqtt_sn_client_init(struct mqtt_sn_client *client, const struct mqtt_sn_data *client_id,
661 struct mqtt_sn_transport *transport, mqtt_sn_evt_cb_t evt_cb, void *tx,
662 size_t txsz, void *rx, size_t rxsz)
663 {
664 if (!client || !client_id || !transport || !evt_cb || !tx || !rx) {
665 return -EINVAL;
666 }
667
668 memset(client, 0, sizeof(*client));
669
670 client->client_id.data = client_id->data;
671 client->client_id.size = client_id->size;
672 client->transport = transport;
673 client->evt_cb = evt_cb;
674
675 net_buf_simple_init_with_data(&client->tx, tx, txsz);
676 net_buf_simple_reset(&client->tx);
677
678 net_buf_simple_init_with_data(&client->rx, rx, rxsz);
679 net_buf_simple_reset(&client->rx);
680
681 k_work_init_delayable(&client->process_work, process_work);
682
683 if (transport->init) {
684 transport->init(transport);
685 }
686
687 return 0;
688 }
689
mqtt_sn_client_deinit(struct mqtt_sn_client * client)690 void mqtt_sn_client_deinit(struct mqtt_sn_client *client)
691 {
692 if (!client) {
693 return;
694 }
695
696 mqtt_sn_publish_destroy_all(client);
697 mqtt_sn_topic_destroy_all(client);
698
699 if (client->transport && client->transport->deinit) {
700 client->transport->deinit(client->transport);
701 }
702
703 k_work_cancel_delayable(&client->process_work);
704 }
705
mqtt_sn_connect(struct mqtt_sn_client * client,bool will,bool clean_session)706 int mqtt_sn_connect(struct mqtt_sn_client *client, bool will, bool clean_session)
707 {
708 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_CONNECT};
709
710 if (!client) {
711 return -EINVAL;
712 }
713
714 if (will && (!client->will_msg.data || !client->will_topic.data)) {
715 LOG_ERR("will set to true, but no will data in client");
716 return -EINVAL;
717 }
718
719 if (clean_session) {
720 mqtt_sn_topic_destroy_all(client);
721 }
722
723 p.params.connect.clean_session = clean_session;
724 p.params.connect.will = will;
725 p.params.connect.duration = CONFIG_MQTT_SN_KEEPALIVE;
726 p.params.connect.client_id.data = client->client_id.data;
727 p.params.connect.client_id.size = client->client_id.size;
728
729 client->last_ping = k_uptime_get();
730
731 return encode_and_send(client, &p);
732 }
733
mqtt_sn_disconnect(struct mqtt_sn_client * client)734 int mqtt_sn_disconnect(struct mqtt_sn_client *client)
735 {
736 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_DISCONNECT};
737 int err;
738
739 if (!client) {
740 return -EINVAL;
741 }
742
743 p.params.disconnect.duration = 0;
744
745 err = encode_and_send(client, &p);
746 mqtt_sn_disconnect_internal(client);
747
748 return err;
749 }
750
mqtt_sn_sleep(struct mqtt_sn_client * client,uint16_t duration)751 int mqtt_sn_sleep(struct mqtt_sn_client *client, uint16_t duration)
752 {
753 struct mqtt_sn_param p = {.type = MQTT_SN_MSG_TYPE_DISCONNECT};
754 int err;
755
756 if (!client || !duration) {
757 return -EINVAL;
758 }
759
760 p.params.disconnect.duration = duration;
761
762 err = encode_and_send(client, &p);
763 mqtt_sn_sleep_internal(client);
764
765 return err;
766 }
767
mqtt_sn_subscribe(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name)768 int mqtt_sn_subscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
769 struct mqtt_sn_data *topic_name)
770 {
771 struct mqtt_sn_topic *topic;
772 int err;
773
774 if (!client || !topic_name || !topic_name->data || !topic_name->size) {
775 return -EINVAL;
776 }
777
778 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
779 LOG_ERR("Cannot subscribe: not connected");
780 return -ENOTCONN;
781 }
782
783 topic = mqtt_sn_topic_find_name(client, topic_name);
784 if (!topic) {
785 topic = mqtt_sn_topic_create(topic_name);
786 if (!topic) {
787 return -ENOMEM;
788 }
789
790 topic->qos = qos;
791 topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBING;
792 sys_slist_append(&client->topic, &topic->next);
793 }
794
795 err = k_work_reschedule(&client->process_work, K_NO_WAIT);
796 if (err < 0) {
797 return err;
798 }
799
800 return 0;
801 }
802
mqtt_sn_unsubscribe(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name)803 int mqtt_sn_unsubscribe(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
804 struct mqtt_sn_data *topic_name)
805 {
806 struct mqtt_sn_topic *topic;
807 int err;
808
809 if (!client || !topic_name) {
810 return -EINVAL;
811 }
812
813 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
814 LOG_ERR("Cannot unsubscribe: not connected");
815 return -ENOTCONN;
816 }
817
818 topic = mqtt_sn_topic_find_name(client, topic_name);
819 if (!topic) {
820 LOG_HEXDUMP_ERR(topic_name->data, topic_name->size, "Topic not found");
821 return -ENOENT;
822 }
823
824 topic->state = MQTT_SN_TOPIC_STATE_UNSUBSCRIBING;
825 mqtt_sn_con_init(&topic->con);
826
827 err = k_work_reschedule(&client->process_work, K_NO_WAIT);
828 if (err < 0) {
829 return err;
830 }
831
832 return 0;
833 }
834
mqtt_sn_publish(struct mqtt_sn_client * client,enum mqtt_sn_qos qos,struct mqtt_sn_data * topic_name,bool retain,struct mqtt_sn_data * data)835 int mqtt_sn_publish(struct mqtt_sn_client *client, enum mqtt_sn_qos qos,
836 struct mqtt_sn_data *topic_name, bool retain, struct mqtt_sn_data *data)
837 {
838 struct mqtt_sn_publish *pub;
839 struct mqtt_sn_topic *topic;
840 int err;
841
842 if (!client || !topic_name) {
843 return -EINVAL;
844 }
845
846 if (qos == MQTT_SN_QOS_M1) {
847 LOG_ERR("QoS -1 not supported");
848 return -ENOTSUP;
849 }
850
851 if (client->state != MQTT_SN_CLIENT_ACTIVE) {
852 LOG_ERR("Cannot publish: disconnected");
853 return -ENOTCONN;
854 }
855
856 topic = mqtt_sn_topic_find_name(client, topic_name);
857 if (!topic) {
858 topic = mqtt_sn_topic_create(topic_name);
859 if (!topic) {
860 return -ENOMEM;
861 }
862
863 topic->qos = qos;
864 topic->state = MQTT_SN_TOPIC_STATE_REGISTERING;
865 sys_slist_append(&client->topic, &topic->next);
866 }
867
868 pub = mqtt_sn_publish_create(data);
869 if (!pub) {
870 k_work_reschedule(&client->process_work, K_NO_WAIT);
871 return -ENOMEM;
872 }
873
874 pub->qos = qos;
875 pub->retain = retain;
876 pub->topic = topic;
877
878 sys_slist_append(&client->publish, &pub->next);
879
880 err = k_work_reschedule(&client->process_work, K_NO_WAIT);
881 if (err < 0) {
882 return err;
883 }
884
885 return 0;
886 }
887
handle_connack(struct mqtt_sn_client * client,struct mqtt_sn_param_connack * p)888 static void handle_connack(struct mqtt_sn_client *client, struct mqtt_sn_param_connack *p)
889 {
890 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_CONNECTED};
891
892 if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
893 LOG_INF("MQTT_SN client connected");
894 switch (client->state) {
895 case MQTT_SN_CLIENT_DISCONNECTED:
896 case MQTT_SN_CLIENT_ASLEEP:
897 case MQTT_SN_CLIENT_AWAKE:
898 mqtt_sn_set_state(client, MQTT_SN_CLIENT_ACTIVE);
899 if (client->evt_cb) {
900 client->evt_cb(client, &evt);
901 }
902 client->ping_retries = N_RETRY;
903 break;
904 default:
905 LOG_ERR("Client received CONNACK but was in state %d", client->state);
906 return;
907 }
908 } else {
909 LOG_WRN("CONNACK ret code %d", p->ret_code);
910 mqtt_sn_disconnect_internal(client);
911 }
912
913 k_work_schedule(&client->process_work, K_NO_WAIT);
914 }
915
handle_willtopicreq(struct mqtt_sn_client * client)916 static void handle_willtopicreq(struct mqtt_sn_client *client)
917 {
918 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_WILLTOPIC};
919
920 response.params.willtopic.qos = client->will_qos;
921 response.params.willtopic.retain = client->will_retain;
922 response.params.willtopic.topic.data = client->will_topic.data;
923 response.params.willtopic.topic.size = client->will_topic.size;
924
925 encode_and_send(client, &response);
926 }
927
handle_willmsgreq(struct mqtt_sn_client * client)928 static void handle_willmsgreq(struct mqtt_sn_client *client)
929 {
930 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_WILLMSG};
931
932 response.params.willmsg.msg.data = client->will_msg.data;
933 response.params.willmsg.msg.size = client->will_msg.size;
934
935 encode_and_send(client, &response);
936 }
937
handle_register(struct mqtt_sn_client * client,struct mqtt_sn_param_register * p)938 static void handle_register(struct mqtt_sn_client *client, struct mqtt_sn_param_register *p)
939 {
940 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_REGACK};
941 struct mqtt_sn_topic *topic;
942
943 topic = mqtt_sn_topic_create(&p->topic);
944 if (!topic) {
945 return;
946 }
947
948 topic->state = MQTT_SN_TOPIC_STATE_REGISTERED;
949 topic->topic_id = p->topic_id;
950 topic->type = MQTT_SN_TOPIC_TYPE_NORMAL;
951
952 response.params.regack.ret_code = MQTT_SN_CODE_ACCEPTED;
953 response.params.regack.topic_id = p->topic_id;
954 response.params.regack.msg_id = p->msg_id;
955
956 encode_and_send(client, &response);
957 }
958
handle_regack(struct mqtt_sn_client * client,struct mqtt_sn_param_regack * p)959 static void handle_regack(struct mqtt_sn_client *client, struct mqtt_sn_param_regack *p)
960 {
961 struct mqtt_sn_topic *topic = mqtt_sn_topic_find_msg_id(client, p->msg_id);
962
963 if (!topic) {
964 LOG_ERR("Can't REGACK, no topic found");
965 return;
966 }
967
968 if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
969 topic->state = MQTT_SN_TOPIC_STATE_REGISTERED;
970 topic->topic_id = p->topic_id;
971 } else {
972 LOG_WRN("Gateway could not register topic ID %u, code %d", p->topic_id,
973 p->ret_code);
974 }
975 }
976
handle_publish(struct mqtt_sn_client * client,struct mqtt_sn_param_publish * p)977 static void handle_publish(struct mqtt_sn_client *client, struct mqtt_sn_param_publish *p)
978 {
979 struct mqtt_sn_param response;
980 struct mqtt_sn_evt evt = {.param.publish = {.data = p->data,
981 .topic_id = p->topic_id,
982 .topic_type = p->topic_type},
983 .type = MQTT_SN_EVT_PUBLISH};
984
985 if (p->qos == MQTT_SN_QOS_1) {
986 response.type = MQTT_SN_MSG_TYPE_PUBACK;
987 response.params.puback.topic_id = p->topic_id;
988 response.params.puback.msg_id = p->msg_id;
989 response.params.puback.ret_code = MQTT_SN_CODE_ACCEPTED;
990
991 encode_and_send(client, &response);
992 } else if (p->qos == MQTT_SN_QOS_2) {
993 response.type = MQTT_SN_MSG_TYPE_PUBREC;
994 response.params.pubrec.msg_id = p->msg_id;
995
996 encode_and_send(client, &response);
997 }
998
999 if (client->evt_cb) {
1000 client->evt_cb(client, &evt);
1001 }
1002 }
1003
handle_puback(struct mqtt_sn_client * client,struct mqtt_sn_param_puback * p)1004 static void handle_puback(struct mqtt_sn_client *client, struct mqtt_sn_param_puback *p)
1005 {
1006 struct mqtt_sn_publish *pub = mqtt_sn_publish_find_msg_id(client, p->msg_id);
1007
1008 if (!pub) {
1009 LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1010 return;
1011 }
1012
1013 mqtt_sn_publish_destroy(client, pub);
1014 }
1015
handle_pubrec(struct mqtt_sn_client * client,struct mqtt_sn_param_pubrec * p)1016 static void handle_pubrec(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrec *p)
1017 {
1018 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PUBREL};
1019 struct mqtt_sn_publish *pub = mqtt_sn_publish_find_msg_id(client, p->msg_id);
1020
1021 if (!pub) {
1022 LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1023 return;
1024 }
1025
1026 pub->con.last_attempt = k_uptime_get();
1027 pub->con.retries = N_RETRY;
1028
1029 response.params.pubrel.msg_id = p->msg_id;
1030
1031 encode_and_send(client, &response);
1032 }
1033
handle_pubrel(struct mqtt_sn_client * client,struct mqtt_sn_param_pubrel * p)1034 static void handle_pubrel(struct mqtt_sn_client *client, struct mqtt_sn_param_pubrel *p)
1035 {
1036 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PUBCOMP};
1037
1038 response.params.pubcomp.msg_id = p->msg_id;
1039
1040 encode_and_send(client, &response);
1041 }
1042
handle_pubcomp(struct mqtt_sn_client * client,struct mqtt_sn_param_pubcomp * p)1043 static void handle_pubcomp(struct mqtt_sn_client *client, struct mqtt_sn_param_pubcomp *p)
1044 {
1045 struct mqtt_sn_publish *pub = mqtt_sn_publish_find_msg_id(client, p->msg_id);
1046
1047 if (!pub) {
1048 LOG_ERR("No matching PUBLISH found for msg id %u", p->msg_id);
1049 return;
1050 }
1051
1052 mqtt_sn_publish_destroy(client, pub);
1053 }
1054
handle_suback(struct mqtt_sn_client * client,struct mqtt_sn_param_suback * p)1055 static void handle_suback(struct mqtt_sn_client *client, struct mqtt_sn_param_suback *p)
1056 {
1057 struct mqtt_sn_topic *topic = mqtt_sn_topic_find_msg_id(client, p->msg_id);
1058
1059 if (!topic) {
1060 LOG_ERR("No matching SUBSCRIBE found for msg id %u", p->msg_id);
1061 return;
1062 }
1063
1064 if (p->ret_code == MQTT_SN_CODE_ACCEPTED) {
1065 topic->state = MQTT_SN_TOPIC_STATE_SUBSCRIBED;
1066 topic->topic_id = p->topic_id;
1067 topic->qos = p->qos;
1068 } else {
1069 LOG_WRN("SUBACK with ret code %d", p->ret_code);
1070 }
1071 }
1072
handle_unsuback(struct mqtt_sn_client * client,struct mqtt_sn_param_unsuback * p)1073 static void handle_unsuback(struct mqtt_sn_client *client, struct mqtt_sn_param_unsuback *p)
1074 {
1075 struct mqtt_sn_topic *topic = mqtt_sn_topic_find_msg_id(client, p->msg_id);
1076
1077 if (!topic || topic->state != MQTT_SN_TOPIC_STATE_UNSUBSCRIBING) {
1078 LOG_ERR("No matching UNSUBSCRIBE found for msg id %u", p->msg_id);
1079 return;
1080 }
1081
1082 mqtt_sn_topic_destroy(client, topic);
1083 }
1084
handle_pingreq(struct mqtt_sn_client * client)1085 static void handle_pingreq(struct mqtt_sn_client *client)
1086 {
1087 struct mqtt_sn_param response = {.type = MQTT_SN_MSG_TYPE_PINGRESP};
1088
1089 encode_and_send(client, &response);
1090 }
1091
handle_pingresp(struct mqtt_sn_client * client)1092 static void handle_pingresp(struct mqtt_sn_client *client)
1093 {
1094 struct mqtt_sn_evt evt = {.type = MQTT_SN_EVT_PINGRESP};
1095
1096 if (client->evt_cb) {
1097 client->evt_cb(client, &evt);
1098 }
1099
1100 if (client->state == MQTT_SN_CLIENT_AWAKE) {
1101 mqtt_sn_set_state(client, MQTT_SN_CLIENT_ASLEEP);
1102 }
1103
1104 client->ping_retries = N_RETRY;
1105 }
1106
handle_disconnect(struct mqtt_sn_client * client,struct mqtt_sn_param_disconnect * p)1107 static void handle_disconnect(struct mqtt_sn_client *client, struct mqtt_sn_param_disconnect *p)
1108 {
1109 LOG_INF("Received DISCONNECT");
1110 mqtt_sn_disconnect_internal(client);
1111 }
1112
handle_msg(struct mqtt_sn_client * client)1113 static int handle_msg(struct mqtt_sn_client *client)
1114 {
1115 int err;
1116 struct mqtt_sn_param p;
1117
1118 err = mqtt_sn_decode_msg(&client->rx, &p);
1119 if (err) {
1120 return err;
1121 }
1122
1123 LOG_INF("Got message of type %d", p.type);
1124
1125 switch (p.type) {
1126 case MQTT_SN_MSG_TYPE_GWINFO:
1127 break;
1128 case MQTT_SN_MSG_TYPE_CONNACK:
1129 handle_connack(client, &p.params.connack);
1130 break;
1131 case MQTT_SN_MSG_TYPE_WILLTOPICREQ:
1132 handle_willtopicreq(client);
1133 break;
1134 case MQTT_SN_MSG_TYPE_WILLMSGREQ:
1135 handle_willmsgreq(client);
1136 break;
1137 case MQTT_SN_MSG_TYPE_REGISTER:
1138 handle_register(client, &p.params.reg);
1139 break;
1140 case MQTT_SN_MSG_TYPE_REGACK:
1141 handle_regack(client, &p.params.regack);
1142 break;
1143 case MQTT_SN_MSG_TYPE_PUBLISH:
1144 handle_publish(client, &p.params.publish);
1145 break;
1146 case MQTT_SN_MSG_TYPE_PUBACK:
1147 handle_puback(client, &p.params.puback);
1148 break;
1149 case MQTT_SN_MSG_TYPE_PUBREC:
1150 handle_pubrec(client, &p.params.pubrec);
1151 break;
1152 case MQTT_SN_MSG_TYPE_PUBREL:
1153 handle_pubrel(client, &p.params.pubrel);
1154 break;
1155 case MQTT_SN_MSG_TYPE_PUBCOMP:
1156 handle_pubcomp(client, &p.params.pubcomp);
1157 break;
1158 case MQTT_SN_MSG_TYPE_SUBACK:
1159 handle_suback(client, &p.params.suback);
1160 break;
1161 case MQTT_SN_MSG_TYPE_UNSUBACK:
1162 handle_unsuback(client, &p.params.unsuback);
1163 break;
1164 case MQTT_SN_MSG_TYPE_PINGREQ:
1165 handle_pingreq(client);
1166 break;
1167 case MQTT_SN_MSG_TYPE_PINGRESP:
1168 handle_pingresp(client);
1169 break;
1170 case MQTT_SN_MSG_TYPE_DISCONNECT:
1171 handle_disconnect(client, &p.params.disconnect);
1172 break;
1173 case MQTT_SN_MSG_TYPE_WILLTOPICRESP:
1174 break;
1175 case MQTT_SN_MSG_TYPE_WILLMSGRESP:
1176 break;
1177 default:
1178 LOG_ERR("Unexpected message type %d", p.type);
1179 break;
1180 }
1181
1182 k_work_reschedule(&client->process_work, K_NO_WAIT);
1183
1184 return 0;
1185 }
1186
mqtt_sn_input(struct mqtt_sn_client * client)1187 int mqtt_sn_input(struct mqtt_sn_client *client)
1188 {
1189 ssize_t next_frame_size;
1190 int err;
1191
1192 if (!client || !client->transport || !client->transport->recv) {
1193 return -EINVAL;
1194 }
1195
1196 if (client->transport->poll) {
1197 next_frame_size = client->transport->poll(client);
1198 if (next_frame_size <= 0) {
1199 return next_frame_size;
1200 }
1201 }
1202
1203 net_buf_simple_reset(&client->rx);
1204
1205 next_frame_size = client->transport->recv(client, client->rx.data, client->rx.size);
1206 if (next_frame_size <= 0) {
1207 return next_frame_size;
1208 }
1209
1210 if (next_frame_size > client->rx.size) {
1211 return -ENOBUFS;
1212 }
1213
1214 client->rx.len = next_frame_size;
1215
1216 LOG_HEXDUMP_DBG(client->rx.data, client->rx.len, "Received data");
1217
1218 err = handle_msg(client);
1219
1220 if (err) {
1221 return err;
1222 }
1223
1224 /* Should be zero */
1225 return -client->rx.len;
1226 }
1227