1 /*
2 * Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 #include <zephyr/kernel.h>
7 #include <zephyr/logging/log.h>
8 #include <zephyr/sys/printk.h>
9 #include <zephyr/zbus/zbus.h>
10 LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL);
11
_zbus_timeout_remainder(uint64_t end_ticks)12 k_timeout_t _zbus_timeout_remainder(uint64_t end_ticks)
13 {
14 int64_t now_ticks = sys_clock_tick_get();
15
16 return K_TICKS((k_ticks_t)MAX(end_ticks - now_ticks, 0));
17 }
18
19 #if (CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0)
_zbus_notify_runtime_listeners(const struct zbus_channel * chan)20 static inline void _zbus_notify_runtime_listeners(const struct zbus_channel *chan)
21 {
22 __ASSERT(chan != NULL, "chan is required");
23
24 struct zbus_observer_node *obs_nd, *tmp;
25
26 SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) {
27
28 __ASSERT(obs_nd != NULL, "observer node is NULL");
29
30 if (obs_nd->obs->enabled && (obs_nd->obs->callback != NULL)) {
31 obs_nd->obs->callback(chan);
32 }
33 }
34 }
35
_zbus_notify_runtime_subscribers(const struct zbus_channel * chan,uint64_t end_ticks)36 static inline int _zbus_notify_runtime_subscribers(const struct zbus_channel *chan,
37 uint64_t end_ticks)
38 {
39 __ASSERT(chan != NULL, "chan is required");
40
41 int last_error = 0, err;
42 struct zbus_observer_node *obs_nd, *tmp;
43
44 SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) {
45
46 __ASSERT(obs_nd != NULL, "observer node is NULL");
47
48 if (obs_nd->obs->enabled && (obs_nd->obs->queue != NULL)) {
49 err = k_msgq_put(obs_nd->obs->queue, &chan,
50 _zbus_timeout_remainder(end_ticks));
51
52 _ZBUS_ASSERT(err == 0,
53 "could not deliver notification to observer %s. Error code %d",
54 _ZBUS_OBS_NAME(obs_nd->obs), err);
55
56 if (err) {
57 last_error = err;
58 }
59 }
60 }
61
62 return last_error;
63 }
64 #endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
65
_zbus_notify_observers(const struct zbus_channel * chan,uint64_t end_ticks)66 static int _zbus_notify_observers(const struct zbus_channel *chan, uint64_t end_ticks)
67 {
68 int last_error = 0, err;
69 /* Notify static listeners */
70 for (const struct zbus_observer *const *obs = chan->observers; *obs != NULL; ++obs) {
71 if ((*obs)->enabled && ((*obs)->callback != NULL)) {
72 (*obs)->callback(chan);
73 }
74 }
75
76 #if CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0
77 _zbus_notify_runtime_listeners(chan);
78 #endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
79
80 /* Notify static subscribers */
81 for (const struct zbus_observer *const *obs = chan->observers; *obs != NULL; ++obs) {
82 if ((*obs)->enabled && ((*obs)->queue != NULL)) {
83 err = k_msgq_put((*obs)->queue, &chan, _zbus_timeout_remainder(end_ticks));
84 _ZBUS_ASSERT(err == 0, "could not deliver notification to observer %s.",
85 _ZBUS_OBS_NAME(*obs));
86 if (err) {
87 LOG_ERR("Observer %s at %p could not be notified. Error code %d",
88 _ZBUS_OBS_NAME(*obs), *obs, err);
89 last_error = err;
90 }
91 }
92 }
93
94 #if CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0
95 err = _zbus_notify_runtime_subscribers(chan, end_ticks);
96 if (err) {
97 last_error = err;
98 }
99 #endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
100 return last_error;
101 }
102
zbus_chan_pub(const struct zbus_channel * chan,const void * msg,k_timeout_t timeout)103 int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout)
104 {
105 int err;
106 uint64_t end_ticks = sys_clock_timeout_end_calc(timeout);
107
108 _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
109 _ZBUS_ASSERT(chan != NULL, "chan is required");
110 _ZBUS_ASSERT(msg != NULL, "msg is required");
111
112 if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) {
113 return -ENOMSG;
114 }
115
116 err = k_mutex_lock(chan->mutex, timeout);
117 if (err) {
118 return err;
119 }
120
121 memcpy(chan->message, msg, chan->message_size);
122
123 err = _zbus_notify_observers(chan, end_ticks);
124
125 k_mutex_unlock(chan->mutex);
126
127 return err;
128 }
129
zbus_chan_read(const struct zbus_channel * chan,void * msg,k_timeout_t timeout)130 int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout)
131 {
132 int err;
133
134 _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
135 _ZBUS_ASSERT(chan != NULL, "chan is required");
136 _ZBUS_ASSERT(msg != NULL, "msg is required");
137
138 err = k_mutex_lock(chan->mutex, timeout);
139 if (err) {
140 return err;
141 }
142
143 memcpy(msg, chan->message, chan->message_size);
144
145 return k_mutex_unlock(chan->mutex);
146 }
147
zbus_chan_notify(const struct zbus_channel * chan,k_timeout_t timeout)148 int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout)
149 {
150 int err;
151 uint64_t end_ticks = sys_clock_timeout_end_calc(timeout);
152
153 _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
154 _ZBUS_ASSERT(chan != NULL, "chan is required");
155
156 err = k_mutex_lock(chan->mutex, timeout);
157 if (err) {
158 return err;
159 }
160
161 err = _zbus_notify_observers(chan, end_ticks);
162
163 k_mutex_unlock(chan->mutex);
164
165 return err;
166 }
167
zbus_chan_claim(const struct zbus_channel * chan,k_timeout_t timeout)168 int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout)
169 {
170 _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
171 _ZBUS_ASSERT(chan != NULL, "chan is required");
172
173 int err = k_mutex_lock(chan->mutex, timeout);
174
175 if (err) {
176 return err;
177 }
178
179 return 0;
180 }
181
zbus_chan_finish(const struct zbus_channel * chan)182 int zbus_chan_finish(const struct zbus_channel *chan)
183 {
184 _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
185 _ZBUS_ASSERT(chan != NULL, "chan is required");
186
187 int err = k_mutex_unlock(chan->mutex);
188
189 return err;
190 }
191
zbus_sub_wait(const struct zbus_observer * sub,const struct zbus_channel ** chan,k_timeout_t timeout)192 int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan,
193 k_timeout_t timeout)
194 {
195 _ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
196 _ZBUS_ASSERT(sub != NULL, "sub is required");
197 _ZBUS_ASSERT(chan != NULL, "chan is required");
198
199 if (sub->queue == NULL) {
200 return -EINVAL;
201 }
202
203 return k_msgq_get(sub->queue, chan, timeout);
204 }
205