1 /*
2  * Copyright (c) 2023 Rodrigo Peixoto <rodrigopex@gmail.com>
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 
6 #include <zephyr/kernel.h>
7 #include <zephyr/logging/log.h>
8 #include <zephyr/zbus/zbus.h>
9 LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);
10 
11 static atomic_t sub_count = ATOMIC_INIT(0);
12 
13 struct confirmed_msg {
14 	uint32_t payload;
15 };
16 
17 ZBUS_CHAN_DEFINE(confirmed_chan,       /* Name */
18 		 struct confirmed_msg, /* Message type */
19 
20 		 NULL,                                                  /* Validator */
21 		 &sub_count,                                            /* User data */
22 		 ZBUS_OBSERVERS(foo_lis, bar_sub1, bar_sub2, bar_sub3), /* observers */
23 		 ZBUS_MSG_INIT(.payload = 0)                            /* Initial value */
24 );
25 
listener_callback_example(const struct zbus_channel * chan)26 static void listener_callback_example(const struct zbus_channel *chan)
27 {
28 	const struct confirmed_msg *cm = zbus_chan_const_msg(chan);
29 
30 	LOG_INF("From listener -> Confirmed message payload = %u", cm->payload);
31 }
32 
33 ZBUS_LISTENER_DEFINE(foo_lis, listener_callback_example);
34 
35 ZBUS_SUBSCRIBER_DEFINE(bar_sub1, 4);
36 
bar_sub1_task(void)37 static void bar_sub1_task(void)
38 {
39 	const struct zbus_channel *chan;
40 
41 	while (!zbus_sub_wait(&bar_sub1, &chan, K_FOREVER)) {
42 		struct confirmed_msg cm;
43 
44 		if (&confirmed_chan != chan) {
45 			continue;
46 		}
47 
48 		zbus_chan_read(&confirmed_chan, &cm, K_MSEC(500));
49 
50 		k_msleep(2500);
51 
52 		atomic_dec(zbus_chan_user_data(&confirmed_chan));
53 
54 		LOG_INF("From bar_sub1 subscriber -> Confirmed "
55 			"message payload = "
56 			"%u",
57 			cm.payload);
58 	}
59 }
60 K_THREAD_DEFINE(bar_sub1_task_id, CONFIG_MAIN_STACK_SIZE, bar_sub1_task, NULL, NULL, NULL, 3, 0, 0);
61 ZBUS_SUBSCRIBER_DEFINE(bar_sub2, 4);
bar_sub2_task(void)62 static void bar_sub2_task(void)
63 {
64 	const struct zbus_channel *chan;
65 
66 	while (!zbus_sub_wait(&bar_sub2, &chan, K_FOREVER)) {
67 		struct confirmed_msg cm;
68 
69 		if (&confirmed_chan != chan) {
70 			continue;
71 		}
72 
73 		zbus_chan_read(&confirmed_chan, &cm, K_MSEC(500));
74 
75 		k_msleep(1000);
76 
77 		atomic_dec(zbus_chan_user_data(&confirmed_chan));
78 
79 		LOG_INF("From bar_sub2 subscriber -> Confirmed "
80 			"message payload = "
81 			"%u",
82 			cm.payload);
83 	}
84 }
85 K_THREAD_DEFINE(bar_sub2_task_id, CONFIG_MAIN_STACK_SIZE, bar_sub2_task, NULL, NULL, NULL, 3, 0, 0);
86 
87 ZBUS_SUBSCRIBER_DEFINE(bar_sub3, 4);
bar_sub3_task(void)88 static void bar_sub3_task(void)
89 {
90 	const struct zbus_channel *chan;
91 
92 	while (!zbus_sub_wait(&bar_sub3, &chan, K_FOREVER)) {
93 		struct confirmed_msg cm;
94 
95 		if (&confirmed_chan != chan) {
96 			continue;
97 		}
98 
99 		zbus_chan_read(&confirmed_chan, &cm, K_MSEC(500));
100 
101 		k_msleep(5000);
102 
103 		atomic_dec(zbus_chan_user_data(&confirmed_chan));
104 
105 		LOG_INF("From bar_sub3 subscriber -> Confirmed "
106 			"message payload = "
107 			"%u",
108 			cm.payload);
109 	}
110 }
111 K_THREAD_DEFINE(bar_sub3_task_id, CONFIG_MAIN_STACK_SIZE, bar_sub3_task, NULL, NULL, NULL, 3, 0, 0);
112 
pub_to_confirmed_channel(struct confirmed_msg * cm)113 static void pub_to_confirmed_channel(struct confirmed_msg *cm)
114 {
115 	/* Wait for channel be consumed */
116 	while (atomic_get(zbus_chan_user_data(&confirmed_chan)) > 0) {
117 		k_msleep(100);
118 	}
119 	/* Set the number of subscribers to consume the channel */
120 	atomic_set(zbus_chan_user_data(&confirmed_chan), 3);
121 
122 	zbus_chan_pub(&confirmed_chan, cm, K_MSEC(500));
123 }
124 
main(void)125 int main(void)
126 {
127 	struct confirmed_msg cm = {0};
128 
129 	while (1) {
130 		pub_to_confirmed_channel(&cm);
131 
132 		++cm.payload;
133 	}
134 
135 	return 0;
136 }
137