1 /*
2 * Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5 #include "messages.h"
6
7 #include <zephyr/kernel.h>
8 #include <zephyr/logging/log.h>
9 #include <zephyr/sys/util_macro.h>
10 #include <zephyr/zbus/zbus.h>
11 #include <zephyr/ztest.h>
12 #include <zephyr/ztest_assert.h>
/* Log through the "zbus" log module; this file only declares it, it does not register it. */
LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);

/*
 * Channels referenced by this test. They are only declared here; their
 * definitions (message types, observers) are not visible in this file.
 */
ZBUS_CHAN_DECLARE(version_chan, sensor_data_chan, net_pkt_chan, net_log_chan,
		  start_measurement_chan, busy_chan);
17
/* Number of times the listener callback has fired since the last context reset. */
static int count_callback;

/**
 * @brief Listener callback: log the notifying channel and count the activation.
 *
 * @param chan Channel that triggered the listener; only its name is used.
 */
static void urgent_callback(const struct zbus_channel *chan)
{
	const char *const chan_name = zbus_chan_name(chan);

	LOG_INF(" *** LISTENER activated for channel %s ***\n", chan_name);

	count_callback++;
}

/* Listener observer wired to urgent_callback. */
ZBUS_LISTENER_DEFINE(critical_lis, urgent_callback);
28
29 static int count_core;
30
31 ZBUS_SUBSCRIBER_DEFINE(core_sub, 1);
32
core_thread(void)33 static void core_thread(void)
34 {
35 const struct zbus_channel *chan = NULL;
36
37 while (1) {
38 if (!zbus_sub_wait(&core_sub, &chan, K_FOREVER)) {
39 count_core++;
40
41 struct sensor_data_msg data;
42
43 zbus_chan_read(&sensor_data_chan, &data, K_NO_WAIT);
44
45 struct net_pkt_msg pkt = {.total = data.a + data.b};
46
47 LOG_DBG("Sensor {a = %d, b = %d}. Sending pkt {total=%d}", data.a, data.b,
48 pkt.total);
49
50 zbus_chan_pub(&net_pkt_chan, &pkt, K_MSEC(200));
51 }
52 }
53 }
54
55 K_THREAD_DEFINE(core_thread_id, 1024, core_thread, NULL, NULL, NULL, 3, 0, 0);
56
57 static int count_net;
58 static struct net_pkt_msg pkt = {0};
59
60 ZBUS_SUBSCRIBER_DEFINE(net_sub, 4);
61
net_thread(void)62 static void net_thread(void)
63 {
64 const struct zbus_channel *chan;
65
66 while (1) {
67 if (!zbus_sub_wait(&net_sub, &chan, K_FOREVER)) {
68 count_net++;
69
70 zbus_chan_read(&net_pkt_chan, &pkt, K_NO_WAIT);
71
72 LOG_DBG("[Net] Total %d", pkt.total);
73
74 struct net_log_msg log_msg = {.count_net = count_net,
75 .pkt_total = pkt.total};
76
77 zbus_chan_pub(&net_log_chan, &log_msg, K_MSEC(500));
78 }
79 }
80 }
81
82 K_THREAD_DEFINE(net_thread_id, 1024, net_thread, NULL, NULL, NULL, 3, 0, 0);
83
84 static int count_net_log;
85
86 ZBUS_MSG_SUBSCRIBER_DEFINE(net_log_sub);
87
net_log_thread(void)88 static void net_log_thread(void)
89 {
90 const struct zbus_channel *chan;
91 struct net_log_msg log_msg;
92
93 while (1) {
94 if (!zbus_sub_wait_msg(&net_log_sub, &chan, &log_msg, K_FOREVER)) {
95 count_net_log++;
96
97 LOG_DBG("[Net log]: count_net = %d, pkt.total = %d", log_msg.count_net,
98 log_msg.pkt_total);
99 }
100 }
101 }
102
103 K_THREAD_DEFINE(net_log_thread_id, 1024, net_log_thread, NULL, NULL, NULL, 3, 0, 0);
104
105 static int a;
106 static int b;
107 static int count_peripheral;
108
109 ZBUS_SUBSCRIBER_DEFINE(peripheral_sub, 1);
110
peripheral_thread(void)111 static void peripheral_thread(void)
112 {
113 struct sensor_data_msg sd = {0, 0};
114
115 const struct zbus_channel *chan;
116
117 while (!zbus_sub_wait(&peripheral_sub, &chan, K_FOREVER)) {
118 LOG_DBG("[Peripheral] starting measurement");
119
120 ++count_peripheral;
121 ++a;
122 ++b;
123
124 sd.a = a;
125 sd.b = b;
126
127 LOG_DBG("[Peripheral] sending sensor data");
128
129 zbus_chan_pub(&sensor_data_chan, &sd, K_MSEC(250));
130
131 k_msleep(150);
132 }
133 }
134
135 K_THREAD_DEFINE(peripheral_thread_id, 1024, peripheral_thread, NULL, NULL, NULL, 3, 0, 0);
136
context_reset(void * f)137 static void context_reset(void *f)
138 {
139 k_busy_wait(1000000);
140
141 a = 0;
142 b = 0;
143 count_callback = 0;
144 count_core = 0;
145 count_net = 0;
146 count_net_log = 0;
147 count_peripheral = 0;
148 pkt.total = 0;
149 struct net_pkt_msg *p;
150
151 zbus_chan_claim(&net_pkt_chan, K_NO_WAIT);
152 p = zbus_chan_msg(&net_pkt_chan);
153 p->total = 0;
154 zbus_chan_finish(&net_pkt_chan);
155 struct sensor_data_msg *sd;
156
157 zbus_chan_claim(&sensor_data_chan, K_NO_WAIT);
158 sd = (struct sensor_data_msg *)sensor_data_chan.message;
159 sd->a = 0;
160 sd->b = 1;
161 zbus_chan_finish(&sensor_data_chan);
162 zbus_obs_set_enable(&critical_lis, true);
163 zbus_obs_set_enable(&peripheral_sub, true);
164 zbus_chan_claim(&start_measurement_chan, K_NO_WAIT);
165 struct action_msg *act = (struct action_msg *)zbus_chan_msg(&start_measurement_chan);
166
167 act->status = false;
168 zbus_chan_finish(&start_measurement_chan);
169 zbus_chan_claim(&net_log_chan, K_NO_WAIT);
170 struct net_log_msg *lm = (struct net_log_msg *)zbus_chan_msg(&net_log_chan);
171
172 lm->count_net = 0;
173 lm->pkt_total = 0;
174 zbus_chan_finish(&net_log_chan);
175 }
176
ZTEST(integration,test_basic)177 ZTEST(integration, test_basic)
178 {
179 struct action_msg start = {true};
180 struct net_log_msg *lm = (struct net_log_msg *)zbus_chan_const_msg(&net_log_chan);
181
182 zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
183
184 k_msleep(200);
185
186 zassert_equal(count_callback, 1, NULL);
187 zassert_equal(count_core, 1, NULL);
188 zassert_equal(count_net, 1, NULL);
189 zassert_equal(count_peripheral, 1, NULL);
190 zassert_equal(count_net_log, 1, NULL);
191 zassert_equal(count_net, lm->count_net, NULL);
192
193 zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
194
195 k_msleep(200);
196
197 zassert_equal(count_callback, 2, NULL);
198 zassert_equal(count_core, 2, NULL);
199 zassert_equal(count_net, 2, NULL);
200 zassert_equal(count_peripheral, 2, NULL);
201 zassert_equal(count_net_log, 2, NULL);
202 zassert_equal(count_net, lm->count_net, NULL);
203
204 zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
205
206 k_msleep(200);
207
208 zassert_equal(count_callback, 3, NULL);
209 zassert_equal(count_core, 3, NULL);
210 zassert_equal(count_net, 3, NULL);
211 zassert_equal(count_peripheral, 3, NULL);
212 zassert_equal(count_net_log, 3, NULL);
213 zassert_equal(count_net, lm->count_net, NULL);
214
215 zassert_equal(pkt.total, 6, "result was %d", pkt.total);
216 zassert_equal(pkt.total, lm->pkt_total, NULL);
217 }
218
ZTEST(integration,test_channel_set_enable)219 ZTEST(integration, test_channel_set_enable)
220 {
221 struct action_msg start = {true};
222 const struct net_log_msg *lm = zbus_chan_const_msg(&net_log_chan);
223
224 zassert_equal(0, zbus_obs_set_enable(&critical_lis, false), NULL);
225 zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, false), NULL);
226 zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
227
228 k_msleep(200);
229
230 zassert_equal(count_callback, 0, NULL);
231 zassert_equal(count_core, 0, NULL);
232 zassert_equal(count_peripheral, 0, NULL);
233 zassert_equal(count_net, 0, NULL);
234 zassert_equal(count_net_log, 0, NULL);
235 zassert_equal(count_net, lm->count_net, NULL);
236
237 zassert_equal(0, zbus_obs_set_enable(&critical_lis, false), NULL);
238 zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, true), NULL);
239 zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
240
241 k_msleep(200);
242
243 zassert_equal(count_callback, 0, NULL);
244 zassert_equal(count_core, 1, NULL);
245 zassert_equal(count_net, 1, NULL);
246 zassert_equal(count_peripheral, 1, NULL);
247 zassert_equal(count_net_log, 1, NULL);
248 zassert_equal(count_net, lm->count_net, NULL);
249
250 zassert_equal(0, zbus_obs_set_enable(&critical_lis, true), NULL);
251 zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, false), NULL);
252 zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
253
254 k_msleep(200);
255
256 zassert_equal(count_callback, 1, NULL);
257 zassert_equal(count_core, 1, NULL);
258 zassert_equal(count_net, 1, NULL);
259 zassert_equal(count_peripheral, 1, NULL);
260 zassert_equal(count_net_log, 1, NULL);
261 zassert_equal(count_net, lm->count_net, NULL);
262
263 zassert_equal(0, zbus_obs_set_enable(&critical_lis, true), NULL);
264 zassert_equal(0, zbus_obs_set_enable(&peripheral_sub, true), NULL);
265 zassert_equal(0, zbus_chan_pub(&start_measurement_chan, &start, K_MSEC(200)), NULL);
266
267 k_msleep(200);
268
269 zassert_equal(count_callback, 2, NULL);
270 zassert_equal(count_core, 2, NULL);
271 zassert_equal(count_peripheral, 2, NULL);
272 zassert_equal(count_net, 2, NULL);
273 zassert_equal(count_net_log, 2, NULL);
274 zassert_equal(count_net, lm->count_net, NULL);
275
276 zassert_equal(pkt.total, 4, "result was %d", pkt.total);
277 zassert_equal(pkt.total, lm->pkt_total, NULL);
278 }
279
greedy_thread_entry(void * p1,void * p2,void * p3)280 static void greedy_thread_entry(void *p1, void *p2, void *p3)
281 {
282 int err = zbus_chan_claim(&busy_chan, K_MSEC(500));
283
284 zassert_equal(err, 0, "Could not claim the channel");
285 k_msleep(2000);
286 zassert_equal(0, zbus_chan_finish(&busy_chan), NULL);
287 }
288
289 K_THREAD_STACK_DEFINE(greedy_thread_stack_area, 1024);
290
291 static struct k_thread greedy_thread_data;
292
ZTEST(integration,test_event_dispatcher_mutex_timeout)293 ZTEST(integration, test_event_dispatcher_mutex_timeout)
294 {
295 struct action_msg read;
296 struct action_msg sent = {.status = true};
297
298 int err = zbus_chan_read(&busy_chan, &read, K_NO_WAIT);
299
300 zassert_equal(err, 0, "Could not read the channel");
301
302 zassert_equal(read.status, 0, "Read status must be false");
303
304 k_thread_create(&greedy_thread_data, greedy_thread_stack_area,
305 K_THREAD_STACK_SIZEOF(greedy_thread_stack_area), greedy_thread_entry, NULL,
306 NULL, NULL, CONFIG_ZTEST_THREAD_PRIORITY, K_USER | K_INHERIT_PERMS,
307 K_NO_WAIT);
308
309 k_msleep(500);
310
311 err = zbus_chan_pub(&busy_chan, &sent, K_MSEC(200));
312 zassert_equal(err, -EAGAIN, "Channel must be busy and could no be published %d", err);
313 err = zbus_chan_read(&busy_chan, &read, K_MSEC(200));
314 zassert_equal(err, -EAGAIN, "Channel must be busy and could no be published %d", err);
315 err = zbus_chan_claim(&busy_chan, K_MSEC(200));
316 zassert_equal(err, -EAGAIN, "Channel must be busy and could no be claimed %d", err);
317 err = zbus_chan_pub(&busy_chan, &sent, K_MSEC(200));
318 zassert_equal(err, -EAGAIN, "Channel must be busy and could no be published %d", err);
319 /* Wait for the greedy thread to finish and pub and read successfully */
320 err = zbus_chan_pub(&busy_chan, &sent, K_MSEC(2000));
321 zassert_equal(err, 0, "Channel must be busy and could no be published %d", err);
322 err = zbus_chan_read(&busy_chan, &read, K_MSEC(2000));
323 zassert_equal(err, 0, "Could not read the channel");
324
325 zassert_equal(read.status, true, "Read status must be false");
326 }
327
ZTEST(integration,test_event_dispatcher_queue_timeout)328 ZTEST(integration, test_event_dispatcher_queue_timeout)
329 {
330 struct action_msg sent = {.status = true};
331 struct action_msg read = {.status = true};
332
333 zassert_equal(0, zbus_obs_set_enable(&core_sub, false), NULL);
334 zassert_equal(0, zbus_obs_set_enable(&net_sub, false), NULL);
335 int err = zbus_chan_pub(&start_measurement_chan, &sent, K_MSEC(100));
336
337 zassert_equal(err, 0, "Could not pub the channel");
338 k_msleep(10);
339 sent.status = false;
340 err = zbus_chan_pub(&start_measurement_chan, &sent, K_MSEC(100));
341 zassert_equal(err, 0, "Could not pub the channel");
342 k_msleep(10);
343 err = zbus_chan_pub(&start_measurement_chan, &sent, K_MSEC(100));
344 zassert_equal(err, -EAGAIN, "Pub to the channel %d must not occur here", err);
345 err = zbus_chan_read(&start_measurement_chan, &read, K_NO_WAIT);
346 zassert_equal(err, 0, "Could not read the channel");
347 zassert_equal(read.status, false,
348 "Read status must be false. The notification was not sent, but "
349 "the channel actually changed");
350 k_msleep(500);
351 zassert_equal(count_callback, 3, NULL);
352 zassert_equal(count_peripheral, 2, NULL);
353 }
354
/*
 * Register the "integration" suite. context_reset is the only hook supplied;
 * it sits in the per-test "before" slot, so it runs ahead of each test case.
 */
ZTEST_SUITE(integration, NULL, NULL, context_reset, NULL, NULL);
356