1 /*
2  * Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 
6 #include <zephyr/kernel.h>
7 #include <zephyr/logging/log.h>
8 #include <zephyr/sys/heap_listener.h>
9 #include <zephyr/zbus/zbus.h>
10 LOG_MODULE_REGISTER(sample, CONFIG_LOG_MAX_LEVEL);
11 
12 extern struct k_heap _system_heap;
13 static size_t total_allocated;
14 
on_heap_alloc(uintptr_t heap_id,void * mem,size_t bytes)15 void on_heap_alloc(uintptr_t heap_id, void *mem, size_t bytes)
16 {
17 	total_allocated += bytes;
18 	LOG_INF(" AL Memory allocated %u bytes. Total allocated %u bytes", (unsigned int)bytes,
19 		(unsigned int)total_allocated);
20 }
21 
on_heap_free(uintptr_t heap_id,void * mem,size_t bytes)22 void on_heap_free(uintptr_t heap_id, void *mem, size_t bytes)
23 {
24 	total_allocated -= bytes;
25 	LOG_INF(" FR Memory freed %u bytes. Total allocated %u bytes", (unsigned int)bytes,
26 		(unsigned int)total_allocated);
27 }
28 
29 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC)
30 
31 HEAP_LISTENER_ALLOC_DEFINE(my_heap_listener_alloc, HEAP_ID_FROM_POINTER(&_system_heap.heap),
32 			   on_heap_alloc);
33 
34 HEAP_LISTENER_FREE_DEFINE(my_heap_listener_free, HEAP_ID_FROM_POINTER(&_system_heap.heap),
35 			  on_heap_free);
36 
37 #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC */
38 struct acc_msg {
39 	int x;
40 	int y;
41 	int z;
42 };
43 
44 ZBUS_CHAN_DEFINE(acc_data_chan,  /* Name */
45 		 struct acc_msg, /* Message type */
46 
47 		 NULL, /* Validator */
48 		 NULL, /* User data */
49 		 ZBUS_OBSERVERS(bar_sub1, bar_msg_sub1, bar_msg_sub2, bar_msg_sub3, bar_msg_sub4,
50 				bar_msg_sub5, bar_msg_sub6, bar_msg_sub7, bar_msg_sub8,
51 				bar_msg_sub9, foo_lis), /* observers */
52 		 ZBUS_MSG_INIT(.x = 0, .y = 0, .z = 0)  /* Initial value */
53 );
54 
listener_callback_example(const struct zbus_channel * chan)55 static void listener_callback_example(const struct zbus_channel *chan)
56 {
57 	const struct acc_msg *acc = zbus_chan_const_msg(chan);
58 
59 	LOG_INF("From listener foo_lis -> Acc x=%d, y=%d, z=%d", acc->x, acc->y, acc->z);
60 }
61 
62 ZBUS_LISTENER_DEFINE(foo_lis, listener_callback_example);
63 
64 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub1);
65 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub2);
66 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub3);
67 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub4);
68 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub5);
69 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub6);
70 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub7);
71 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub8);
72 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub9);
73 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub10);
74 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub11);
75 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub12);
76 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub13);
77 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub14);
78 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub15);
79 ZBUS_MSG_SUBSCRIBER_DEFINE(bar_msg_sub16);
80 
81 ZBUS_SUBSCRIBER_DEFINE(bar_sub1, 4);
82 ZBUS_SUBSCRIBER_DEFINE(bar_sub2, 4);
83 
msg_subscriber_task(void * sub)84 static void msg_subscriber_task(void *sub)
85 {
86 	const struct zbus_channel *chan;
87 
88 	struct acc_msg acc;
89 
90 	const struct zbus_observer *subscriber = sub;
91 
92 	while (!zbus_sub_wait_msg(subscriber, &chan, &acc, K_FOREVER)) {
93 		if (&acc_data_chan != chan) {
94 			LOG_ERR("Wrong channel %p!", chan);
95 
96 			continue;
97 		}
98 		LOG_INF("From msg subscriber %s -> Acc x=%d, y=%d, z=%d", zbus_obs_name(subscriber),
99 			acc.x, acc.y, acc.z);
100 	}
101 }
102 
103 K_THREAD_DEFINE(subscriber_task_id1, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub1,
104 		NULL, NULL, 3, 0, 0);
105 K_THREAD_DEFINE(subscriber_task_id2, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub2,
106 		NULL, NULL, 3, 0, 0);
107 K_THREAD_DEFINE(subscriber_task_id3, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub3,
108 		NULL, NULL, 3, 0, 0);
109 K_THREAD_DEFINE(subscriber_task_id4, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub4,
110 		NULL, NULL, 3, 0, 0);
111 K_THREAD_DEFINE(subscriber_task_id5, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub5,
112 		NULL, NULL, 3, 0, 0);
113 K_THREAD_DEFINE(subscriber_task_id6, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub6,
114 		NULL, NULL, 3, 0, 0);
115 K_THREAD_DEFINE(subscriber_task_id7, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub7,
116 		NULL, NULL, 3, 0, 0);
117 K_THREAD_DEFINE(subscriber_task_id8, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub8,
118 		NULL, NULL, 3, 0, 0);
119 K_THREAD_DEFINE(subscriber_task_id9, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub9,
120 		NULL, NULL, 3, 0, 0);
121 K_THREAD_DEFINE(subscriber_task_id10, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub10,
122 		NULL, NULL, 3, 0, 0);
123 K_THREAD_DEFINE(subscriber_task_id11, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub11,
124 		NULL, NULL, 3, 0, 0);
125 K_THREAD_DEFINE(subscriber_task_id12, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub12,
126 		NULL, NULL, 3, 0, 0);
127 K_THREAD_DEFINE(subscriber_task_id13, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub13,
128 		NULL, NULL, 3, 0, 0);
129 K_THREAD_DEFINE(subscriber_task_id14, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub14,
130 		NULL, NULL, 3, 0, 0);
131 K_THREAD_DEFINE(subscriber_task_id15, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub15,
132 		NULL, NULL, 3, 0, 0);
133 K_THREAD_DEFINE(subscriber_task_id16, CONFIG_MAIN_STACK_SIZE, msg_subscriber_task, &bar_msg_sub16,
134 		NULL, NULL, 3, 0, 0);
135 
subscriber_task(void * sub)136 static void subscriber_task(void *sub)
137 {
138 	const struct zbus_channel *chan;
139 
140 	struct acc_msg acc;
141 
142 	const struct zbus_observer *subscriber = sub;
143 
144 	while (!zbus_sub_wait(subscriber, &chan, K_FOREVER)) {
145 		if (&acc_data_chan != chan) {
146 			LOG_ERR("Wrong channel %p!", chan);
147 
148 			continue;
149 		}
150 		zbus_chan_read(chan, &acc, K_MSEC(250));
151 
152 		LOG_INF("From subscriber %s -> Acc x=%d, y=%d, z=%d", zbus_obs_name(subscriber),
153 			acc.x, acc.y, acc.z);
154 	}
155 }
156 
157 K_THREAD_DEFINE(subscriber_task_id17, CONFIG_MAIN_STACK_SIZE, subscriber_task, &bar_sub1, NULL,
158 		NULL, 2, 0, 0);
159 K_THREAD_DEFINE(subscriber_task_id18, CONFIG_MAIN_STACK_SIZE, subscriber_task, &bar_sub2, NULL,
160 		NULL, 4, 0, 0);
161 
162 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_sub2, 3);
163 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_msg_sub10, 3);
164 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_msg_sub11, 3);
165 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_msg_sub12, 3);
166 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_msg_sub13, 3);
167 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_msg_sub14, 3);
168 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_msg_sub15, 3);
169 ZBUS_CHAN_ADD_OBS(acc_data_chan, bar_msg_sub16, 3);
170 
171 static struct acc_msg acc = {.x = 1, .y = 10, .z = 100};
172 
173 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION)
174 #include <zephyr/net_buf.h>
175 
176 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC)
177 NET_BUF_POOL_HEAP_DEFINE(isolated_pool, (CONFIG_ZBUS_MSG_SUBSCRIBER_SAMPLE_ISOLATED_BUF_POOL_SIZE),
178 			 (sizeof(struct zbus_channel *)), NULL);
179 #else
180 NET_BUF_POOL_FIXED_DEFINE(isolated_pool, (CONFIG_ZBUS_MSG_SUBSCRIBER_SAMPLE_ISOLATED_BUF_POOL_SIZE),
181 			  (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE),
182 			  sizeof(struct zbus_channel *), NULL);
183 #endif
184 #endif
185 
main(void)186 int main(void)
187 {
188 
189 	total_allocated = 0;
190 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION)
191 	zbus_chan_set_msg_sub_pool(&acc_data_chan, &isolated_pool);
192 #endif
193 
194 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC)
195 
196 	heap_listener_register(&my_heap_listener_alloc);
197 	heap_listener_register(&my_heap_listener_free);
198 
199 #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC */
200 
201 	while (1) {
202 		LOG_INF("----> Publishing to %s channel", zbus_chan_name(&acc_data_chan));
203 		zbus_chan_pub(&acc_data_chan, &acc, K_NO_WAIT);
204 		acc.x += 1;
205 		acc.y += 10;
206 		acc.z += 100;
207 		k_msleep(1000);
208 	}
209 
210 	return 0;
211 }
212