1 /*
2  * Copyright (c) 2022 grandcentrix GmbH
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include "common.h"
8 
9 #include <errno.h>
10 #include <stdio.h>
11 #include <zephyr/kernel.h>
12 #include <zephyr/net/mqtt_sn.h>
13 #include <zephyr/net/conn_mgr_monitor.h>
14 #include <zephyr/net/net_mgmt.h>
15 #include <zephyr/net/socket.h>
16 
17 #include <zephyr/logging/log.h>
18 LOG_MODULE_DECLARE(mqtt_sn_publisher_sample);
19 
20 #ifdef CONFIG_NET_SAMPLE_MQTT_SN_STATIC_GATEWAY
21 #define SAMPLE_GW_IP CONFIG_NET_SAMPLE_MQTT_SN_GATEWAY_IP
22 #else
23 #define SAMPLE_GW_IP ""
24 #endif
25 
26 static void process_thread(void);
27 
28 K_THREAD_DEFINE(udp_thread_id, STACK_SIZE, process_thread, NULL, NULL, NULL, THREAD_PRIORITY,
29 		IS_ENABLED(CONFIG_USERSPACE) ? K_USER : 0, -1);
30 
31 static APP_BMEM struct mqtt_sn_client mqtt_client;
32 static APP_BMEM struct mqtt_sn_transport_udp tp;
33 static APP_DMEM struct mqtt_sn_data client_id = MQTT_SN_DATA_STRING_LITERAL("ZEPHYR");
34 
35 static APP_BMEM uint8_t tx_buf[CONFIG_NET_SAMPLE_MQTT_SN_BUFFER_SIZE];
36 static APP_BMEM uint8_t rx_buf[CONFIG_NET_SAMPLE_MQTT_SN_BUFFER_SIZE];
37 
38 static APP_BMEM bool mqtt_sn_connected;
39 
evt_cb(struct mqtt_sn_client * client,const struct mqtt_sn_evt * evt)40 static void evt_cb(struct mqtt_sn_client *client, const struct mqtt_sn_evt *evt)
41 {
42 	switch (evt->type) {
43 	case MQTT_SN_EVT_CONNECTED: /* Connected to a gateway */
44 		LOG_INF("MQTT-SN event EVT_CONNECTED");
45 		mqtt_sn_connected = true;
46 		break;
47 	case MQTT_SN_EVT_DISCONNECTED: /* Disconnected */
48 		LOG_INF("MQTT-SN event EVT_DISCONNECTED");
49 		mqtt_sn_connected = false;
50 		break;
51 	case MQTT_SN_EVT_ASLEEP: /* Entered ASLEEP state */
52 		LOG_INF("MQTT-SN event EVT_ASLEEP");
53 		break;
54 	case MQTT_SN_EVT_AWAKE: /* Entered AWAKE state */
55 		LOG_INF("MQTT-SN event EVT_AWAKE");
56 		break;
57 	case MQTT_SN_EVT_PUBLISH: /* Received a PUBLISH message */
58 		LOG_INF("MQTT-SN event EVT_PUBLISH");
59 		LOG_HEXDUMP_INF(evt->param.publish.data.data, evt->param.publish.data.size,
60 				"Published data");
61 		break;
62 	case MQTT_SN_EVT_PINGRESP: /* Received a PINGRESP */
63 		LOG_INF("MQTT-SN event EVT_PINGRESP");
64 		break;
65 	case MQTT_SN_EVT_ADVERTISE: /* Received a ADVERTISE */
66 		LOG_INF("MQTT-SN event EVT_ADVERTISE");
67 		break;
68 	case MQTT_SN_EVT_GWINFO: /* Received a GWINFO */
69 		LOG_INF("MQTT-SN event EVT_GWINFO");
70 		break;
71 	case MQTT_SN_EVT_SEARCHGW: /* Received a SEARCHGW */
72 		LOG_INF("MQTT-SN event EVT_SEARCHGW");
73 		break;
74 	default:
75 		break;
76 	}
77 }
78 
do_work(void)79 static int do_work(void)
80 {
81 	static APP_BMEM bool subscribed;
82 	static APP_BMEM int64_t ts;
83 	static APP_DMEM struct mqtt_sn_data topic_p = MQTT_SN_DATA_STRING_LITERAL("/uptime");
84 	static APP_DMEM struct mqtt_sn_data topic_s = MQTT_SN_DATA_STRING_LITERAL("/number");
85 	char out[20];
86 	struct mqtt_sn_data pubdata = {.data = out};
87 	const int64_t now = k_uptime_get();
88 	int err;
89 
90 	err = mqtt_sn_input(&mqtt_client);
91 	if (err < 0) {
92 		LOG_ERR("failed: input: %d", err);
93 		return err;
94 	}
95 
96 	if (mqtt_sn_connected && !subscribed) {
97 		err = mqtt_sn_subscribe(&mqtt_client, MQTT_SN_QOS_0, &topic_s);
98 		if (err < 0) {
99 			return err;
100 		}
101 		subscribed = true;
102 	}
103 
104 	if (now - ts > 10000 && mqtt_sn_connected) {
105 		LOG_INF("Publishing timestamp");
106 
107 		ts = now;
108 
109 		err = snprintk(out, sizeof(out), "%" PRIi64, ts);
110 		if (err < 0) {
111 			LOG_ERR("failed: snprintf");
112 			return err;
113 		}
114 
115 		pubdata.size = MIN(sizeof(out), err);
116 
117 		err = mqtt_sn_publish(&mqtt_client, MQTT_SN_QOS_0, &topic_p, false, &pubdata);
118 		if (err < 0) {
119 			LOG_ERR("failed: publish: %d", err);
120 			return err;
121 		}
122 	}
123 
124 	return 0;
125 }
126 
process_thread(void)127 static void process_thread(void)
128 {
129 	struct sockaddr_in bcaddr = {0};
130 	int err;
131 	LOG_DBG("Parsing Broadcast IP " CONFIG_NET_SAMPLE_MQTT_SN_BROADCAST_IP);
132 	bcaddr.sin_family = AF_INET;
133 	bcaddr.sin_port = htons(CONFIG_NET_SAMPLE_MQTT_SN_BROADCAST_PORT);
134 	err = inet_pton(AF_INET, CONFIG_NET_SAMPLE_MQTT_SN_BROADCAST_IP, &bcaddr.sin_addr);
135 	__ASSERT(err == 1, "inet_pton() failed %d", err);
136 
137 	LOG_INF("Waiting for connection...");
138 	LOG_HEXDUMP_DBG(&bcaddr, sizeof(bcaddr), " broadcast address");
139 
140 	err = mqtt_sn_transport_udp_init(&tp, (struct sockaddr *)&bcaddr, sizeof((bcaddr)));
141 	__ASSERT(err == 0, "mqtt_sn_transport_udp_init() failed %d", err);
142 
143 	err = mqtt_sn_client_init(&mqtt_client, &client_id, &tp.tp, evt_cb, tx_buf, sizeof(tx_buf),
144 				  rx_buf, sizeof(rx_buf));
145 	__ASSERT(err == 0, "mqtt_sn_client_init() failed %d", err);
146 
147 	if (IS_ENABLED(CONFIG_NET_SAMPLE_MQTT_SN_STATIC_GATEWAY)) {
148 		LOG_INF("Adding predefined Gateway");
149 		struct sockaddr_in gwaddr = {0};
150 		int err;
151 
152 		LOG_DBG("Parsing Broadcast IP %s", SAMPLE_GW_IP);
153 		gwaddr.sin_family = AF_INET;
154 		gwaddr.sin_port = htons(CONFIG_NET_SAMPLE_MQTT_SN_GATEWAY_PORT);
155 		err = inet_pton(AF_INET, SAMPLE_GW_IP, &gwaddr.sin_addr);
156 		__ASSERT(err == 1, "inet_pton() failed %d", err);
157 		struct mqtt_sn_data gwaddr_data = {.data = (uint8_t *)&bcaddr,
158 						   .size = sizeof(struct sockaddr)};
159 
160 		err = mqtt_sn_add_gw(&mqtt_client, 0x1f, gwaddr_data);
161 		__ASSERT(err == 0, "mqtt_sn_add_gw() failed %d", err);
162 	} else {
163 		LOG_INF("Searching for Gateway");
164 		err = mqtt_sn_search(&mqtt_client, 1);
165 		k_sleep(K_SECONDS(10));
166 		err = mqtt_sn_input(&mqtt_client);
167 		__ASSERT(err == 0, "mqtt_sn_search() failed %d", err);
168 	}
169 
170 	LOG_INF("Connecting client");
171 	err = mqtt_sn_connect(&mqtt_client, false, true);
172 	__ASSERT(err == 0, "mqtt_sn_connect() failed %d", err);
173 
174 	while (err == 0) {
175 		k_sleep(K_MSEC(500));
176 		err = do_work();
177 	}
178 
179 	LOG_ERR("Exiting thread: %d", err);
180 }
181 
start_thread(void)182 void start_thread(void)
183 {
184 	int rc;
185 #if defined(CONFIG_USERSPACE)
186 	rc = k_mem_domain_add_thread(&app_domain, udp_thread_id);
187 	if (rc < 0) {
188 		LOG_ERR("Failed: k_mem_domain_add_thread() %d", rc);
189 		return;
190 	}
191 #endif
192 
193 	rc = k_thread_name_set(udp_thread_id, "udp");
194 	if (rc < 0 && rc != -ENOSYS) {
195 		LOG_ERR("Failed: k_thread_name_set() %d", rc);
196 		return;
197 	}
198 
199 	k_thread_start(udp_thread_id);
200 
201 	rc = k_thread_join(udp_thread_id, K_FOREVER);
202 
203 	if (rc != 0) {
204 		LOG_ERR("Failed: k_thread_join() %d", rc);
205 	}
206 }
207