1 /*
2 * Copyright (c) 2023 Rodrigo Peixoto <rodrigopex@gmail.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 #include <zephyr/kernel.h>
7 #include <zephyr/logging/log.h>
8 #include <zephyr/zbus/zbus.h>
9 LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);
10
11 ZBUS_CHAN_DEFINE(chan_a, int, NULL, NULL, ZBUS_OBSERVERS(l1, ms1, ms2, s1, l2), 0);
12
13 static void t1_thread(void *ptr1, void *ptr2, void *ptr3);
14 K_THREAD_DEFINE(t1_id, CONFIG_MAIN_STACK_SIZE, t1_thread, NULL, NULL, NULL, 5, 0, 0);
15
16 ZBUS_SUBSCRIBER_DEFINE(s1, 4);
s1_thread(void * ptr1,void * ptr2,void * ptr3)17 static void s1_thread(void *ptr1, void *ptr2, void *ptr3)
18 {
19 ARG_UNUSED(ptr1);
20 ARG_UNUSED(ptr2);
21 ARG_UNUSED(ptr3);
22
23 int err;
24 int a = 0;
25 const struct zbus_channel *chan;
26
27 IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, (zbus_obs_attach_to_thread(&s1);));
28
29 while (1) {
30 err = zbus_sub_wait(&s1, &chan, K_FOREVER);
31 if (err) {
32 return;
33 }
34
35 /* Faking some workload */
36 k_busy_wait(200000);
37
38 LOG_INF("N -> S1: T1 prio %d", k_thread_priority_get(t1_id));
39
40 err = zbus_chan_read(chan, &a, K_FOREVER);
41 if (err) {
42 return;
43 }
44 LOG_INF("%d -> S1: T1 prio %d", a, k_thread_priority_get(t1_id));
45 }
46 }
47 K_THREAD_DEFINE(s1_id, CONFIG_MAIN_STACK_SIZE, s1_thread, NULL, NULL, NULL, 2, 0, 0);
48
49 ZBUS_MSG_SUBSCRIBER_DEFINE(ms1);
ms1_thread(void * ptr1,void * ptr2,void * ptr3)50 static void ms1_thread(void *ptr1, void *ptr2, void *ptr3)
51 {
52 ARG_UNUSED(ptr1);
53 ARG_UNUSED(ptr2);
54 ARG_UNUSED(ptr3);
55
56 int err;
57 const struct zbus_channel *chan;
58 int a = 0;
59
60 IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, (zbus_obs_attach_to_thread(&ms1);));
61
62 while (1) {
63 err = zbus_sub_wait_msg(&ms1, &chan, &a, K_FOREVER);
64 if (err) {
65 return;
66 }
67
68 /* Faking some workload */
69 k_busy_wait(200000);
70
71 LOG_INF("%d -> MS1: T1 prio %d", a, k_thread_priority_get(t1_id));
72 }
73 }
74 K_THREAD_DEFINE(ms1_id, CONFIG_MAIN_STACK_SIZE, ms1_thread, NULL, NULL, NULL, 3, 0, 0);
75
76 ZBUS_MSG_SUBSCRIBER_DEFINE(ms2);
ms2_thread(void * ptr1,void * ptr2,void * ptr3)77 static void ms2_thread(void *ptr1, void *ptr2, void *ptr3)
78 {
79 ARG_UNUSED(ptr1);
80 ARG_UNUSED(ptr2);
81 ARG_UNUSED(ptr3);
82
83 int err;
84 const struct zbus_channel *chan;
85 int a = 0;
86
87 IF_ENABLED(CONFIG_ZBUS_PRIORITY_BOOST, (zbus_obs_attach_to_thread(&ms2);));
88
89 while (1) {
90 err = zbus_sub_wait_msg(&ms2, &chan, &a, K_FOREVER);
91 if (err) {
92 return;
93 }
94
95 /* Faking some workload */
96 k_busy_wait(200 * USEC_PER_MSEC);
97
98 LOG_INF("%d -> MS2: T1 prio %d", a, k_thread_priority_get(t1_id));
99 }
100 }
101 K_THREAD_DEFINE(ms2_id, CONFIG_MAIN_STACK_SIZE, ms2_thread, NULL, NULL, NULL, 4, 0, 0);
102
l1_callback(const struct zbus_channel * chan)103 static void l1_callback(const struct zbus_channel *chan)
104 {
105 LOG_INF("%d ---> L1: T1 prio %d", *((int *)zbus_chan_const_msg(chan)),
106 k_thread_priority_get(t1_id));
107 }
108 ZBUS_LISTENER_DEFINE(l1, l1_callback);
109
l2_callback(const struct zbus_channel * chan)110 static void l2_callback(const struct zbus_channel *chan)
111 {
112 LOG_INF("%d ---> L2: T1 prio %d", *((int *)zbus_chan_const_msg(chan)),
113 k_thread_priority_get(t1_id));
114 }
115 ZBUS_LISTENER_DEFINE(l2, l2_callback);
116
t1_thread(void * ptr1,void * ptr2,void * ptr3)117 static void t1_thread(void *ptr1, void *ptr2, void *ptr3)
118 {
119 ARG_UNUSED(ptr1);
120 ARG_UNUSED(ptr2);
121 ARG_UNUSED(ptr3);
122
123 int err;
124 int a = 0;
125
126 while (1) {
127 LOG_INF("--------------");
128
129 if (a == 2) {
130 zbus_obs_set_enable(&s1, false);
131 } else if (a == 4) {
132 zbus_obs_set_enable(&ms1, false);
133 } else if (a == 6) {
134 zbus_obs_set_enable(&s1, true);
135 zbus_obs_set_enable(&ms1, true);
136 }
137
138 LOG_INF("%d -> T1: prio before %d", a, k_thread_priority_get(k_current_get()));
139 err = zbus_chan_pub(&chan_a, &a, K_FOREVER);
140 if (err) {
141 return;
142 }
143 LOG_INF("%d -> T1: prio after %d", a, k_thread_priority_get(k_current_get()));
144 ++a;
145
146 k_msleep(2000);
147 }
148 }
149