1 /*
2  * Copyright (c) 2023 Rodrigo Peixoto <rodrigopex@gmail.com>
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 
6 #include <zephyr/kernel.h>
7 #include <zephyr/logging/log.h>
8 #include <zephyr/zbus/zbus.h>
9 LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);
10 
11 ZBUS_CHAN_DEFINE(chan_a, int, NULL, NULL, ZBUS_OBSERVERS(l1, ms1, ms2, s1, l2), 0);
12 
13 static void t1_thread(void *ptr1, void *ptr2, void *ptr3);
14 K_THREAD_DEFINE(t1_id, CONFIG_MAIN_STACK_SIZE, t1_thread, NULL, NULL, NULL, 5, 0, 0);
15 
16 ZBUS_SUBSCRIBER_DEFINE(s1, 4);
s1_thread(void * ptr1,void * ptr2,void * ptr3)17 static void s1_thread(void *ptr1, void *ptr2, void *ptr3)
18 {
19 	ARG_UNUSED(ptr1);
20 	ARG_UNUSED(ptr2);
21 	ARG_UNUSED(ptr3);
22 
23 	int err;
24 	int a = 0;
25 	const struct zbus_channel *chan;
26 
27 	IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, (zbus_obs_attach_to_thread(&s1);));
28 
29 	while (1) {
30 		err = zbus_sub_wait(&s1, &chan, K_FOREVER);
31 		if (err) {
32 			return;
33 		}
34 
35 		/* Faking some workload */
36 		k_busy_wait(200000);
37 
38 		LOG_INF("N -> S1:  T1 prio %d", k_thread_priority_get(t1_id));
39 
40 		err = zbus_chan_read(chan, &a, K_FOREVER);
41 		if (err) {
42 			return;
43 		}
44 		LOG_INF("%d -> S1:  T1 prio %d", a, k_thread_priority_get(t1_id));
45 	}
46 }
47 K_THREAD_DEFINE(s1_id, CONFIG_MAIN_STACK_SIZE, s1_thread, NULL, NULL, NULL, 2, 0, 0);
48 
49 ZBUS_MSG_SUBSCRIBER_DEFINE(ms1);
ms1_thread(void * ptr1,void * ptr2,void * ptr3)50 static void ms1_thread(void *ptr1, void *ptr2, void *ptr3)
51 {
52 	ARG_UNUSED(ptr1);
53 	ARG_UNUSED(ptr2);
54 	ARG_UNUSED(ptr3);
55 
56 	int err;
57 	const struct zbus_channel *chan;
58 	int a = 0;
59 
60 	IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, (zbus_obs_attach_to_thread(&ms1);));
61 
62 	while (1) {
63 		err = zbus_sub_wait_msg(&ms1, &chan, &a, K_FOREVER);
64 		if (err) {
65 			return;
66 		}
67 
68 		/* Faking some workload */
69 		k_busy_wait(200000);
70 
71 		LOG_INF("%d -> MS1:  T1 prio %d", a, k_thread_priority_get(t1_id));
72 	}
73 }
74 K_THREAD_DEFINE(ms1_id, CONFIG_MAIN_STACK_SIZE, ms1_thread, NULL, NULL, NULL, 3, 0, 0);
75 
76 ZBUS_MSG_SUBSCRIBER_DEFINE(ms2);
ms2_thread(void * ptr1,void * ptr2,void * ptr3)77 static void ms2_thread(void *ptr1, void *ptr2, void *ptr3)
78 {
79 	ARG_UNUSED(ptr1);
80 	ARG_UNUSED(ptr2);
81 	ARG_UNUSED(ptr3);
82 
83 	int err;
84 	const struct zbus_channel *chan;
85 	int a = 0;
86 
87 	IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, (zbus_obs_attach_to_thread(&ms2);));
88 
89 	while (1) {
90 		err = zbus_sub_wait_msg(&ms2, &chan, &a, K_FOREVER);
91 		if (err) {
92 			return;
93 		}
94 
95 		/* Faking some workload */
96 		k_busy_wait(200 * USEC_PER_MSEC);
97 
98 		LOG_INF("%d -> MS2:  T1 prio %d", a, k_thread_priority_get(t1_id));
99 	}
100 }
101 K_THREAD_DEFINE(ms2_id, CONFIG_MAIN_STACK_SIZE, ms2_thread, NULL, NULL, NULL, 4, 0, 0);
102 
l1_callback(const struct zbus_channel * chan)103 static void l1_callback(const struct zbus_channel *chan)
104 {
105 	LOG_INF("%d ---> L1: T1 prio %d", *((int *)zbus_chan_const_msg(chan)),
106 		k_thread_priority_get(t1_id));
107 }
108 ZBUS_LISTENER_DEFINE(l1, l1_callback);
109 
l2_callback(const struct zbus_channel * chan)110 static void l2_callback(const struct zbus_channel *chan)
111 {
112 	LOG_INF("%d ---> L2: T1 prio %d", *((int *)zbus_chan_const_msg(chan)),
113 		k_thread_priority_get(t1_id));
114 }
115 ZBUS_LISTENER_DEFINE(l2, l2_callback);
116 
t1_thread(void * ptr1,void * ptr2,void * ptr3)117 static void t1_thread(void *ptr1, void *ptr2, void *ptr3)
118 {
119 	ARG_UNUSED(ptr1);
120 	ARG_UNUSED(ptr2);
121 	ARG_UNUSED(ptr3);
122 
123 	int err;
124 	int a = 0;
125 
126 	while (1) {
127 		LOG_INF("--------------");
128 
129 		if (a == 2) {
130 			zbus_obs_set_enable(&s1, false);
131 		} else if (a == 4) {
132 			zbus_obs_set_enable(&ms1, false);
133 		} else if (a == 6) {
134 			zbus_obs_set_enable(&s1, true);
135 			zbus_obs_set_enable(&ms1, true);
136 		}
137 
138 		LOG_INF("%d -> T1: prio before %d", a, k_thread_priority_get(k_current_get()));
139 		err = zbus_chan_pub(&chan_a, &a, K_FOREVER);
140 		if (err) {
141 			return;
142 		}
143 		LOG_INF("%d -> T1: prio after %d", a, k_thread_priority_get(k_current_get()));
144 		++a;
145 
146 		k_msleep(2000);
147 	}
148 }
149