1 /*
2 * Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 #include <zephyr/kernel.h>
7 #include <zephyr/logging/log.h>
8 #include <zephyr/sys/heap_listener.h>
9 #include <zephyr/zbus/zbus.h>
10 LOG_MODULE_REGISTER(sample, CONFIG_LOG_MAX_LEVEL);
11
12 extern struct sys_heap _system_heap;
13 static size_t total_allocated;
14
on_heap_alloc(uintptr_t heap_id,void * mem,size_t bytes)15 void on_heap_alloc(uintptr_t heap_id, void *mem, size_t bytes)
16 {
17 total_allocated += bytes;
18 LOG_INF(" AL Memory allocated %u bytes. Total allocated %u bytes", (unsigned int)bytes,
19 (unsigned int)total_allocated);
20 }
21
on_heap_free(uintptr_t heap_id,void * mem,size_t bytes)22 void on_heap_free(uintptr_t heap_id, void *mem, size_t bytes)
23 {
24 total_allocated -= bytes;
25 LOG_INF(" FR Memory freed %u bytes. Total allocated %u bytes", (unsigned int)bytes,
26 (unsigned int)total_allocated);
27 }
28
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC)

/* Allocation listener for _system_heap; its callback is on_heap_alloc(). */
HEAP_LISTENER_ALLOC_DEFINE(my_heap_listener_alloc,
			   HEAP_ID_FROM_POINTER(&_system_heap),
			   on_heap_alloc);

/* Free listener for _system_heap; its callback is on_heap_free(). */
HEAP_LISTENER_FREE_DEFINE(my_heap_listener_free,
			  HEAP_ID_FROM_POINTER(&_system_heap),
			  on_heap_free);

#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC */
/*
 * Message type carried by acc_data_chan: three integer components that the
 * observers in this file log as "Acc x/y/z".
 */
struct acc_msg {
	int x; /* first component  */
	int y; /* second component */
	int z; /* third component  */
};
42
43 ZBUS_CHAN_DEFINE(acc_data_chan, /* Name */
44 struct acc_msg, /* Message type */
45
46 NULL, /* Validator */
47 NULL, /* User data */
48 ZBUS_OBSERVERS(bar_sub1, bar_msg_sub1, bar_msg_sub2, bar_msg_sub3, bar_msg_sub4,
49 bar_msg_sub5, bar_msg_sub6, bar_msg_sub7, bar_msg_sub8,
50 bar_msg_sub9, foo_lis), /* observers */
51 ZBUS_MSG_INIT(.x = 0, .y = 0, .z = 0) /* Initial value */
52 );
53
listener_callback_example(const struct zbus_channel * chan)54 static void listener_callback_example(const struct zbus_channel *chan)
55 {
56 const struct acc_msg *acc = zbus_chan_const_msg(chan);
57
58 LOG_INF("From listener foo_lis -> Acc x=%d, y=%d, z=%d", acc->x, acc->y, acc->z);
59 }
60
61 ZBUS_LISTENER_DEFINE(foo_lis, listener_callback_example);
62
63 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub1);
64 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub2);
65 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub3);
66 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub4);
67 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub5);
68 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub6);
69 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub7);
70 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub8);
71 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub9);
72 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub10);
73 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub11);
74 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub12);
75 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub13);
76 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub14);
77 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub15);
78 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub16);
79
80 ZBUS_SUBSCRIBER_DEFINE(bar_sub1, 4);
81 ZBUS_SUBSCRIBER_DEFINE(bar_sub2, 4);
82
msg_subscriber_task(void * sub)83 static void msg_subscriber_task(void *sub)
84 {
85 const struct zbus_channel *chan;
86
87 struct acc_msg acc;
88
89 const struct zbus_observer *subscriber = sub;
90
91 while (!zbus_sub_wait_msg(subscriber, &chan, &acc, K_FOREVER)) {
92 if (&acc_data_chan != chan) {
93 LOG_ERR("Wrong channel %p!", chan);
94
95 continue;
96 }
97 LOG_INF("From msg subscriber %s -> Acc x=%d, y=%d, z=%d", zbus_obs_name(subscriber),
98 acc.x, acc.y, acc.z);
99 }
100 }
101
102 K_THREAD_DEFINE(subscriber_task_id1, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub1,
103 NULL, NULL, 3, 0, 0);
104 K_THREAD_DEFINE(subscriber_task_id2, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub2,
105 NULL, NULL, 3, 0, 0);
106 K_THREAD_DEFINE(subscriber_task_id3, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub3,
107 NULL, NULL, 3, 0, 0);
108 K_THREAD_DEFINE(subscriber_task_id4, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub4,
109 NULL, NULL, 3, 0, 0);
110 K_THREAD_DEFINE(subscriber_task_id5, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub5,
111 NULL, NULL, 3, 0, 0);
112 K_THREAD_DEFINE(subscriber_task_id6, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub6,
113 NULL, NULL, 3, 0, 0);
114 K_THREAD_DEFINE(subscriber_task_id7, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub7,
115 NULL, NULL, 3, 0, 0);
116 K_THREAD_DEFINE(subscriber_task_id8, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub8,
117 NULL, NULL, 3, 0, 0);
118 K_THREAD_DEFINE(subscriber_task_id9, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub9,
119 NULL, NULL, 3, 0, 0);
120 K_THREAD_DEFINE(subscriber_task_id10, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub10,
121 NULL, NULL, 3, 0, 0);
122 K_THREAD_DEFINE(subscriber_task_id11, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub11,
123 NULL, NULL, 3, 0, 0);
124 K_THREAD_DEFINE(subscriber_task_id12, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub12,
125 NULL, NULL, 3, 0, 0);
126 K_THREAD_DEFINE(subscriber_task_id13, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub13,
127 NULL, NULL, 3, 0, 0);
128 K_THREAD_DEFINE(subscriber_task_id14, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub14,
129 NULL, NULL, 3, 0, 0);
130 K_THREAD_DEFINE(subscriber_task_id15, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub15,
131 NULL, NULL, 3, 0, 0);
132 K_THREAD_DEFINE(subscriber_task_id16, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub16,
133 NULL, NULL, 3, 0, 0);
134
subscriber_task(void * sub)135 static void subscriber_task(void *sub)
136 {
137 const struct zbus_channel *chan;
138
139 struct acc_msg acc;
140
141 const struct zbus_observer *subscriber = sub;
142
143 while (!zbus_sub_wait(subscriber, &chan, K_FOREVER)) {
144 if (&acc_data_chan != chan) {
145 LOG_ERR("Wrong channel %p!", chan);
146
147 continue;
148 }
149 zbus_chan_read(chan, &acc, K_MSEC(250));
150
151 LOG_INF("From subscriber %s -> Acc x=%d, y=%d, z=%d", zbus_obs_name(subscriber),
152 acc.x, acc.y, acc.z);
153 }
154 }
155
156 K_THREAD_DEFINE(subscriber_task_id17, CONFIG_MAIN_STACK_SIZE, subscriber_task, &bar_sub1, NULL,
157 NULL, 2, 0, 0);
158 K_THREAD_DEFINE(subscriber_task_id18, CONFIG_MAIN_STACK_SIZE, subscriber_task, &bar_sub2, NULL,
159 NULL, 4, 0, 0);
160
161 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_sub2, 3);
162 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_msg_sub10, 3);
163 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_msg_sub11, 3);
164 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_msg_sub12, 3);
165 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_msg_sub13, 3);
166 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_msg_sub14, 3);
167 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_msg_sub15, 3);
168 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_msg_sub16, 3);
169
170 static struct acc_msg acc = {.x = 1, .y = 10, .z = 100};
171
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION)
#include <zephyr/net_buf.h>

/*
 * Dedicated net_buf pool that main() assigns to acc_data_chan. It is
 * heap-backed when dynamic buffer allocation is configured and a fixed-size
 * pool otherwise. In both variants the user-data size is that of a channel
 * pointer.
 */
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC)
NET_BUF_POOL_HEAP_DEFINE(isolated_pool,
			 (CONFIG_ZBUS_MSG_SUBSCRIBER_SAMPLE_ISOLATED_BUF_POOL_SIZE),
			 (sizeof(struct zbus_channel *)),
			 NULL);
#else
NET_BUF_POOL_FIXED_DEFINE(isolated_pool,
			  (CONFIG_ZBUS_MSG_SUBSCRIBER_SAMPLE_ISOLATED_BUF_POOL_SIZE),
			  (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE),
			  sizeof(struct zbus_channel *),
			  NULL);
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC */
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION */
184
main(void)185 int main(void)
186 {
187
188 total_allocated = 0;
189 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION)
190 zbus_chan_set_msg_sub_pool(&acc_data_chan, &isolated_pool);
191 #endif
192
193 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC)
194
195 heap_listener_register(&my_heap_listener_alloc);
196 heap_listener_register(&my_heap_listener_free);
197
198 #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC */
199
200 while (1) {
201 LOG_INF("----> Publishing to %s channel", zbus_chan_name(&acc_data_chan));
202 zbus_chan_pub(&acc_data_chan, &acc, K_NO_WAIT);
203 acc.x += 1;
204 acc.y += 10;
205 acc.z += 100;
206 k_msleep(1000);
207 }
208
209 return 0;
210 }
211