1 /*
2  * Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 
6 #include <zephyr/kernel.h>
7 #include <zephyr/logging/log.h>
8 #include <zephyr/sys/printk.h>
9 #include <zephyr/zbus/zbus.h>
10 LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL);
11 
_zbus_timeout_remainder(uint64_t end_ticks)12 k_timeout_t _zbus_timeout_remainder(uint64_t end_ticks)
13 {
14 	int64_t now_ticks = sys_clock_tick_get();
15 
16 	return K_TICKS((k_ticks_t)MAX(end_ticks - now_ticks, 0));
17 }
18 
19 #if (CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0)
_zbus_notify_runtime_listeners(const struct zbus_channel * chan)20 static inline void _zbus_notify_runtime_listeners(const struct zbus_channel *chan)
21 {
22 	__ASSERT(chan != NULL, "chan is required");
23 
24 	struct zbus_observer_node *obs_nd, *tmp;
25 
26 	SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) {
27 
28 		__ASSERT(obs_nd != NULL, "observer node is NULL");
29 
30 		if (obs_nd->obs->enabled && (obs_nd->obs->callback != NULL)) {
31 			obs_nd->obs->callback(chan);
32 		}
33 	}
34 }
35 
_zbus_notify_runtime_subscribers(const struct zbus_channel * chan,uint64_t end_ticks)36 static inline int _zbus_notify_runtime_subscribers(const struct zbus_channel *chan,
37 						   uint64_t end_ticks)
38 {
39 	__ASSERT(chan != NULL, "chan is required");
40 
41 	int last_error = 0, err;
42 	struct zbus_observer_node *obs_nd, *tmp;
43 
44 	SYS_SLIST_FOR_EACH_CONTAINER_SAFE(chan->runtime_observers, obs_nd, tmp, node) {
45 
46 		__ASSERT(obs_nd != NULL, "observer node is NULL");
47 
48 		if (obs_nd->obs->enabled && (obs_nd->obs->queue != NULL)) {
49 			err = k_msgq_put(obs_nd->obs->queue, &chan,
50 					 _zbus_timeout_remainder(end_ticks));
51 
52 			_ZBUS_ASSERT(err == 0,
53 				     "could not deliver notification to observer %s. Error code %d",
54 				     _ZBUS_OBS_NAME(obs_nd->obs), err);
55 
56 			if (err) {
57 				last_error = err;
58 			}
59 		}
60 	}
61 
62 	return last_error;
63 }
64 #endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
65 
_zbus_notify_observers(const struct zbus_channel * chan,uint64_t end_ticks)66 static int _zbus_notify_observers(const struct zbus_channel *chan, uint64_t end_ticks)
67 {
68 	int last_error = 0, err;
69 	/* Notify static listeners */
70 	for (const struct zbus_observer *const *obs = chan->observers; *obs != NULL; ++obs) {
71 		if ((*obs)->enabled && ((*obs)->callback != NULL)) {
72 			(*obs)->callback(chan);
73 		}
74 	}
75 
76 #if CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0
77 	_zbus_notify_runtime_listeners(chan);
78 #endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
79 
80 	/* Notify static subscribers */
81 	for (const struct zbus_observer *const *obs = chan->observers; *obs != NULL; ++obs) {
82 		if ((*obs)->enabled && ((*obs)->queue != NULL)) {
83 			err = k_msgq_put((*obs)->queue, &chan, _zbus_timeout_remainder(end_ticks));
84 			_ZBUS_ASSERT(err == 0, "could not deliver notification to observer %s.",
85 				     _ZBUS_OBS_NAME(*obs));
86 			if (err) {
87 				LOG_ERR("Observer %s at %p could not be notified. Error code %d",
88 					_ZBUS_OBS_NAME(*obs), *obs, err);
89 				last_error = err;
90 			}
91 		}
92 	}
93 
94 #if CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE > 0
95 	err = _zbus_notify_runtime_subscribers(chan, end_ticks);
96 	if (err) {
97 		last_error = err;
98 	}
99 #endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS_POOL_SIZE */
100 	return last_error;
101 }
102 
zbus_chan_pub(const struct zbus_channel * chan,const void * msg,k_timeout_t timeout)103 int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout)
104 {
105 	int err;
106 	uint64_t end_ticks = sys_clock_timeout_end_calc(timeout);
107 
108 	_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
109 	_ZBUS_ASSERT(chan != NULL, "chan is required");
110 	_ZBUS_ASSERT(msg != NULL, "msg is required");
111 
112 	if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) {
113 		return -ENOMSG;
114 	}
115 
116 	err = k_mutex_lock(chan->mutex, timeout);
117 	if (err) {
118 		return err;
119 	}
120 
121 	memcpy(chan->message, msg, chan->message_size);
122 
123 	err = _zbus_notify_observers(chan, end_ticks);
124 
125 	k_mutex_unlock(chan->mutex);
126 
127 	return err;
128 }
129 
zbus_chan_read(const struct zbus_channel * chan,void * msg,k_timeout_t timeout)130 int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout)
131 {
132 	int err;
133 
134 	_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
135 	_ZBUS_ASSERT(chan != NULL, "chan is required");
136 	_ZBUS_ASSERT(msg != NULL, "msg is required");
137 
138 	err = k_mutex_lock(chan->mutex, timeout);
139 	if (err) {
140 		return err;
141 	}
142 
143 	memcpy(msg, chan->message, chan->message_size);
144 
145 	return k_mutex_unlock(chan->mutex);
146 }
147 
zbus_chan_notify(const struct zbus_channel * chan,k_timeout_t timeout)148 int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout)
149 {
150 	int err;
151 	uint64_t end_ticks = sys_clock_timeout_end_calc(timeout);
152 
153 	_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
154 	_ZBUS_ASSERT(chan != NULL, "chan is required");
155 
156 	err = k_mutex_lock(chan->mutex, timeout);
157 	if (err) {
158 		return err;
159 	}
160 
161 	err = _zbus_notify_observers(chan, end_ticks);
162 
163 	k_mutex_unlock(chan->mutex);
164 
165 	return err;
166 }
167 
zbus_chan_claim(const struct zbus_channel * chan,k_timeout_t timeout)168 int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout)
169 {
170 	_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
171 	_ZBUS_ASSERT(chan != NULL, "chan is required");
172 
173 	int err = k_mutex_lock(chan->mutex, timeout);
174 
175 	if (err) {
176 		return err;
177 	}
178 
179 	return 0;
180 }
181 
zbus_chan_finish(const struct zbus_channel * chan)182 int zbus_chan_finish(const struct zbus_channel *chan)
183 {
184 	_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
185 	_ZBUS_ASSERT(chan != NULL, "chan is required");
186 
187 	int err = k_mutex_unlock(chan->mutex);
188 
189 	return err;
190 }
191 
zbus_sub_wait(const struct zbus_observer * sub,const struct zbus_channel ** chan,k_timeout_t timeout)192 int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan,
193 		  k_timeout_t timeout)
194 {
195 	_ZBUS_ASSERT(!k_is_in_isr(), "zbus cannot be used inside ISRs");
196 	_ZBUS_ASSERT(sub != NULL, "sub is required");
197 	_ZBUS_ASSERT(chan != NULL, "chan is required");
198 
199 	if (sub->queue == NULL) {
200 		return -EINVAL;
201 	}
202 
203 	return k_msgq_get(sub->queue, chan, timeout);
204 }
205