1 /*
2  * Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 #include "messages.h"
6 
7 #include <zephyr/kernel.h>
8 #include <zephyr/logging/log.h>
9 #include <zephyr/sys/util_macro.h>
10 #include <zephyr/zbus/zbus.h>
11 #include <zephyr/ztest.h>
12 #include <zephyr/ztest_assert.h>
/* Log through the "zbus" log module, which is presumably registered elsewhere in the project. */
LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);

/* Channels referenced in this file; they are only declared here, not defined. */
ZBUS_CHAN_DECLARE(version_chan, sensor_data_chan, net_pkt_chan, net_log_chan,
		  start_measurement_chan, busy_chan);

/* Incremented by urgent_callback(); cleared by context_reset(). */
static int count_callback;
19 
urgent_callback(const struct zbus_channel * chan)20 static void urgent_callback(const struct zbus_channel *chan)
21 {
22 	LOG_INF(" *** LISTENER activated for channel %s ***\n", zbus_chan_name(chan));
23 
24 	++count_callback;
25 }
26 
/* Listener observer whose callback is urgent_callback(). */
ZBUS_LISTENER_DEFINE(critical_lis, urgent_callback);

/* Number of notifications handled by core_thread(); cleared by context_reset(). */
static int count_core;

/* Subscriber that core_thread() waits on; created with a notification queue size of 1. */
ZBUS_SUBSCRIBER_DEFINE(core_sub, 1);
32 
core_thread(void)33 static void core_thread(void)
34 {
35 	const struct zbus_channel *chan = NULL;
36 
37 	while (1) {
38 		if (!zbus_sub_wait(&core_sub, &chan, K_FOREVER)) {
39 			count_core++;
40 
41 			struct sensor_data_msg data;
42 
43 			zbus_chan_read(&sensor_data_chan, &data, K_FOREVER);
44 
45 			struct net_pkt_msg pkt = {.total = data.a + data.b};
46 
47 			LOG_DBG("Sensor {a = %d, b = %d}. Sending pkt {total=%d}", data.a, data.b,
48 				pkt.total);
49 
50 			zbus_chan_pub(&net_pkt_chan, &pkt, K_MSEC(200));
51 		}
52 	}
53 }
54 
/* Statically defined thread running core_thread(): 1024-byte stack, priority 3, no start delay. */
K_THREAD_DEFINE(core_thread_id, 1024, core_thread, NULL, NULL, NULL, 3, 0, 0);

/* Number of notifications handled by net_thread(); cleared by context_reset(). */
static int count_net;
/* Last message net_thread() read from net_pkt_chan; the tests check its total. */
static struct net_pkt_msg pkt = {0};

/* Subscriber that net_thread() waits on; created with a notification queue size of 4. */
ZBUS_SUBSCRIBER_DEFINE(net_sub, 4);
61 
net_thread(void)62 static void net_thread(void)
63 {
64 	const struct zbus_channel *chan;
65 
66 	while (1) {
67 		if (!zbus_sub_wait(&net_sub, &chan, K_FOREVER)) {
68 			count_net++;
69 			zbus_chan_read(&net_pkt_chan, &pkt, K_FOREVER);
70 
71 			LOG_DBG("[Net] Total %d", pkt.total);
72 
73 			struct net_log_msg log_msg = {.count_net = count_net,
74 						      .pkt_total = pkt.total};
75 
76 			zbus_chan_pub(&net_log_chan, &log_msg, K_MSEC(500));
77 		}
78 	}
79 }
80 
/* Statically defined thread running net_thread(): 1024-byte stack, priority 3, no start delay. */
K_THREAD_DEFINE(net_thread_id, 1024, net_thread, NULL, NULL, NULL, 3, 0, 0);

/* Number of messages received by net_log_thread(); cleared by context_reset(). */
static int count_net_log;

/* Message subscriber that net_log_thread() waits on. */
ZBUS_MSG_SUBSCRIBER_DEFINE(net_log_sub);
86 
net_log_thread(void)87 static void net_log_thread(void)
88 {
89 	const struct zbus_channel *chan;
90 	struct net_log_msg log_msg;
91 
92 	while (1) {
93 		if (!zbus_sub_wait_msg(&net_log_sub, &chan, &log_msg, K_FOREVER)) {
94 			count_net_log++;
95 
96 			LOG_DBG("[Net log]: count_net = %d, pkt.total = %d", log_msg.count_net,
97 				log_msg.pkt_total);
98 		}
99 	}
100 }
101 
/* Statically defined thread running net_log_thread(): 1024-byte stack, priority 3, no start delay. */
K_THREAD_DEFINE(net_log_thread_id, 1024, net_log_thread, NULL, NULL, NULL, 3, 0, 0);

/* Values peripheral_thread() increments and publishes as sensor data; cleared by context_reset(). */
static int a;
static int b;
/* Number of notifications handled by peripheral_thread(); cleared by context_reset(). */
static int count_peripheral;

/* Subscriber that peripheral_thread() waits on; created with a notification queue size of 1. */
ZBUS_SUBSCRIBER_DEFINE(peripheral_sub, 1);
109 
peripheral_thread(void)110 static void peripheral_thread(void)
111 {
112 	struct sensor_data_msg sd = {0, 0};
113 
114 	const struct zbus_channel *chan;
115 
116 	while (!zbus_sub_wait(&peripheral_sub, &chan, K_FOREVER)) {
117 		LOG_DBG("[Peripheral] starting measurement");
118 
119 		++count_peripheral;
120 		++a;
121 		++b;
122 
123 		sd.a = a;
124 		sd.b = b;
125 
126 		LOG_DBG("[Peripheral] sending sensor data");
127 
128 		zbus_chan_pub(&sensor_data_chan, &sd, K_MSEC(250));
129 
130 		k_msleep(150);
131 	}
132 }
133 
/* Statically defined thread running peripheral_thread(): 1024-byte stack, priority 3, no start delay. */
K_THREAD_DEFINE(peripheral_thread_id, 1024, peripheral_thread, NULL, NULL, NULL, 3, 0, 0);
135 
context_reset(void * f)136 static void context_reset(void *f)
137 {
138 	k_busy_wait(1000000);
139 
140 	a = 0;
141 	b = 0;
142 	count_callback = 0;
143 	count_core = 0;
144 	count_net = 0;
145 	count_net_log = 0;
146 	count_peripheral = 0;
147 	pkt.total = 0;
148 	struct net_pkt_msg *p;
149 
150 	zbus_chan_claim(&net_pkt_chan, K_NO_WAIT);
151 	p = zbus_chan_msg(&net_pkt_chan);
152 	p->total = 0;
153 	zbus_chan_finish(&net_pkt_chan);
154 	struct sensor_data_msg *sd;
155 
156 	zbus_chan_claim(&sensor_data_chan, K_NO_WAIT);
157 	sd = (struct sensor_data_msg *)sensor_data_chan.message;
158 	sd->a = 0;
159 	sd->b = 1;
160 	zbus_chan_finish(&sensor_data_chan);
161 	zbus_obs_set_enable(&critical_lis, true);
162 	zbus_obs_set_enable(&peripheral_sub, true);
163 	zbus_chan_claim(&start_measurement_chan, K_NO_WAIT);
164 	struct action_msg *act = (struct action_msg *)zbus_chan_msg(&start_measurement_chan);
165 
166 	act->status = false;
167 	zbus_chan_finish(&start_measurement_chan);
168 	zbus_chan_claim(&net_log_chan, K_NO_WAIT);
169 	struct net_log_msg *lm = (struct net_log_msg *)zbus_chan_msg(&net_log_chan);
170 
171 	lm->count_net = 0;
172 	lm->pkt_total = 0;
173 	zbus_chan_finish(&net_log_chan);
174 }
175 
ZTEST(integration,test_basic)176 ZTEST(integration, test_basic)
177 {
178 	struct action_msg start = {true};
179 	struct net_log_msg *lm = (struct net_log_msg *)zbus_chan_const_msg(&net_log_chan);
180 
181 	zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
182 
183 	k_msleep(200);
184 
185 	zassert_equal(count_callback, 1, NULL);
186 	zassert_equal(count_core, 1, NULL);
187 	zassert_equal(count_net, 1, NULL);
188 	zassert_equal(count_peripheral, 1, NULL);
189 	zassert_equal(count_net_log, 1, NULL);
190 	zassert_equal(count_net, lm->count_net, NULL);
191 
192 	zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
193 
194 	k_msleep(200);
195 
196 	zassert_equal(count_callback, 2, NULL);
197 	zassert_equal(count_core, 2, NULL);
198 	zassert_equal(count_net, 2, NULL);
199 	zassert_equal(count_peripheral, 2, NULL);
200 	zassert_equal(count_net_log, 2, NULL);
201 	zassert_equal(count_net, lm->count_net, NULL);
202 
203 	zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
204 
205 	k_msleep(200);
206 
207 	zassert_equal(count_callback, 3, NULL);
208 	zassert_equal(count_core, 3, NULL);
209 	zassert_equal(count_net, 3, NULL);
210 	zassert_equal(count_peripheral, 3, NULL);
211 	zassert_equal(count_net_log, 3, NULL);
212 	zassert_equal(count_net, lm->count_net, NULL);
213 
214 	zassert_equal(pkt.total, 6, "result was %d", pkt.total);
215 	zassert_equal(pkt.total, lm->pkt_total, NULL);
216 }
217 
ZTEST(integration,test_channel_set_enable)218 ZTEST(integration, test_channel_set_enable)
219 {
220 	struct action_msg start = {true};
221 	const struct net_log_msg *lm = zbus_chan_const_msg(&net_log_chan);
222 
223 	zassert_equal(0, zbus_obs_set_enable(&critical_lis, false), NULL);
224 	zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, false), NULL);
225 	zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
226 
227 	k_msleep(200);
228 
229 	zassert_equal(count_callback, 0, NULL);
230 	zassert_equal(count_core, 0, NULL);
231 	zassert_equal(count_peripheral, 0, NULL);
232 	zassert_equal(count_net, 0, NULL);
233 	zassert_equal(count_net_log, 0, NULL);
234 	zassert_equal(count_net, lm->count_net, NULL);
235 
236 	zassert_equal(0, zbus_obs_set_enable(&critical_lis, false), NULL);
237 	zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, true), NULL);
238 	zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
239 
240 	k_msleep(200);
241 
242 	zassert_equal(count_callback, 0, NULL);
243 	zassert_equal(count_core, 1, NULL);
244 	zassert_equal(count_net, 1, NULL);
245 	zassert_equal(count_peripheral, 1, NULL);
246 	zassert_equal(count_net_log, 1, NULL);
247 	zassert_equal(count_net, lm->count_net, NULL);
248 
249 	zassert_equal(0, zbus_obs_set_enable(&critical_lis, true), NULL);
250 	zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, false), NULL);
251 	zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
252 
253 	k_msleep(200);
254 
255 	zassert_equal(count_callback, 1, NULL);
256 	zassert_equal(count_core, 1, NULL);
257 	zassert_equal(count_net, 1, NULL);
258 	zassert_equal(count_peripheral, 1, NULL);
259 	zassert_equal(count_net_log, 1, NULL);
260 	zassert_equal(count_net, lm->count_net, NULL);
261 
262 	zassert_equal(0, zbus_obs_set_enable(&critical_lis, true), NULL);
263 	zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, true), NULL);
264 	zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
265 
266 	k_msleep(200);
267 
268 	zassert_equal(count_callback, 2, NULL);
269 	zassert_equal(count_core, 2, NULL);
270 	zassert_equal(count_peripheral, 2, NULL);
271 	zassert_equal(count_net, 2, NULL);
272 	zassert_equal(count_net_log, 2, NULL);
273 	zassert_equal(count_net, lm->count_net, NULL);
274 
275 	zassert_equal(pkt.total, 4, "result was %d", pkt.total);
276 	zassert_equal(pkt.total, lm->pkt_total, NULL);
277 }
278 
greedy_thread_entry(void * p1,void * p2,void * p3)279 static void greedy_thread_entry(void *p1, void *p2, void *p3)
280 {
281 	int err = zbus_chan_claim(&busy_chan, K_MSEC(500));
282 
283 	zassert_equal(err, 0, "Could not claim the channel");
284 	k_msleep(2000);
285 	zassert_equal(0, zbus_chan_finish(&busy_chan), NULL);
286 }
287 
/* 1024-byte stack for the thread created in test_event_dispatcher_mutex_timeout. */
K_THREAD_STACK_DEFINE(greedy_thread_stack_area, 1024);

/* Thread object passed to k_thread_create() for the greedy thread. */
static struct k_thread greedy_thread_data;
291 
ZTEST(integration,test_event_dispatcher_mutex_timeout)292 ZTEST(integration, test_event_dispatcher_mutex_timeout)
293 {
294 	struct action_msg read;
295 	struct action_msg sent = {.status = true};
296 
297 	int err = zbus_chan_read(&busy_chan, &read, K_NO_WAIT);
298 
299 	zassert_equal(err, 0, "Could not read the channel");
300 
301 	zassert_equal(read.status, 0, "Read status must be false");
302 
303 	k_thread_create(&greedy_thread_data, greedy_thread_stack_area,
304 			K_THREAD_STACK_SIZEOF(greedy_thread_stack_area), greedy_thread_entry, NULL,
305 			NULL, NULL, CONFIG_ZTEST_THREAD_PRIORITY, K_USER | K_INHERIT_PERMS,
306 			K_NO_WAIT);
307 
308 	k_msleep(500);
309 
310 	err = zbus_chan_pub(&busy_chan, &sent, K_MSEC(200));
311 	zassert_equal(err, -EAGAIN, "Channel must be busy and could no be published %d", err);
312 	err = zbus_chan_read(&busy_chan, &read, K_MSEC(200));
313 	zassert_equal(err, -EAGAIN, "Channel must be busy and could no be published %d", err);
314 	err = zbus_chan_claim(&busy_chan, K_MSEC(200));
315 	zassert_equal(err, -EAGAIN, "Channel must be busy and could no be claimed %d", err);
316 	err = zbus_chan_pub(&busy_chan, &sent, K_MSEC(200));
317 	zassert_equal(err, -EAGAIN, "Channel must be busy and could no be published %d", err);
318 	/* Wait for the greedy thread to finish and pub and read successfully */
319 	err = zbus_chan_pub(&busy_chan, &sent, K_MSEC(2000));
320 	zassert_equal(err, 0, "Channel must be busy and could no be published %d", err);
321 	err = zbus_chan_read(&busy_chan, &read, K_MSEC(2000));
322 	zassert_equal(err, 0, "Could not read the channel");
323 
324 	zassert_equal(read.status, true, "Read status must be false");
325 }
326 
ZTEST(integration,test_event_dispatcher_queue_timeout)327 ZTEST(integration, test_event_dispatcher_queue_timeout)
328 {
329 	struct action_msg sent = {.status = true};
330 	struct action_msg read = {.status = true};
331 
332 	zassert_equal(0, zbus_obs_set_enable(&core_sub, false), NULL);
333 	zassert_equal(0, zbus_obs_set_enable(&net_sub, false), NULL);
334 	int err = zbus_chan_pub(&start_measurement_chan, &sent, K_MSEC(100));
335 
336 	zassert_equal(err, 0, "Could not pub the channel");
337 	k_msleep(10);
338 	sent.status = false;
339 	err = zbus_chan_pub(&start_measurement_chan, &sent, K_MSEC(100));
340 	zassert_equal(err, 0, "Could not pub the channel");
341 	k_msleep(10);
342 	err = zbus_chan_pub(&start_measurement_chan, &sent, K_MSEC(100));
343 	zassert_equal(err, -EAGAIN, "Pub to the channel %d must not occur here", err);
344 	err = zbus_chan_read(&start_measurement_chan, &read, K_NO_WAIT);
345 	zassert_equal(err, 0, "Could not read the channel");
346 	zassert_equal(read.status, false,
347 		      "Read status must be false. The notification was not sent, but "
348 		      "the channel actually changed");
349 	k_msleep(500);
350 	zassert_equal(count_callback, 3, NULL);
351 	zassert_equal(count_peripheral, 2, NULL);
352 }
353 
/* Register the "integration" suite with context_reset() as its only hook (the "before each test" slot). */
ZTEST_SUITE(integration, NULL, NULL, context_reset, NULL, NULL);
355