/*
 * Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
 * SPDX-License-Identifier: Apache-2.0
 */

#include <zephyr/kernel.h>
#include <zephyr/init.h>
#include <zephyr/sys/iterable_sections.h>
#include <zephyr/logging/log.h>
#include <zephyr/sys/printk.h>
#include <zephyr/net_buf.h>
#include <zephyr/zbus/zbus.h>
LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL);

#if defined(CONFIG_ZBUS_PRIORITY_BOOST)
/* Available only when the priority boost is enabled */
static struct k_spinlock _zbus_chan_slock;
#endif /* CONFIG_ZBUS_PRIORITY_BOOST */

static struct k_spinlock obs_slock;

#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)

#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC)

NET_BUF_POOL_HEAP_DEFINE(_zbus_msg_subscribers_pool, CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE,
			 sizeof(struct zbus_channel *), NULL);

static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size,
						   k_timeout_t timeout)
{
	return net_buf_alloc_len(pool, size, timeout);
}

#else

NET_BUF_POOL_FIXED_DEFINE(_zbus_msg_subscribers_pool,
			  (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE),
			  (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE),
			  sizeof(struct zbus_channel *), NULL);

static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size,
						   k_timeout_t timeout)
{
	__ASSERT(size <= CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE,
		 "CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE must be greater than or "
		 "equal to %d",
		 (int)size);
	return net_buf_alloc(pool, timeout);
}
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC */

#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */

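/*
 * Boot-time initialization: computes, for every channel, the [start, end) slice it owns
 * inside the iterable zbus_channel_observation section, and, when CONFIG_ZBUS_CHANNEL_ID
 * is enabled, warns about channels that share the same ID.
 */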
int _zbus_init(void)
{
	const struct zbus_channel *curr = NULL;
	const struct zbus_channel *prev = NULL;

	STRUCT_SECTION_FOREACH(zbus_channel_observation, observation) {
		curr = observation->chan;

		if (prev != curr) {
			if (prev == NULL) {
				curr->data->observers_start_idx = 0;
				curr->data->observers_end_idx = 0;
			} else {
				curr->data->observers_start_idx = prev->data->observers_end_idx;
				curr->data->observers_end_idx = prev->data->observers_end_idx;
			}
			prev = curr;
		}

		++(curr->data->observers_end_idx);
	}

#if defined(CONFIG_ZBUS_CHANNEL_ID)
	STRUCT_SECTION_FOREACH(zbus_channel, chan) {
		/* Check for duplicate channel IDs */
		if (chan->id == ZBUS_CHAN_ID_INVALID) {
			continue;
		}
		/* Iterate over all previous channels */
		STRUCT_SECTION_FOREACH(zbus_channel, chan_prev) {
			if (chan_prev == chan) {
				break;
			}
			if (chan->id == chan_prev->id) {
#if defined(CONFIG_ZBUS_CHANNEL_NAME)
				LOG_WRN("Channels %s and %s have matching IDs (%d)", chan->name,
					chan_prev->name, chan->id);
#else
				LOG_WRN("Channels %p and %p have matching IDs (%d)", chan,
					chan_prev, chan->id);
#endif /* CONFIG_ZBUS_CHANNEL_NAME */
			}
		}
	}
#endif /* CONFIG_ZBUS_CHANNEL_ID */

	return 0;
}
SYS_INIT(_zbus_init, APPLICATION, CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY);

#if defined(CONFIG_ZBUS_CHANNEL_ID)

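/*
 * Usage sketch (illustrative; assumes a hypothetical channel that was assigned the ID
 * 0x42 at definition time, e.g. via an ID-aware channel definition macro such as
 * ZBUS_CHAN_DEFINE_WITH_ID, if available in this zbus version):
 *
 *     const struct zbus_channel *chan = zbus_chan_from_id(0x42);
 *
 *     if (chan != NULL) {
 *             // chan points to the channel defined with that ID
 *     }
 */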
const struct zbus_channel *zbus_chan_from_id(uint32_t channel_id)
{
	if (channel_id == ZBUS_CHAN_ID_INVALID) {
		return NULL;
	}
	STRUCT_SECTION_FOREACH(zbus_channel, chan) {
		if (chan->id == channel_id) {
			/* Found matching channel */
			return chan;
		}
	}
	/* No matching channel exists */
	return NULL;
}

#endif /* CONFIG_ZBUS_CHANNEL_ID */

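/*
 * Delivers a single notification to one observer: listeners get a direct callback,
 * subscribers get the channel reference queued on their message queue, and message
 * subscribers get a clone of the net_buf holding the message pushed to their FIFO.
 */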
static inline int _zbus_notify_observer(const struct zbus_channel *chan,
					const struct zbus_observer *obs, k_timepoint_t end_time,
					struct net_buf *buf)
{
	switch (obs->type) {
	case ZBUS_OBSERVER_LISTENER_TYPE: {
		obs->callback(chan);
		break;
	}
	case ZBUS_OBSERVER_SUBSCRIBER_TYPE: {
		return k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time));
	}
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
	case ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE: {
		struct net_buf *cloned_buf = net_buf_clone(buf, sys_timepoint_timeout(end_time));

		if (cloned_buf == NULL) {
			return -ENOMEM;
		}

		k_fifo_put(obs->message_fifo, cloned_buf);

		break;
	}
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */

	default:
		_ZBUS_ASSERT(false, "Unreachable");
	}
	return 0;
}

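/*
 * Virtual Distributed Event Dispatcher (VDED): walks the channel's static observation
 * slice (and, when CONFIG_ZBUS_RUNTIME_OBSERVERS is enabled, its runtime observer list)
 * and notifies every enabled, unmasked observer. When message subscribers are enabled,
 * the current message is snapshotted into a net_buf before dispatching.
 */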
static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t end_time)
{
	int err = 0;
	int last_error = 0;
	struct net_buf *buf = NULL;

	/* Static observer event dispatcher logic */
	struct zbus_channel_observation *observation;
	struct zbus_channel_observation_mask *observation_mask;

#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
	struct net_buf_pool *pool =
		COND_CODE_1(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION,
			    (chan->data->msg_subscriber_pool), (&_zbus_msg_subscribers_pool));

	buf = _zbus_create_net_buf(pool, zbus_chan_msg_size(chan), sys_timepoint_timeout(end_time));

	_ZBUS_ASSERT(buf != NULL, "net_buf zbus_msg_subscribers_pool is "
				  "unavailable or heap is full");

	memcpy(net_buf_user_data(buf), &chan, sizeof(struct zbus_channel *));

	net_buf_add_mem(buf, zbus_chan_msg(chan), zbus_chan_msg_size(chan));
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */

	LOG_DBG("Notifying %s's observers. Starting VDED:", _ZBUS_CHAN_NAME(chan));

	int __maybe_unused index = 0;

	for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx;
	     i < limit; ++i) {
		STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
		STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);

		_ZBUS_ASSERT(observation != NULL, "observation must not be NULL");

		const struct zbus_observer *obs = observation->obs;

		if (!obs->data->enabled || observation_mask->enabled) {
			continue;
		}

		err = _zbus_notify_observer(chan, obs, end_time, buf);

		if (err) {
			last_error = err;
			LOG_ERR("could not deliver notification to observer %s. Error code %d",
				_ZBUS_OBS_NAME(obs), err);
			if (err == -ENOMEM) {
				if (IS_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER)) {
					net_buf_unref(buf);
				}
				return err;
			}
		}

		LOG_DBG(" %d -> %s", index++, _ZBUS_OBS_NAME(obs));
	}

#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS)
	/* Dynamic observer event dispatcher logic */
	struct zbus_observer_node *obs_nd, *tmp;

	SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {
		const struct zbus_observer *obs = obs_nd->obs;

		if (!obs->data->enabled) {
			continue;
		}

		err = _zbus_notify_observer(chan, obs, end_time, buf);

		if (err) {
			last_error = err;
		}
	}
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */

	IF_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER, (net_buf_unref(buf);))

	return last_error;
}

#if defined(CONFIG_ZBUS_PRIORITY_BOOST)

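/*
 * Recomputes the channel's HOP (highest observer priority): the numerically lowest
 * thread priority among its enabled, unmasked observers. The publisher uses this value
 * to decide whether its thread must be boosted while the VDED runs.
 */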
static inline void chan_update_hop(const struct zbus_channel *chan)
{
	struct zbus_channel_observation *observation;
	struct zbus_channel_observation_mask *observation_mask;

	int chan_highest_observer_priority = ZBUS_MIN_THREAD_PRIORITY;

	K_SPINLOCK(&_zbus_chan_slock) {
		const int limit = chan->data->observers_end_idx;

		for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) {
			STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
			STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);

			__ASSERT(observation != NULL, "observation must not be NULL");

			const struct zbus_observer *obs = observation->obs;

			if (!obs->data->enabled || observation_mask->enabled) {
				continue;
			}

			if (chan_highest_observer_priority > obs->data->priority) {
				chan_highest_observer_priority = obs->data->priority;
			}
		}
		chan->data->highest_observer_priority = chan_highest_observer_priority;
	}
}

static inline void update_all_channels_hop(const struct zbus_observer *obs)
{
	STRUCT_SECTION_FOREACH(zbus_channel_observation, observation) {
		if (obs != observation->obs) {
			continue;
		}

		chan_update_hop(observation->chan);
	}
}

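/*
 * Usage sketch (illustrative; `acc_sub` is a hypothetical subscriber and the call is
 * made from the thread that consumes its notifications, so the observer inherits that
 * thread's priority for boosting purposes):
 *
 *     void consumer_thread(void)
 *     {
 *             zbus_obs_attach_to_thread(&acc_sub);
 *             ...
 *     }
 */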
int zbus_obs_attach_to_thread(const struct zbus_observer *obs)
{
	_ZBUS_ASSERT(!k_is_in_isr(), "cannot attach to an ISR");
	_ZBUS_ASSERT(obs != NULL, "obs is required");

	int current_thread_priority = k_thread_priority_get(k_current_get());

	K_SPINLOCK(&obs_slock) {
		if (obs->data->priority != current_thread_priority) {
			obs->data->priority = current_thread_priority;

			update_all_channels_hop(obs);
		}
	}

	return 0;
}

int zbus_obs_detach_from_thread(const struct zbus_observer *obs)
{
	_ZBUS_ASSERT(!k_is_in_isr(), "cannot detach from an ISR");
	_ZBUS_ASSERT(obs != NULL, "obs is required");

	K_SPINLOCK(&obs_slock) {
		obs->data->priority = ZBUS_MIN_THREAD_PRIORITY;

		update_all_channels_hop(obs);
	}

	return 0;
}

#else

static inline void update_all_channels_hop(const struct zbus_observer *obs)
{
}

#endif /* CONFIG_ZBUS_PRIORITY_BOOST */

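/*
 * Takes the channel semaphore. With CONFIG_ZBUS_PRIORITY_BOOST enabled and when not in
 * an ISR, the calling thread is temporarily raised above the channel's highest observer
 * priority so the VDED can finish without being preempted by the observers it notifies;
 * *prio receives the original priority so chan_unlock() can restore it.
 */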
static inline int chan_lock(const struct zbus_channel *chan, k_timeout_t timeout, int *prio)
{
	bool boosting = false;

#if defined(CONFIG_ZBUS_PRIORITY_BOOST)
	if (!k_is_in_isr()) {
		*prio = k_thread_priority_get(k_current_get());

		K_SPINLOCK(&_zbus_chan_slock) {
			if (*prio > chan->data->highest_observer_priority) {
				int new_prio = chan->data->highest_observer_priority - 1;

				new_prio = MAX(new_prio, 0);

				/* Elevate the priority: the channel's highest-priority
				 * observer outranks the current thread
				 */
				k_thread_priority_set(k_current_get(), new_prio);

				boosting = true;
			}
		}
	}
#endif /* CONFIG_ZBUS_PRIORITY_BOOST */

	int err = k_sem_take(&chan->data->sem, timeout);

	if (err) {
		/* When the priority boost is disabled, this if statement is optimized out. */
		if (boosting) {
			/* Restoring thread priority since the semaphore is not available */
			k_thread_priority_set(k_current_get(), *prio);
		}

		return err;
	}

	return 0;
}

static inline void chan_unlock(const struct zbus_channel *chan, int prio)
{
	k_sem_give(&chan->data->sem);

#if defined(CONFIG_ZBUS_PRIORITY_BOOST)
	/* During the unlock phase, with the priority boost enabled, the priority must be
	 * restored to the original value in case it was elevated
	 */
	if (prio < ZBUS_MIN_THREAD_PRIORITY) {
		k_thread_priority_set(k_current_get(), prio);
	}
#endif /* CONFIG_ZBUS_PRIORITY_BOOST */
}

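/*
 * Usage sketch (illustrative; `acc_chan` is a hypothetical channel defined elsewhere
 * with ZBUS_CHAN_DEFINE and carrying a `struct acc_msg` message):
 *
 *     struct acc_msg msg = {.x = 1, .y = 2, .z = 3};
 *
 *     int err = zbus_chan_pub(&acc_chan, &msg, K_MSEC(200));
 */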
int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout)
{
	int err;

	_ZBUS_ASSERT(chan != NULL, "chan is required");
	_ZBUS_ASSERT(msg != NULL, "msg is required");

	if (k_is_in_isr()) {
		timeout = K_NO_WAIT;
	}

	k_timepoint_t end_time = sys_timepoint_calc(timeout);

	if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) {
		return -ENOMSG;
	}

	int context_priority = ZBUS_MIN_THREAD_PRIORITY;

	err = chan_lock(chan, timeout, &context_priority);
	if (err) {
		return err;
	}

#if defined(CONFIG_ZBUS_CHANNEL_PUBLISH_STATS)
	chan->data->publish_timestamp = k_uptime_ticks();
	chan->data->publish_count += 1;
#endif /* CONFIG_ZBUS_CHANNEL_PUBLISH_STATS */

	memcpy(chan->message, msg, chan->message_size);

	err = _zbus_vded_exec(chan, end_time);

	chan_unlock(chan, context_priority);

	return err;
}

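/*
 * Usage sketch (illustrative; same hypothetical `acc_chan`/`struct acc_msg` as above):
 *
 *     struct acc_msg msg;
 *
 *     if (zbus_chan_read(&acc_chan, &msg, K_MSEC(100)) == 0) {
 *             // msg holds a consistent copy of the channel message
 *     }
 */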
int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout)
{
	_ZBUS_ASSERT(chan != NULL, "chan is required");
	_ZBUS_ASSERT(msg != NULL, "msg is required");

	if (k_is_in_isr()) {
		timeout = K_NO_WAIT;
	}

	int err = k_sem_take(&chan->data->sem, timeout);
	if (err) {
		return err;
	}

	memcpy(msg, chan->message, chan->message_size);

	k_sem_give(&chan->data->sem);

	return 0;
}

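/*
 * Usage sketch (illustrative): notify observers without publishing a new message, for
 * example after the message was edited in place under zbus_chan_claim()/zbus_chan_finish():
 *
 *     int err = zbus_chan_notify(&acc_chan, K_NO_WAIT);
 */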
int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout)
{
	int err;

	_ZBUS_ASSERT(chan != NULL, "chan is required");

	if (k_is_in_isr()) {
		timeout = K_NO_WAIT;
	}

	k_timepoint_t end_time = sys_timepoint_calc(timeout);

	int context_priority = ZBUS_MIN_THREAD_PRIORITY;

	err = chan_lock(chan, timeout, &context_priority);
	if (err) {
		return err;
	}

	err = _zbus_vded_exec(chan, end_time);

	chan_unlock(chan, context_priority);

	return err;
}

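/*
 * Usage sketch (illustrative): claim gives exclusive access to the message, which can
 * then be edited in place through zbus_chan_msg(); finish must always follow a
 * successful claim:
 *
 *     if (zbus_chan_claim(&acc_chan, K_MSEC(100)) == 0) {
 *             struct acc_msg *m = zbus_chan_msg(&acc_chan);
 *
 *             m->x += 1;
 *             zbus_chan_finish(&acc_chan);
 *     }
 */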
int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout)
{
	_ZBUS_ASSERT(chan != NULL, "chan is required");

	if (k_is_in_isr()) {
		timeout = K_NO_WAIT;
	}

	int err = k_sem_take(&chan->data->sem, timeout);

	if (err) {
		return err;
	}

	return 0;
}

int zbus_chan_finish(const struct zbus_channel *chan)
{
	_ZBUS_ASSERT(chan != NULL, "chan is required");

	k_sem_give(&chan->data->sem);

	return 0;
}

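/*
 * Usage sketch (illustrative; `acc_sub` is a hypothetical subscriber defined with
 * ZBUS_SUBSCRIBER_DEFINE):
 *
 *     const struct zbus_channel *chan;
 *
 *     while (zbus_sub_wait(&acc_sub, &chan, K_FOREVER) == 0) {
 *             struct acc_msg msg;
 *
 *             zbus_chan_read(chan, &msg, K_MSEC(100));
 *     }
 */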
int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan,
		  k_timeout_t timeout)
{
	_ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait cannot be used inside ISRs");
	_ZBUS_ASSERT(sub != NULL, "sub is required");
	_ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE, "sub must be a SUBSCRIBER");
	_ZBUS_ASSERT(sub->queue != NULL, "sub queue is required");
	_ZBUS_ASSERT(chan != NULL, "chan is required");

	return k_msgq_get(sub->queue, chan, timeout);
}

#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)

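/*
 * Usage sketch (illustrative; `acc_msub` is a hypothetical message subscriber defined
 * with ZBUS_MSG_SUBSCRIBER_DEFINE):
 *
 *     const struct zbus_channel *chan;
 *     struct acc_msg msg;
 *
 *     while (zbus_sub_wait_msg(&acc_msub, &chan, &msg, K_FOREVER) == 0) {
 *             // msg already holds the published payload; no extra zbus_chan_read() is needed
 *     }
 */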
int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg,
		      k_timeout_t timeout)
{
	_ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait_msg cannot be used inside ISRs");
	_ZBUS_ASSERT(sub != NULL, "sub is required");
	_ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE,
		     "sub must be a MSG_SUBSCRIBER");
	_ZBUS_ASSERT(sub->message_fifo != NULL, "sub message_fifo is required");
	_ZBUS_ASSERT(chan != NULL, "chan is required");
	_ZBUS_ASSERT(msg != NULL, "msg is required");

	struct net_buf *buf = k_fifo_get(sub->message_fifo, timeout);

	if (buf == NULL) {
		return -ENOMSG;
	}

	*chan = *((struct zbus_channel **)net_buf_user_data(buf));

	memcpy(msg, net_buf_remove_mem(buf, zbus_chan_msg_size(*chan)), zbus_chan_msg_size(*chan));

	net_buf_unref(buf);

	return 0;
}

#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */

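/*
 * Usage sketch (illustrative): temporarily stop the hypothetical `acc_sub` from being
 * notified by `acc_chan` without disabling the observer globally:
 *
 *     zbus_obs_set_chan_notification_mask(&acc_sub, &acc_chan, true);
 *     ...
 *     zbus_obs_set_chan_notification_mask(&acc_sub, &acc_chan, false);
 */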
int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs,
					const struct zbus_channel *chan, bool masked)
{
	_ZBUS_ASSERT(obs != NULL, "obs is required");
	_ZBUS_ASSERT(chan != NULL, "chan is required");

	int err = -ESRCH;

	struct zbus_channel_observation *observation;
	struct zbus_channel_observation_mask *observation_mask;

	K_SPINLOCK(&obs_slock) {
		for (int16_t i = chan->data->observers_start_idx,
			     limit = chan->data->observers_end_idx;
		     i < limit; ++i) {
			STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
			STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);

			__ASSERT(observation != NULL, "observation must not be NULL");

			if (observation->obs == obs) {
				if (observation_mask->enabled != masked) {
					observation_mask->enabled = masked;

					update_all_channels_hop(obs);
				}

				err = 0;

				K_SPINLOCK_BREAK;
			}
		}
	}

	return err;
}

int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs,
					 const struct zbus_channel *chan, bool *masked)
{
	_ZBUS_ASSERT(obs != NULL, "obs is required");
	_ZBUS_ASSERT(chan != NULL, "chan is required");

	int err = -ESRCH;

	struct zbus_channel_observation *observation;
	struct zbus_channel_observation_mask *observation_mask;

	K_SPINLOCK(&obs_slock) {
		const int limit = chan->data->observers_end_idx;

		for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) {
			STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
			STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);

			__ASSERT(observation != NULL, "observation must not be NULL");

			if (observation->obs == obs) {
				*masked = observation_mask->enabled;

				err = 0;

				K_SPINLOCK_BREAK;
			}
		}
	}

	return err;
}

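/*
 * Usage sketch (illustrative): disable and later re-enable all notifications for the
 * hypothetical observer `acc_sub`:
 *
 *     zbus_obs_set_enable(&acc_sub, false);
 *     ...
 *     zbus_obs_set_enable(&acc_sub, true);
 */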
int zbus_obs_set_enable(const struct zbus_observer *obs, bool enabled)
{
	_ZBUS_ASSERT(obs != NULL, "obs is required");

	K_SPINLOCK(&obs_slock) {
		if (obs->data->enabled != enabled) {
			obs->data->enabled = enabled;

			update_all_channels_hop(obs);
		}
	}

	return 0;
}