1 /*
2 * Copyright (c) 2022 grandcentrix GmbH
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 #include "common.h"
8
9 #include <errno.h>
10 #include <stdio.h>
11 #include <zephyr/kernel.h>
12 #include <zephyr/net/mqtt_sn.h>
13 #include <zephyr/net/conn_mgr_monitor.h>
14 #include <zephyr/net/net_mgmt.h>
15 #include <zephyr/net/socket.h>
16
17 #include <zephyr/logging/log.h>
LOG_MODULE_DECLARE(mqtt_sn_publisher_sample);

/*
 * IP address string of the statically configured gateway. When the static
 * gateway option is disabled the macro still has to exist, because it is
 * referenced from a branch guarded by IS_ENABLED() rather than by the
 * preprocessor; it then expands to an empty string.
 */
#ifndef CONFIG_NET_SAMPLE_MQTT_SN_STATIC_GATEWAY
#define SAMPLE_GW_IP ""
#else
#define SAMPLE_GW_IP CONFIG_NET_SAMPLE_MQTT_SN_GATEWAY_IP
#endif
25
26 static void process_thread(void);
27
28 K_THREAD_DEFINE(udp_thread_id, STACK_SIZE, process_thread, NULL, NULL, NULL, THREAD_PRIORITY,
29 IS_ENABLED(CONFIG_USERSPACE) ? K_USER : 0, -1);
30
31 static APP_BMEM struct mqtt_sn_client mqtt_client;
32 static APP_BMEM struct mqtt_sn_transport_udp tp;
33 static APP_DMEM struct mqtt_sn_data client_id = MQTT_SN_DATA_STRING_LITERAL("ZEPHYR");
34
35 static APP_BMEM uint8_t tx_buf[CONFIG_NET_SAMPLE_MQTT_SN_BUFFER_SIZE];
36 static APP_BMEM uint8_t rx_buf[CONFIG_NET_SAMPLE_MQTT_SN_BUFFER_SIZE];
37
38 static APP_BMEM bool mqtt_sn_connected;
39
evt_cb(struct mqtt_sn_client * client,const struct mqtt_sn_evt * evt)40 static void evt_cb(struct mqtt_sn_client *client, const struct mqtt_sn_evt *evt)
41 {
42 switch (evt->type) {
43 case MQTT_SN_EVT_CONNECTED: /* Connected to a gateway */
44 LOG_INF("MQTT-SN event EVT_CONNECTED");
45 mqtt_sn_connected = true;
46 break;
47 case MQTT_SN_EVT_DISCONNECTED: /* Disconnected */
48 LOG_INF("MQTT-SN event EVT_DISCONNECTED");
49 mqtt_sn_connected = false;
50 break;
51 case MQTT_SN_EVT_ASLEEP: /* Entered ASLEEP state */
52 LOG_INF("MQTT-SN event EVT_ASLEEP");
53 break;
54 case MQTT_SN_EVT_AWAKE: /* Entered AWAKE state */
55 LOG_INF("MQTT-SN event EVT_AWAKE");
56 break;
57 case MQTT_SN_EVT_PUBLISH: /* Received a PUBLISH message */
58 LOG_INF("MQTT-SN event EVT_PUBLISH");
59 LOG_HEXDUMP_INF(evt->param.publish.data.data, evt->param.publish.data.size,
60 "Published data");
61 break;
62 case MQTT_SN_EVT_PINGRESP: /* Received a PINGRESP */
63 LOG_INF("MQTT-SN event EVT_PINGRESP");
64 break;
65 case MQTT_SN_EVT_ADVERTISE: /* Received a ADVERTISE */
66 LOG_INF("MQTT-SN event EVT_ADVERTISE");
67 break;
68 case MQTT_SN_EVT_GWINFO: /* Received a GWINFO */
69 LOG_INF("MQTT-SN event EVT_GWINFO");
70 break;
71 case MQTT_SN_EVT_SEARCHGW: /* Received a SEARCHGW */
72 LOG_INF("MQTT-SN event EVT_SEARCHGW");
73 break;
74 default:
75 break;
76 }
77 }
78
do_work(void)79 static int do_work(void)
80 {
81 static APP_BMEM bool subscribed;
82 static APP_BMEM int64_t ts;
83 static APP_DMEM struct mqtt_sn_data topic_p = MQTT_SN_DATA_STRING_LITERAL("/uptime");
84 static APP_DMEM struct mqtt_sn_data topic_s = MQTT_SN_DATA_STRING_LITERAL("/number");
85 char out[20];
86 struct mqtt_sn_data pubdata = {.data = out};
87 const int64_t now = k_uptime_get();
88 int err;
89
90 err = mqtt_sn_input(&mqtt_client);
91 if (err < 0) {
92 LOG_ERR("failed: input: %d", err);
93 return err;
94 }
95
96 if (mqtt_sn_connected && !subscribed) {
97 err = mqtt_sn_subscribe(&mqtt_client, MQTT_SN_QOS_0, &topic_s);
98 if (err < 0) {
99 return err;
100 }
101 subscribed = true;
102 }
103
104 if (now - ts > 10000 && mqtt_sn_connected) {
105 LOG_INF("Publishing timestamp");
106
107 ts = now;
108
109 err = snprintk(out, sizeof(out), "%" PRIi64, ts);
110 if (err < 0) {
111 LOG_ERR("failed: snprintf");
112 return err;
113 }
114
115 pubdata.size = MIN(sizeof(out), err);
116
117 err = mqtt_sn_publish(&mqtt_client, MQTT_SN_QOS_0, &topic_p, false, &pubdata);
118 if (err < 0) {
119 LOG_ERR("failed: publish: %d", err);
120 return err;
121 }
122 }
123
124 return 0;
125 }
126
process_thread(void)127 static void process_thread(void)
128 {
129 struct sockaddr_in bcaddr = {0};
130 int err;
131 LOG_DBG("Parsing Broadcast IP " CONFIG_NET_SAMPLE_MQTT_SN_BROADCAST_IP);
132 bcaddr.sin_family = AF_INET;
133 bcaddr.sin_port = htons(CONFIG_NET_SAMPLE_MQTT_SN_BROADCAST_PORT);
134 err = inet_pton(AF_INET, CONFIG_NET_SAMPLE_MQTT_SN_BROADCAST_IP, &bcaddr.sin_addr);
135 __ASSERT(err == 1, "inet_pton() failed %d", err);
136
137 LOG_INF("Waiting for connection...");
138 LOG_HEXDUMP_DBG(&bcaddr, sizeof(bcaddr), " broadcast address");
139
140 err = mqtt_sn_transport_udp_init(&tp, (struct sockaddr *)&bcaddr, sizeof((bcaddr)));
141 __ASSERT(err == 0, "mqtt_sn_transport_udp_init() failed %d", err);
142
143 err = mqtt_sn_client_init(&mqtt_client, &client_id, &tp.tp, evt_cb, tx_buf, sizeof(tx_buf),
144 rx_buf, sizeof(rx_buf));
145 __ASSERT(err == 0, "mqtt_sn_client_init() failed %d", err);
146
147 if (IS_ENABLED(CONFIG_NET_SAMPLE_MQTT_SN_STATIC_GATEWAY)) {
148 LOG_INF("Adding predefined Gateway");
149 struct sockaddr_in gwaddr = {0};
150 int err;
151
152 LOG_DBG("Parsing Broadcast IP %s", SAMPLE_GW_IP);
153 gwaddr.sin_family = AF_INET;
154 gwaddr.sin_port = htons(CONFIG_NET_SAMPLE_MQTT_SN_GATEWAY_PORT);
155 err = inet_pton(AF_INET, SAMPLE_GW_IP, &gwaddr.sin_addr);
156 __ASSERT(err == 1, "inet_pton() failed %d", err);
157 struct mqtt_sn_data gwaddr_data = {.data = (uint8_t *)&bcaddr,
158 .size = sizeof(struct sockaddr)};
159
160 err = mqtt_sn_add_gw(&mqtt_client, 0x1f, gwaddr_data);
161 __ASSERT(err == 0, "mqtt_sn_add_gw() failed %d", err);
162 } else {
163 LOG_INF("Searching for Gateway");
164 err = mqtt_sn_search(&mqtt_client, 1);
165 k_sleep(K_SECONDS(10));
166 err = mqtt_sn_input(&mqtt_client);
167 __ASSERT(err == 0, "mqtt_sn_search() failed %d", err);
168 }
169
170 LOG_INF("Connecting client");
171 err = mqtt_sn_connect(&mqtt_client, false, true);
172 __ASSERT(err == 0, "mqtt_sn_connect() failed %d", err);
173
174 while (err == 0) {
175 k_sleep(K_MSEC(500));
176 err = do_work();
177 }
178
179 LOG_ERR("Exiting thread: %d", err);
180 }
181
start_thread(void)182 void start_thread(void)
183 {
184 int rc;
185 #if defined(CONFIG_USERSPACE)
186 rc = k_mem_domain_add_thread(&app_domain, udp_thread_id);
187 if (rc < 0) {
188 LOG_ERR("Failed: k_mem_domain_add_thread() %d", rc);
189 return;
190 }
191 #endif
192
193 rc = k_thread_name_set(udp_thread_id, "udp");
194 if (rc < 0 && rc != -ENOSYS) {
195 LOG_ERR("Failed: k_thread_name_set() %d", rc);
196 return;
197 }
198
199 k_thread_start(udp_thread_id);
200
201 rc = k_thread_join(udp_thread_id, K_FOREVER);
202
203 if (rc != 0) {
204 LOG_ERR("Failed: k_thread_join() %d", rc);
205 }
206 }
207