1 /*
2  * Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 #include "messages.h"
6 
7 #include <zephyr/kernel.h>
8 #include <zephyr/logging/log.h>
9 #include <zephyr/sys/util_macro.h>
10 #include <zephyr/zbus/zbus.h>
11 #include <zephyr/ztest.h>
12 #include <zephyr/ztest_assert.h>
13 LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);
14 
15 ZBUS_CHAN_DECLARE(version_chan, sensor_data_chan, net_pkt_chan, net_log_chan,
16 		  start_measurement_chan, busy_chan);
17 
18 static int count_callback;
19 
urgent_callback(const struct zbus_channel * chan)20 static void urgent_callback(const struct zbus_channel *chan)
21 {
22 	LOG_INF(" *** LISTENER activated for channel %s ***\n", zbus_chan_name(chan));
23 
24 	++count_callback;
25 }
26 
27 ZBUS_LISTENER_DEFINE(critical_lis, urgent_callback);
28 
29 static int count_core;
30 
31 ZBUS_SUBSCRIBER_DEFINE(core_sub, 1);
32 
core_thread(void)33 static void core_thread(void)
34 {
35 	const struct zbus_channel *chan = NULL;
36 
37 	while (1) {
38 		if (!zbus_sub_wait(&core_sub, &chan, K_FOREVER)) {
39 			count_core++;
40 
41 			struct sensor_data_msg data;
42 
43 			zbus_chan_read(&sensor_data_chan, &data, K_NO_WAIT);
44 
45 			struct net_pkt_msg pkt = {.total = data.a + data.b};
46 
47 			LOG_DBG("Sensor {a = %d, b = %d}. Sending pkt {total=%d}", data.a, data.b,
48 				pkt.total);
49 
50 			zbus_chan_pub(&net_pkt_chan, &pkt, K_MSEC(200));
51 		}
52 	}
53 }
54 
55 K_THREAD_DEFINE(core_thread_id, 1024, core_thread, NULL, NULL, NULL, 3, 0, 0);
56 
57 static int count_net;
58 static struct net_pkt_msg pkt = {0};
59 
60 ZBUS_SUBSCRIBER_DEFINE(net_sub, 4);
61 
net_thread(void)62 static void net_thread(void)
63 {
64 	const struct zbus_channel *chan;
65 
66 	while (1) {
67 		if (!zbus_sub_wait(&net_sub, &chan, K_FOREVER)) {
68 			count_net++;
69 
70 			zbus_chan_read(&net_pkt_chan, &pkt, K_NO_WAIT);
71 
72 			LOG_DBG("[Net] Total %d", pkt.total);
73 
74 			struct net_log_msg log_msg = {.count_net = count_net,
75 						      .pkt_total = pkt.total};
76 
77 			zbus_chan_pub(&net_log_chan, &log_msg, K_MSEC(500));
78 		}
79 	}
80 }
81 
82 K_THREAD_DEFINE(net_thread_id, 1024, net_thread, NULL, NULL, NULL, 3, 0, 0);
83 
84 static int count_net_log;
85 
86 ZBUS_MSG_SUBSCRIBER_DEFINE(net_log_sub);
87 
net_log_thread(void)88 static void net_log_thread(void)
89 {
90 	const struct zbus_channel *chan;
91 	struct net_log_msg log_msg;
92 
93 	while (1) {
94 		if (!zbus_sub_wait_msg(&net_log_sub, &chan, &log_msg, K_FOREVER)) {
95 			count_net_log++;
96 
97 			LOG_DBG("[Net log]: count_net = %d, pkt.total = %d", log_msg.count_net,
98 				log_msg.pkt_total);
99 		}
100 	}
101 }
102 
103 K_THREAD_DEFINE(net_log_thread_id, 1024, net_log_thread, NULL, NULL, NULL, 3, 0, 0);
104 
105 static int a;
106 static int b;
107 static int count_peripheral;
108 
109 ZBUS_SUBSCRIBER_DEFINE(peripheral_sub, 1);
110 
peripheral_thread(void)111 static void peripheral_thread(void)
112 {
113 	struct sensor_data_msg sd = {0, 0};
114 
115 	const struct zbus_channel *chan;
116 
117 	while (!zbus_sub_wait(&peripheral_sub, &chan, K_FOREVER)) {
118 		LOG_DBG("[Peripheral] starting measurement");
119 
120 		++count_peripheral;
121 		++a;
122 		++b;
123 
124 		sd.a = a;
125 		sd.b = b;
126 
127 		LOG_DBG("[Peripheral] sending sensor data");
128 
129 		zbus_chan_pub(&sensor_data_chan, &sd, K_MSEC(250));
130 
131 		k_msleep(150);
132 	}
133 }
134 
135 K_THREAD_DEFINE(peripheral_thread_id, 1024, peripheral_thread, NULL, NULL, NULL, 3, 0, 0);
136 
context_reset(void * f)137 static void context_reset(void *f)
138 {
139 	k_busy_wait(1000000);
140 
141 	a = 0;
142 	b = 0;
143 	count_callback = 0;
144 	count_core = 0;
145 	count_net = 0;
146 	count_net_log = 0;
147 	count_peripheral = 0;
148 	pkt.total = 0;
149 	struct net_pkt_msg *p;
150 
151 	zbus_chan_claim(&net_pkt_chan, K_NO_WAIT);
152 	p = zbus_chan_msg(&net_pkt_chan);
153 	p->total = 0;
154 	zbus_chan_finish(&net_pkt_chan);
155 	struct sensor_data_msg *sd;
156 
157 	zbus_chan_claim(&sensor_data_chan, K_NO_WAIT);
158 	sd = (struct sensor_data_msg *)sensor_data_chan.message;
159 	sd->a = 0;
160 	sd->b = 1;
161 	zbus_chan_finish(&sensor_data_chan);
162 	zbus_obs_set_enable(&critical_lis, true);
163 	zbus_obs_set_enable(&peripheral_sub, true);
164 	zbus_chan_claim(&start_measurement_chan, K_NO_WAIT);
165 	struct action_msg *act = (struct action_msg *)zbus_chan_msg(&start_measurement_chan);
166 
167 	act->status = false;
168 	zbus_chan_finish(&start_measurement_chan);
169 	zbus_chan_claim(&net_log_chan, K_NO_WAIT);
170 	struct net_log_msg *lm = (struct net_log_msg *)zbus_chan_msg(&net_log_chan);
171 
172 	lm->count_net = 0;
173 	lm->pkt_total = 0;
174 	zbus_chan_finish(&net_log_chan);
175 }
176 
ZTEST(integration,test_basic)177 ZTEST(integration, test_basic)
178 {
179 	struct action_msg start = {true};
180 	struct net_log_msg *lm = (struct net_log_msg *)zbus_chan_const_msg(&net_log_chan);
181 
182 	zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
183 
184 	k_msleep(200);
185 
186 	zassert_equal(count_callback, 1, NULL);
187 	zassert_equal(count_core, 1, NULL);
188 	zassert_equal(count_net, 1, NULL);
189 	zassert_equal(count_peripheral, 1, NULL);
190 	zassert_equal(count_net_log, 1, NULL);
191 	zassert_equal(count_net, lm->count_net, NULL);
192 
193 	zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
194 
195 	k_msleep(200);
196 
197 	zassert_equal(count_callback, 2, NULL);
198 	zassert_equal(count_core, 2, NULL);
199 	zassert_equal(count_net, 2, NULL);
200 	zassert_equal(count_peripheral, 2, NULL);
201 	zassert_equal(count_net_log, 2, NULL);
202 	zassert_equal(count_net, lm->count_net, NULL);
203 
204 	zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
205 
206 	k_msleep(200);
207 
208 	zassert_equal(count_callback, 3, NULL);
209 	zassert_equal(count_core, 3, NULL);
210 	zassert_equal(count_net, 3, NULL);
211 	zassert_equal(count_peripheral, 3, NULL);
212 	zassert_equal(count_net_log, 3, NULL);
213 	zassert_equal(count_net, lm->count_net, NULL);
214 
215 	zassert_equal(pkt.total, 6, "result was %d", pkt.total);
216 	zassert_equal(pkt.total, lm->pkt_total, NULL);
217 }
218 
ZTEST(integration,test_channel_set_enable)219 ZTEST(integration, test_channel_set_enable)
220 {
221 	struct action_msg start = {true};
222 	const struct net_log_msg *lm = zbus_chan_const_msg(&net_log_chan);
223 
224 	zassert_equal(0, zbus_obs_set_enable(&critical_lis, false), NULL);
225 	zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, false), NULL);
226 	zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
227 
228 	k_msleep(200);
229 
230 	zassert_equal(count_callback, 0, NULL);
231 	zassert_equal(count_core, 0, NULL);
232 	zassert_equal(count_peripheral, 0, NULL);
233 	zassert_equal(count_net, 0, NULL);
234 	zassert_equal(count_net_log, 0, NULL);
235 	zassert_equal(count_net, lm->count_net, NULL);
236 
237 	zassert_equal(0, zbus_obs_set_enable(&critical_lis, false), NULL);
238 	zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, true), NULL);
239 	zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
240 
241 	k_msleep(200);
242 
243 	zassert_equal(count_callback, 0, NULL);
244 	zassert_equal(count_core, 1, NULL);
245 	zassert_equal(count_net, 1, NULL);
246 	zassert_equal(count_peripheral, 1, NULL);
247 	zassert_equal(count_net_log, 1, NULL);
248 	zassert_equal(count_net, lm->count_net, NULL);
249 
250 	zassert_equal(0, zbus_obs_set_enable(&critical_lis, true), NULL);
251 	zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, false), NULL);
252 	zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
253 
254 	k_msleep(200);
255 
256 	zassert_equal(count_callback, 1, NULL);
257 	zassert_equal(count_core, 1, NULL);
258 	zassert_equal(count_net, 1, NULL);
259 	zassert_equal(count_peripheral, 1, NULL);
260 	zassert_equal(count_net_log, 1, NULL);
261 	zassert_equal(count_net, lm->count_net, NULL);
262 
263 	zassert_equal(0, zbus_obs_set_enable(&critical_lis, true), NULL);
264 	zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, true), NULL);
265 	zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
266 
267 	k_msleep(200);
268 
269 	zassert_equal(count_callback, 2, NULL);
270 	zassert_equal(count_core, 2, NULL);
271 	zassert_equal(count_peripheral, 2, NULL);
272 	zassert_equal(count_net, 2, NULL);
273 	zassert_equal(count_net_log, 2, NULL);
274 	zassert_equal(count_net, lm->count_net, NULL);
275 
276 	zassert_equal(pkt.total, 4, "result was %d", pkt.total);
277 	zassert_equal(pkt.total, lm->pkt_total, NULL);
278 }
279 
greedy_thread_entry(void * p1,void * p2,void * p3)280 static void greedy_thread_entry(void *p1, void *p2, void *p3)
281 {
282 	int err = zbus_chan_claim(&busy_chan, K_MSEC(500));
283 
284 	zassert_equal(err, 0, "Could not claim the channel");
285 	k_msleep(2000);
286 	zassert_equal(0, zbus_chan_finish(&busy_chan), NULL);
287 }
288 
289 K_THREAD_STACK_DEFINE(greedy_thread_stack_area, 1024);
290 
291 static struct k_thread greedy_thread_data;
292 
ZTEST(integration,test_event_dispatcher_mutex_timeout)293 ZTEST(integration, test_event_dispatcher_mutex_timeout)
294 {
295 	struct action_msg read;
296 	struct action_msg sent = {.status = true};
297 
298 	int err = zbus_chan_read(&busy_chan, &read, K_NO_WAIT);
299 
300 	zassert_equal(err, 0, "Could not read the channel");
301 
302 	zassert_equal(read.status, 0, "Read status must be false");
303 
304 	k_thread_create(&greedy_thread_data, greedy_thread_stack_area,
305 			K_THREAD_STACK_SIZEOF(greedy_thread_stack_area), greedy_thread_entry, NULL,
306 			NULL, NULL, CONFIG_ZTEST_THREAD_PRIORITY, K_USER | K_INHERIT_PERMS,
307 			K_NO_WAIT);
308 
309 	k_msleep(500);
310 
311 	err = zbus_chan_pub(&busy_chan, &sent, K_MSEC(200));
312 	zassert_equal(err, -EAGAIN, "Channel must be busy and could no be published %d", err);
313 	err = zbus_chan_read(&busy_chan, &read, K_MSEC(200));
314 	zassert_equal(err, -EAGAIN, "Channel must be busy and could no be published %d", err);
315 	err = zbus_chan_claim(&busy_chan, K_MSEC(200));
316 	zassert_equal(err, -EAGAIN, "Channel must be busy and could no be claimed %d", err);
317 	err = zbus_chan_pub(&busy_chan, &sent, K_MSEC(200));
318 	zassert_equal(err, -EAGAIN, "Channel must be busy and could no be published %d", err);
319 	/* Wait for the greedy thread to finish and pub and read successfully */
320 	err = zbus_chan_pub(&busy_chan, &sent, K_MSEC(2000));
321 	zassert_equal(err, 0, "Channel must be busy and could no be published %d", err);
322 	err = zbus_chan_read(&busy_chan, &read, K_MSEC(2000));
323 	zassert_equal(err, 0, "Could not read the channel");
324 
325 	zassert_equal(read.status, true, "Read status must be false");
326 }
327 
ZTEST(integration,test_event_dispatcher_queue_timeout)328 ZTEST(integration, test_event_dispatcher_queue_timeout)
329 {
330 	struct action_msg sent = {.status = true};
331 	struct action_msg read = {.status = true};
332 
333 	zassert_equal(0, zbus_obs_set_enable(&core_sub, false), NULL);
334 	zassert_equal(0, zbus_obs_set_enable(&net_sub, false), NULL);
335 	int err = zbus_chan_pub(&start_measurement_chan, &sent, K_MSEC(100));
336 
337 	zassert_equal(err, 0, "Could not pub the channel");
338 	k_msleep(10);
339 	sent.status = false;
340 	err = zbus_chan_pub(&start_measurement_chan, &sent, K_MSEC(100));
341 	zassert_equal(err, 0, "Could not pub the channel");
342 	k_msleep(10);
343 	err = zbus_chan_pub(&start_measurement_chan, &sent, K_MSEC(100));
344 	zassert_equal(err, -EAGAIN, "Pub to the channel %d must not occur here", err);
345 	err = zbus_chan_read(&start_measurement_chan, &read, K_NO_WAIT);
346 	zassert_equal(err, 0, "Could not read the channel");
347 	zassert_equal(read.status, false,
348 		      "Read status must be false. The notification was not sent, but "
349 		      "the channel actually changed");
350 	k_msleep(500);
351 	zassert_equal(count_callback, 3, NULL);
352 	zassert_equal(count_peripheral, 2, NULL);
353 }
354 
355 ZTEST_SUITE(integration, NULL, NULL, context_reset, NULL, NULL);
356