1 /*
2 * Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5 #include "messages.h"
6
7 #include <zephyr/kernel.h>
8 #include <zephyr/logging/log.h>
9 #include <zephyr/sys/util_macro.h>
10 #include <zephyr/zbus/zbus.h>
11 #include <zephyr/ztest.h>
12 #include <zephyr/ztest_assert.h>
/* Log through the already-registered "zbus" log module at its configured level. */
LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);

/*
 * Channels referenced by this file. They are only declared here; presumably
 * they are defined in another translation unit of the test (see messages.h).
 */
ZBUS_CHAN_DECLARE(version_chan,
		  sensor_data_chan,
		  net_pkt_chan,
		  net_log_chan,
		  start_measurement_chan,
		  busy_chan);
17
/* How many times urgent_callback() has run since the last context_reset(). */
static int count_callback;

/**
 * @brief Listener callback: log which channel fired and count the invocation.
 *
 * @param chan Channel that triggered the listener.
 */
static void urgent_callback(const struct zbus_channel *chan)
{
	LOG_INF(" *** LISTENER activated for channel %s ***\n", zbus_chan_name(chan));

	count_callback += 1;
}

/* Listener observer bound to urgent_callback(). */
ZBUS_LISTENER_DEFINE(critical_lis, urgent_callback);
28
29 static int count_core;
30
31 ZBUS_SUBSCRIBER_DEFINE(core_sub, 1);
32
core_thread(void)33 static void core_thread(void)
34 {
35 const struct zbus_channel *chan = NULL;
36
37 while (1) {
38 if (!zbus_sub_wait(&core_sub, &chan, K_FOREVER)) {
39 count_core++;
40
41 struct sensor_data_msg data;
42
43 zbus_chan_read(&sensor_data_chan, &data, K_FOREVER);
44
45 struct net_pkt_msg pkt = {.total = data.a + data.b};
46
47 LOG_DBG("Sensor {a = %d, b = %d}. Sending pkt {total=%d}", data.a, data.b,
48 pkt.total);
49
50 zbus_chan_pub(&net_pkt_chan, &pkt, K_MSEC(200));
51 }
52 }
53 }
54
55 K_THREAD_DEFINE(core_thread_id, 1024, core_thread, NULL, NULL, NULL, 3, 0, 0);
56
57 static int count_net;
58 static struct net_pkt_msg pkt = {0};
59
60 ZBUS_SUBSCRIBER_DEFINE(net_sub, 4);
61
net_thread(void)62 static void net_thread(void)
63 {
64 const struct zbus_channel *chan;
65
66 while (1) {
67 if (!zbus_sub_wait(&net_sub, &chan, K_FOREVER)) {
68 count_net++;
69 zbus_chan_read(&net_pkt_chan, &pkt, K_FOREVER);
70
71 LOG_DBG("[Net] Total %d", pkt.total);
72
73 struct net_log_msg log_msg = {.count_net = count_net,
74 .pkt_total = pkt.total};
75
76 zbus_chan_pub(&net_log_chan, &log_msg, K_MSEC(500));
77 }
78 }
79 }
80
81 K_THREAD_DEFINE(net_thread_id, 1024, net_thread, NULL, NULL, NULL, 3, 0, 0);
82
83 static int count_net_log;
84
85 ZBUS_MSG_SUBSCRIBER_DEFINE(net_log_sub);
86
net_log_thread(void)87 static void net_log_thread(void)
88 {
89 const struct zbus_channel *chan;
90 struct net_log_msg log_msg;
91
92 while (1) {
93 if (!zbus_sub_wait_msg(&net_log_sub, &chan, &log_msg, K_FOREVER)) {
94 count_net_log++;
95
96 LOG_DBG("[Net log]: count_net = %d, pkt.total = %d", log_msg.count_net,
97 log_msg.pkt_total);
98 }
99 }
100 }
101
102 K_THREAD_DEFINE(net_log_thread_id, 1024, net_log_thread, NULL, NULL, NULL, 3, 0, 0);
103
104 static int a;
105 static int b;
106 static int count_peripheral;
107
108 ZBUS_SUBSCRIBER_DEFINE(peripheral_sub, 1);
109
peripheral_thread(void)110 static void peripheral_thread(void)
111 {
112 struct sensor_data_msg sd = {0, 0};
113
114 const struct zbus_channel *chan;
115
116 while (!zbus_sub_wait(&peripheral_sub, &chan, K_FOREVER)) {
117 LOG_DBG("[Peripheral] starting measurement");
118
119 ++count_peripheral;
120 ++a;
121 ++b;
122
123 sd.a = a;
124 sd.b = b;
125
126 LOG_DBG("[Peripheral] sending sensor data");
127
128 zbus_chan_pub(&sensor_data_chan, &sd, K_MSEC(250));
129
130 k_msleep(150);
131 }
132 }
133
134 K_THREAD_DEFINE(peripheral_thread_id, 1024, peripheral_thread, NULL, NULL, NULL, 3, 0, 0);
135
context_reset(void * f)136 static void context_reset(void *f)
137 {
138 k_busy_wait(1000000);
139
140 a = 0;
141 b = 0;
142 count_callback = 0;
143 count_core = 0;
144 count_net = 0;
145 count_net_log = 0;
146 count_peripheral = 0;
147 pkt.total = 0;
148 struct net_pkt_msg *p;
149
150 zbus_chan_claim(&net_pkt_chan, K_NO_WAIT);
151 p = zbus_chan_msg(&net_pkt_chan);
152 p->total = 0;
153 zbus_chan_finish(&net_pkt_chan);
154 struct sensor_data_msg *sd;
155
156 zbus_chan_claim(&sensor_data_chan, K_NO_WAIT);
157 sd = (struct sensor_data_msg *)sensor_data_chan.message;
158 sd->a = 0;
159 sd->b = 1;
160 zbus_chan_finish(&sensor_data_chan);
161 zbus_obs_set_enable(&critical_lis, true);
162 zbus_obs_set_enable(&peripheral_sub, true);
163 zbus_chan_claim(&start_measurement_chan, K_NO_WAIT);
164 struct action_msg *act = (struct action_msg *)zbus_chan_msg(&start_measurement_chan);
165
166 act->status = false;
167 zbus_chan_finish(&start_measurement_chan);
168 zbus_chan_claim(&net_log_chan, K_NO_WAIT);
169 struct net_log_msg *lm = (struct net_log_msg *)zbus_chan_msg(&net_log_chan);
170
171 lm->count_net = 0;
172 lm->pkt_total = 0;
173 zbus_chan_finish(&net_log_chan);
174 }
175
ZTEST(integration,test_basic)176 ZTEST(integration, test_basic)
177 {
178 struct action_msg start = {true};
179 struct net_log_msg *lm = (struct net_log_msg *)zbus_chan_const_msg(&net_log_chan);
180
181 zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
182
183 k_msleep(200);
184
185 zassert_equal(count_callback, 1, NULL);
186 zassert_equal(count_core, 1, NULL);
187 zassert_equal(count_net, 1, NULL);
188 zassert_equal(count_peripheral, 1, NULL);
189 zassert_equal(count_net_log, 1, NULL);
190 zassert_equal(count_net, lm->count_net, NULL);
191
192 zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
193
194 k_msleep(200);
195
196 zassert_equal(count_callback, 2, NULL);
197 zassert_equal(count_core, 2, NULL);
198 zassert_equal(count_net, 2, NULL);
199 zassert_equal(count_peripheral, 2, NULL);
200 zassert_equal(count_net_log, 2, NULL);
201 zassert_equal(count_net, lm->count_net, NULL);
202
203 zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
204
205 k_msleep(200);
206
207 zassert_equal(count_callback, 3, NULL);
208 zassert_equal(count_core, 3, NULL);
209 zassert_equal(count_net, 3, NULL);
210 zassert_equal(count_peripheral, 3, NULL);
211 zassert_equal(count_net_log, 3, NULL);
212 zassert_equal(count_net, lm->count_net, NULL);
213
214 zassert_equal(pkt.total, 6, "result was %d", pkt.total);
215 zassert_equal(pkt.total, lm->pkt_total, NULL);
216 }
217
ZTEST(integration,test_channel_set_enable)218 ZTEST(integration, test_channel_set_enable)
219 {
220 struct action_msg start = {true};
221 const struct net_log_msg *lm = zbus_chan_const_msg(&net_log_chan);
222
223 zassert_equal(0, zbus_obs_set_enable(&critical_lis, false), NULL);
224 zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, false), NULL);
225 zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
226
227 k_msleep(200);
228
229 zassert_equal(count_callback, 0, NULL);
230 zassert_equal(count_core, 0, NULL);
231 zassert_equal(count_peripheral, 0, NULL);
232 zassert_equal(count_net, 0, NULL);
233 zassert_equal(count_net_log, 0, NULL);
234 zassert_equal(count_net, lm->count_net, NULL);
235
236 zassert_equal(0, zbus_obs_set_enable(&critical_lis, false), NULL);
237 zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, true), NULL);
238 zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
239
240 k_msleep(200);
241
242 zassert_equal(count_callback, 0, NULL);
243 zassert_equal(count_core, 1, NULL);
244 zassert_equal(count_net, 1, NULL);
245 zassert_equal(count_peripheral, 1, NULL);
246 zassert_equal(count_net_log, 1, NULL);
247 zassert_equal(count_net, lm->count_net, NULL);
248
249 zassert_equal(0, zbus_obs_set_enable(&critical_lis, true), NULL);
250 zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, false), NULL);
251 zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
252
253 k_msleep(200);
254
255 zassert_equal(count_callback, 1, NULL);
256 zassert_equal(count_core, 1, NULL);
257 zassert_equal(count_net, 1, NULL);
258 zassert_equal(count_peripheral, 1, NULL);
259 zassert_equal(count_net_log, 1, NULL);
260 zassert_equal(count_net, lm->count_net, NULL);
261
262 zassert_equal(0, zbus_obs_set_enable(&critical_lis, true), NULL);
263 zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, true), NULL);
264 zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
265
266 k_msleep(200);
267
268 zassert_equal(count_callback, 2, NULL);
269 zassert_equal(count_core, 2, NULL);
270 zassert_equal(count_peripheral, 2, NULL);
271 zassert_equal(count_net, 2, NULL);
272 zassert_equal(count_net_log, 2, NULL);
273 zassert_equal(count_net, lm->count_net, NULL);
274
275 zassert_equal(pkt.total, 4, "result was %d", pkt.total);
276 zassert_equal(pkt.total, lm->pkt_total, NULL);
277 }
278
greedy_thread_entry(void * p1,void * p2,void * p3)279 static void greedy_thread_entry(void *p1, void *p2, void *p3)
280 {
281 int err = zbus_chan_claim(&busy_chan, K_MSEC(500));
282
283 zassert_equal(err, 0, "Could not claim the channel");
284 k_msleep(2000);
285 zassert_equal(0, zbus_chan_finish(&busy_chan), NULL);
286 }
287
288 K_THREAD_STACK_DEFINE(greedy_thread_stack_area, 1024);
289
290 static struct k_thread greedy_thread_data;
291
ZTEST(integration,test_event_dispatcher_mutex_timeout)292 ZTEST(integration, test_event_dispatcher_mutex_timeout)
293 {
294 struct action_msg read;
295 struct action_msg sent = {.status = true};
296
297 int err = zbus_chan_read(&busy_chan, &read, K_NO_WAIT);
298
299 zassert_equal(err, 0, "Could not read the channel");
300
301 zassert_equal(read.status, 0, "Read status must be false");
302
303 k_thread_create(&greedy_thread_data, greedy_thread_stack_area,
304 K_THREAD_STACK_SIZEOF(greedy_thread_stack_area), greedy_thread_entry, NULL,
305 NULL, NULL, CONFIG_ZTEST_THREAD_PRIORITY, K_USER | K_INHERIT_PERMS,
306 K_NO_WAIT);
307
308 k_msleep(500);
309
310 err = zbus_chan_pub(&busy_chan, &sent, K_MSEC(200));
311 zassert_equal(err, -EAGAIN, "Channel must be busy and could no be published %d", err);
312 err = zbus_chan_read(&busy_chan, &read, K_MSEC(200));
313 zassert_equal(err, -EAGAIN, "Channel must be busy and could no be published %d", err);
314 err = zbus_chan_claim(&busy_chan, K_MSEC(200));
315 zassert_equal(err, -EAGAIN, "Channel must be busy and could no be claimed %d", err);
316 err = zbus_chan_pub(&busy_chan, &sent, K_MSEC(200));
317 zassert_equal(err, -EAGAIN, "Channel must be busy and could no be published %d", err);
318 /* Wait for the greedy thread to finish and pub and read successfully */
319 err = zbus_chan_pub(&busy_chan, &sent, K_MSEC(2000));
320 zassert_equal(err, 0, "Channel must be busy and could no be published %d", err);
321 err = zbus_chan_read(&busy_chan, &read, K_MSEC(2000));
322 zassert_equal(err, 0, "Could not read the channel");
323
324 zassert_equal(read.status, true, "Read status must be false");
325 }
326
ZTEST(integration,test_event_dispatcher_queue_timeout)327 ZTEST(integration, test_event_dispatcher_queue_timeout)
328 {
329 struct action_msg sent = {.status = true};
330 struct action_msg read = {.status = true};
331
332 zassert_equal(0, zbus_obs_set_enable(&core_sub, false), NULL);
333 zassert_equal(0, zbus_obs_set_enable(&net_sub, false), NULL);
334 int err = zbus_chan_pub(&start_measurement_chan, &sent, K_MSEC(100));
335
336 zassert_equal(err, 0, "Could not pub the channel");
337 k_msleep(10);
338 sent.status = false;
339 err = zbus_chan_pub(&start_measurement_chan, &sent, K_MSEC(100));
340 zassert_equal(err, 0, "Could not pub the channel");
341 k_msleep(10);
342 err = zbus_chan_pub(&start_measurement_chan, &sent, K_MSEC(100));
343 zassert_equal(err, -EAGAIN, "Pub to the channel %d must not occur here", err);
344 err = zbus_chan_read(&start_measurement_chan, &read, K_NO_WAIT);
345 zassert_equal(err, 0, "Could not read the channel");
346 zassert_equal(read.status, false,
347 "Read status must be false. The notification was not sent, but "
348 "the channel actually changed");
349 k_msleep(500);
350 zassert_equal(count_callback, 3, NULL);
351 zassert_equal(count_peripheral, 2, NULL);
352 }
353
/* Register the suite; context_reset() is installed as the "before each test" hook. */
ZTEST_SUITE(integration,
	    NULL,          /* predicate */
	    NULL,          /* setup */
	    context_reset, /* before */
	    NULL,          /* after */
	    NULL);         /* teardown */
355