1 /*
2 * Copyright (c) 2023 Rodrigo Peixoto <rodrigopex@gmail.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 #include <zephyr/kernel.h>
7 #include <zephyr/logging/log.h>
8 #include <zephyr/zbus/zbus.h>
9 LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);
10
11 static atomic_t sub_count = ATOMIC_INIT(0);
12
13 struct confirmed_msg {
14 uint32_t payload;
15 };
16
17 ZBUS_CHAN_DEFINE(confirmed_chan, /* Name */
18 struct confirmed_msg, /* Message type */
19
20 NULL, /* Validator */
21 &sub_count, /* User data */
22 ZBUS_OBSERVERS(foo_lis, bar_sub1, bar_sub2, bar_sub3), /* observers */
23 ZBUS_MSG_INIT(.payload = 0) /* Initial value */
24 );
25
listener_callback_example(const struct zbus_channel * chan)26 static void listener_callback_example(const struct zbus_channel *chan)
27 {
28 const struct confirmed_msg *cm = zbus_chan_const_msg(chan);
29
30 LOG_INF("From listener -> Confirmed message payload = %u", cm->payload);
31 }
32
33 ZBUS_LISTENER_DEFINE(foo_lis, listener_callback_example);
34
35 ZBUS_SUBSCRIBER_DEFINE(bar_sub1, 4);
36
bar_sub1_task(void)37 static void bar_sub1_task(void)
38 {
39 const struct zbus_channel *chan;
40
41 while (!zbus_sub_wait(&bar_sub1, &chan, K_FOREVER)) {
42 struct confirmed_msg cm;
43
44 if (&confirmed_chan != chan) {
45 continue;
46 }
47
48 zbus_chan_read(&confirmed_chan, &cm, K_MSEC(500));
49
50 k_msleep(2500);
51
52 atomic_dec(zbus_chan_user_data(&confirmed_chan));
53
54 LOG_INF("From bar_sub1 subscriber -> Confirmed "
55 "message payload = "
56 "%u",
57 cm.payload);
58 }
59 }
60 K_THREAD_DEFINE(bar_sub1_task_id, CONFIG_MAIN_STACK_SIZE, bar_sub1_task, NULL, NULL, NULL, 3, 0, 0);
61 ZBUS_SUBSCRIBER_DEFINE(bar_sub2, 4);
bar_sub2_task(void)62 static void bar_sub2_task(void)
63 {
64 const struct zbus_channel *chan;
65
66 while (!zbus_sub_wait(&bar_sub2, &chan, K_FOREVER)) {
67 struct confirmed_msg cm;
68
69 if (&confirmed_chan != chan) {
70 continue;
71 }
72
73 zbus_chan_read(&confirmed_chan, &cm, K_MSEC(500));
74
75 k_msleep(1000);
76
77 atomic_dec(zbus_chan_user_data(&confirmed_chan));
78
79 LOG_INF("From bar_sub2 subscriber -> Confirmed "
80 "message payload = "
81 "%u",
82 cm.payload);
83 }
84 }
85 K_THREAD_DEFINE(bar_sub2_task_id, CONFIG_MAIN_STACK_SIZE, bar_sub2_task, NULL, NULL, NULL, 3, 0, 0);
86
87 ZBUS_SUBSCRIBER_DEFINE(bar_sub3, 4);
bar_sub3_task(void)88 static void bar_sub3_task(void)
89 {
90 const struct zbus_channel *chan;
91
92 while (!zbus_sub_wait(&bar_sub3, &chan, K_FOREVER)) {
93 struct confirmed_msg cm;
94
95 if (&confirmed_chan != chan) {
96 continue;
97 }
98
99 zbus_chan_read(&confirmed_chan, &cm, K_MSEC(500));
100
101 k_msleep(5000);
102
103 atomic_dec(zbus_chan_user_data(&confirmed_chan));
104
105 LOG_INF("From bar_sub3 subscriber -> Confirmed "
106 "message payload = "
107 "%u",
108 cm.payload);
109 }
110 }
111 K_THREAD_DEFINE(bar_sub3_task_id, CONFIG_MAIN_STACK_SIZE, bar_sub3_task, NULL, NULL, NULL, 3, 0, 0);
112
pub_to_confirmed_channel(struct confirmed_msg * cm)113 static void pub_to_confirmed_channel(struct confirmed_msg *cm)
114 {
115 /* Wait for channel be consumed */
116 while (atomic_get(zbus_chan_user_data(&confirmed_chan)) > 0) {
117 k_msleep(100);
118 }
119 /* Set the number of subscribers to consume the channel */
120 atomic_set(zbus_chan_user_data(&confirmed_chan), 3);
121
122 zbus_chan_pub(&confirmed_chan, cm, K_MSEC(500));
123 }
124
main(void)125 int main(void)
126 {
127 struct confirmed_msg cm = {0};
128
129 while (1) {
130 pub_to_confirmed_channel(&cm);
131
132 ++cm.payload;
133 }
134
135 return 0;
136 }
137