1 /*
2  * Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 
6 #include <zephyr/kernel.h>
7 #include <zephyr/init.h>
8 #include <zephyr/sys/iterable_sections.h>
9 #include <zephyr/logging/log.h>
10 #include <zephyr/sys/printk.h>
11 #include <zephyr/net_buf.h>
12 #include <zephyr/zbus/zbus.h>
13 LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL);
14 
15 #if defined(CONFIG_ZBUS_PRIORITY_BOOST)
16 /* Available only when the priority boost is enabled */
17 static struct k_spinlock _zbus_chan_slock;
18 #endif /* CONFIG_ZBUS_PRIORITY_BOOST */
19 
20 static struct k_spinlock obs_slock;
21 
22 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
23 
24 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC)
25 
26 NET_BUF_POOL_HEAP_DEFINE(_zbus_msg_subscribers_pool, CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE,
27 			 sizeof(struct zbus_channel *), NULL);
28 
_zbus_create_net_buf(struct net_buf_pool * pool,size_t size,k_timeout_t timeout)29 static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size,
30 						   k_timeout_t timeout)
31 {
32 	return net_buf_alloc_len(pool, size, timeout);
33 }
34 
35 #else
36 
37 NET_BUF_POOL_FIXED_DEFINE(_zbus_msg_subscribers_pool,
38 			  (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE),
39 			  (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE),
40 			  sizeof(struct zbus_channel *), NULL);
41 
_zbus_create_net_buf(struct net_buf_pool * pool,size_t size,k_timeout_t timeout)42 static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size,
43 						   k_timeout_t timeout)
44 {
45 	__ASSERT(size <= CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE,
46 		 "CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE must be greater or equal to "
47 		 "%d",
48 		 (int)size);
49 	return net_buf_alloc(pool, timeout);
50 }
51 #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC */
52 
53 #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
54 
_zbus_init(void)55 int _zbus_init(void)
56 {
57 
58 	const struct zbus_channel *curr = NULL;
59 	const struct zbus_channel *prev = NULL;
60 
61 	STRUCT_SECTION_FOREACH(zbus_channel_observation, observation) {
62 		curr = observation->chan;
63 
64 		if (prev != curr) {
65 			if (prev == NULL) {
66 				curr->data->observers_start_idx = 0;
67 				curr->data->observers_end_idx = 0;
68 			} else {
69 				curr->data->observers_start_idx = prev->data->observers_end_idx;
70 				curr->data->observers_end_idx = prev->data->observers_end_idx;
71 			}
72 			prev = curr;
73 		}
74 
75 		++(curr->data->observers_end_idx);
76 	}
77 
78 #if defined(CONFIG_ZBUS_CHANNEL_ID)
79 	STRUCT_SECTION_FOREACH(zbus_channel, chan) {
80 		/* Check for duplicate channel IDs */
81 		if (chan->id == ZBUS_CHAN_ID_INVALID) {
82 			continue;
83 		}
84 		/* Iterate over all previous channels */
85 		STRUCT_SECTION_FOREACH(zbus_channel, chan_prev) {
86 			if (chan_prev == chan) {
87 				break;
88 			}
89 			if (chan->id == chan_prev->id) {
90 #if defined(CONFIG_ZBUS_CHANNEL_NAME)
91 				LOG_WRN("Channels %s and %s have matching IDs (%d)", chan->name,
92 					chan_prev->name, chan->id);
93 #else
94 				LOG_WRN("Channels %p and %p have matching IDs (%d)", chan,
95 					chan_prev, chan->id);
96 #endif /* CONFIG_ZBUS_CHANNEL_NAME */
97 			}
98 		}
99 	}
100 #endif /* CONFIG_ZBUS_CHANNEL_ID */
101 
102 	return 0;
103 }
104 SYS_INIT(_zbus_init, APPLICATION, CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY);
105 
106 #if defined(CONFIG_ZBUS_CHANNEL_ID)
107 
/* Look up a channel by its user-assigned ID; NULL when absent or invalid. */
const struct zbus_channel *zbus_chan_from_id(uint32_t channel_id)
{
	const struct zbus_channel *found = NULL;

	if (channel_id != ZBUS_CHAN_ID_INVALID) {
		STRUCT_SECTION_FOREACH(zbus_channel, chan) {
			if (chan->id == channel_id) {
				found = chan;
				break;
			}
		}
	}

	return found;
}
122 
123 #endif /* CONFIG_ZBUS_CHANNEL_ID */
124 
/*
 * Deliver one notification to a single observer according to its type:
 * listeners get a direct callback, subscribers get the channel reference
 * queued, and message subscribers get a clone of the message net_buf.
 *
 * Returns 0 on success, -ENOMEM when cloning fails, or the k_msgq_put() error.
 */
static inline int _zbus_notify_observer(const struct zbus_channel *chan,
					const struct zbus_observer *obs, k_timepoint_t end_time,
					struct net_buf *buf)
{
	int ret = 0;

	if (obs->type == ZBUS_OBSERVER_LISTENER_TYPE) {
		/* Listener: synchronous callback in the publisher's context */
		obs->callback(chan);
	} else if (obs->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE) {
		/* Subscriber: enqueue only the channel reference */
		ret = k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time));
#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
	} else if (obs->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE) {
		/* Message subscriber: hand over a reference-counted clone */
		struct net_buf *cloned_buf = net_buf_clone(buf, sys_timepoint_timeout(end_time));

		if (cloned_buf == NULL) {
			ret = -ENOMEM;
		} else {
			k_fifo_put(obs->message_fifo, cloned_buf);
		}
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
	} else {
		_ZBUS_ASSERT(false, "Unreachable");
	}

	return ret;
}
156 
/*
 * Virtual Distributed Event Dispatcher (VDED): deliver the channel's current
 * message to every enabled static observer and, when runtime observers are
 * enabled, to every enabled dynamic observer as well.
 *
 * The caller must hold the channel lock.
 *
 * @param chan     Channel whose observers are notified.
 * @param end_time Absolute deadline applied to every blocking delivery.
 *
 * @return 0 on success; -ENOMEM if a message-subscriber clone fails (delivery
 *         is aborted); otherwise the last non-zero delivery error observed.
 */
static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t end_time)
{
	int err = 0;
	int last_error = 0;
	struct net_buf *buf = NULL;

	/* Static observer event dispatcher logic */
	struct zbus_channel_observation *observation;
	struct zbus_channel_observation_mask *observation_mask;

#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
	/* With pool isolation every channel supplies its own pool; otherwise
	 * all channels share the global message-subscribers pool.
	 */
	struct net_buf_pool *pool =
		COND_CODE_1(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION,
			    (chan->data->msg_subscriber_pool), (&_zbus_msg_subscribers_pool));

	buf = _zbus_create_net_buf(pool, zbus_chan_msg_size(chan), sys_timepoint_timeout(end_time));

	_ZBUS_ASSERT(buf != NULL, "net_buf zbus_msg_subscribers_pool is "
				  "unavailable or heap is full");

	/* Stash the originating channel pointer in the buffer user data so the
	 * receiving side (zbus_sub_wait_msg) can recover it.
	 */
	memcpy(net_buf_user_data(buf), &chan, sizeof(struct zbus_channel *));

	net_buf_add_mem(buf, zbus_chan_msg(chan), zbus_chan_msg_size(chan));
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */

	/* Fix typo in debug message: "Notifing" -> "Notifying" */
	LOG_DBG("Notifying %s's observers. Starting VDED:", _ZBUS_CHAN_NAME(chan));

	int __maybe_unused index = 0;

	for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx;
	     i < limit; ++i) {
		STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
		STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);

		_ZBUS_ASSERT(observation != NULL, "observation must be not NULL");

		const struct zbus_observer *obs = observation->obs;

		/* Skip disabled observers and observations masked for this channel */
		if (!obs->data->enabled || observation_mask->enabled) {
			continue;
		}

		err = _zbus_notify_observer(chan, obs, end_time, buf);

		if (err) {
			last_error = err;
			LOG_ERR("could not deliver notification to observer %s. Error code %d",
				_ZBUS_OBS_NAME(obs), err);
			if (err == -ENOMEM) {
				/* No clone buffers left: release ours and abort delivery */
				if (IS_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER)) {
					net_buf_unref(buf);
				}
				return err;
			}
		}

		LOG_DBG(" %d -> %s", index++, _ZBUS_OBS_NAME(obs));
	}

#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS)
	/* Dynamic observer event dispatcher logic */
	struct zbus_observer_node *obs_nd, *tmp;

	SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {
		const struct zbus_observer *obs = obs_nd->obs;

		if (!obs->data->enabled) {
			continue;
		}

		err = _zbus_notify_observer(chan, obs, end_time, buf);

		if (err) {
			last_error = err;
		}
	}
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */

	/* Drop the VDED's own reference; clones keep the data alive for receivers */
	IF_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER, (net_buf_unref(buf);))

	return last_error;
}
239 
240 #if defined(CONFIG_ZBUS_PRIORITY_BOOST)
241 
chan_update_hop(const struct zbus_channel * chan)242 static inline void chan_update_hop(const struct zbus_channel *chan)
243 {
244 	struct zbus_channel_observation *observation;
245 	struct zbus_channel_observation_mask *observation_mask;
246 
247 	int chan_highest_observer_priority = ZBUS_MIN_THREAD_PRIORITY;
248 
249 	K_SPINLOCK(&_zbus_chan_slock) {
250 		const int limit = chan->data->observers_end_idx;
251 
252 		for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) {
253 			STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
254 			STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);
255 
256 			__ASSERT(observation != NULL, "observation must be not NULL");
257 
258 			const struct zbus_observer *obs = observation->obs;
259 
260 			if (!obs->data->enabled || observation_mask->enabled) {
261 				continue;
262 			}
263 
264 			if (chan_highest_observer_priority > obs->data->priority) {
265 				chan_highest_observer_priority = obs->data->priority;
266 			}
267 		}
268 		chan->data->highest_observer_priority = chan_highest_observer_priority;
269 	}
270 }
271 
update_all_channels_hop(const struct zbus_observer * obs)272 static inline void update_all_channels_hop(const struct zbus_observer *obs)
273 {
274 	STRUCT_SECTION_FOREACH(zbus_channel_observation, observation) {
275 		if (obs != observation->obs) {
276 			continue;
277 		}
278 
279 		chan_update_hop(observation->chan);
280 	}
281 }
282 
zbus_obs_attach_to_thread(const struct zbus_observer * obs)283 int zbus_obs_attach_to_thread(const struct zbus_observer *obs)
284 {
285 	_ZBUS_ASSERT(!k_is_in_isr(), "cannot attach to an ISR");
286 	_ZBUS_ASSERT(obs != NULL, "obs is required");
287 
288 	int current_thread_priority = k_thread_priority_get(k_current_get());
289 
290 	K_SPINLOCK(&obs_slock) {
291 		if (obs->data->priority != current_thread_priority) {
292 			obs->data->priority = current_thread_priority;
293 
294 			update_all_channels_hop(obs);
295 		}
296 	}
297 
298 	return 0;
299 }
300 
zbus_obs_detach_from_thread(const struct zbus_observer * obs)301 int zbus_obs_detach_from_thread(const struct zbus_observer *obs)
302 {
303 	_ZBUS_ASSERT(!k_is_in_isr(), "cannot detach from an ISR");
304 	_ZBUS_ASSERT(obs != NULL, "obs is required");
305 
306 	K_SPINLOCK(&obs_slock) {
307 		obs->data->priority = ZBUS_MIN_THREAD_PRIORITY;
308 
309 		update_all_channels_hop(obs);
310 	}
311 
312 	return 0;
313 }
314 
315 #else
316 
/* Priority boost disabled: observer priorities are not tracked, so there is
 * nothing to update.
 */
static inline void update_all_channels_hop(const struct zbus_observer *obs)
{
	ARG_UNUSED(obs);
}
320 
321 #endif /* CONFIG_ZBUS_PRIORITY_BOOST */
322 
/*
 * Take the channel semaphore, possibly boosting the caller's priority.
 *
 * With CONFIG_ZBUS_PRIORITY_BOOST, a thread publishing to a channel whose
 * highest observer priority (HOP) exceeds its own is temporarily elevated
 * above that HOP so it is not preempted by the observers it is about to wake.
 *
 * @param chan    Channel to lock.
 * @param timeout Maximum time to wait for the semaphore.
 * @param prio    Out: the caller's pre-boost priority, which chan_unlock()
 *                uses to restore it. Untouched when no boost occurs.
 *
 * @return 0 on success, otherwise the k_sem_take() error (boost undone).
 */
static inline int chan_lock(const struct zbus_channel *chan, k_timeout_t timeout, int *prio)
{
	bool boosted = false;

#if defined(CONFIG_ZBUS_PRIORITY_BOOST)
	if (!k_is_in_isr()) {
		*prio = k_thread_priority_get(k_current_get());

		K_SPINLOCK(&_zbus_chan_slock) {
			if (*prio > chan->data->highest_observer_priority) {
				/* Elevate just above the HOP, clamped at zero */
				int target = MAX(chan->data->highest_observer_priority - 1, 0);

				k_thread_priority_set(k_current_get(), target);

				boosted = true;
			}
		}
	}
#endif /* CONFIG_ZBUS_PRIORITY_BOOST */

	int ret = k_sem_take(&chan->data->sem, timeout);

	if (ret != 0) {
		/* When the priority boost is disabled, this IF will be optimized out. */
		if (boosted) {
			/* Semaphore unavailable: undo the boost before bailing out */
			k_thread_priority_set(k_current_get(), *prio);
		}
	}

	return ret;
}
362 
chan_unlock(const struct zbus_channel * chan,int prio)363 static inline void chan_unlock(const struct zbus_channel *chan, int prio)
364 {
365 	k_sem_give(&chan->data->sem);
366 
367 #if defined(CONFIG_ZBUS_PRIORITY_BOOST)
368 	/* During the unlock phase, with the priority boost enabled, the priority must be
369 	 * restored to the original value in case it was elevated
370 	 */
371 	if (prio < ZBUS_MIN_THREAD_PRIORITY) {
372 		k_thread_priority_set(k_current_get(), prio);
373 	}
374 #endif /* CONFIG_ZBUS_PRIORITY_BOOST */
375 }
376 
/*
 * Publish @p msg to @p chan: validate, copy into the channel under the lock,
 * then run the VDED to notify all observers within @p timeout.
 */
int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout)
{
	_ZBUS_ASSERT(chan != NULL, "chan is required");
	_ZBUS_ASSERT(msg != NULL, "msg is required");
	_ZBUS_ASSERT(k_is_in_isr() ? K_TIMEOUT_EQ(timeout, K_NO_WAIT) : true,
		     "inside an ISR, the timeout must be K_NO_WAIT");

	/* ISRs must never block */
	if (k_is_in_isr()) {
		timeout = K_NO_WAIT;
	}

	k_timepoint_t deadline = sys_timepoint_calc(timeout);

	/* Reject messages the channel's validator refuses */
	if ((chan->validator != NULL) && !chan->validator(msg, chan->message_size)) {
		return -ENOMSG;
	}

	int original_prio = ZBUS_MIN_THREAD_PRIORITY;
	int ret = chan_lock(chan, timeout, &original_prio);

	if (ret != 0) {
		return ret;
	}

#if defined(CONFIG_ZBUS_CHANNEL_PUBLISH_STATS)
	/* Publishing statistics bookkeeping */
	chan->data->publish_timestamp = k_uptime_ticks();
	chan->data->publish_count += 1;
#endif /* CONFIG_ZBUS_CHANNEL_PUBLISH_STATS */

	memcpy(chan->message, msg, chan->message_size);

	ret = _zbus_vded_exec(chan, deadline);

	chan_unlock(chan, original_prio);

	return ret;
}
416 
/* Copy the channel's current message into @p msg under the channel lock. */
int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout)
{
	_ZBUS_ASSERT(chan != NULL, "chan is required");
	_ZBUS_ASSERT(msg != NULL, "msg is required");
	_ZBUS_ASSERT(k_is_in_isr() ? K_TIMEOUT_EQ(timeout, K_NO_WAIT) : true,
		     "inside an ISR, the timeout must be K_NO_WAIT");

	/* ISRs must never block */
	if (k_is_in_isr()) {
		timeout = K_NO_WAIT;
	}

	int ret = k_sem_take(&chan->data->sem, timeout);

	if (ret != 0) {
		return ret;
	}

	memcpy(msg, chan->message, chan->message_size);

	k_sem_give(&chan->data->sem);

	return 0;
}
439 
/* Run the VDED for @p chan without changing its message (pure notification). */
int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout)
{
	_ZBUS_ASSERT(chan != NULL, "chan is required");
	_ZBUS_ASSERT(k_is_in_isr() ? K_TIMEOUT_EQ(timeout, K_NO_WAIT) : true,
		     "inside an ISR, the timeout must be K_NO_WAIT");

	/* ISRs must never block */
	if (k_is_in_isr()) {
		timeout = K_NO_WAIT;
	}

	k_timepoint_t deadline = sys_timepoint_calc(timeout);

	int original_prio = ZBUS_MIN_THREAD_PRIORITY;
	int ret = chan_lock(chan, timeout, &original_prio);

	if (ret != 0) {
		return ret;
	}

	ret = _zbus_vded_exec(chan, deadline);

	chan_unlock(chan, original_prio);

	return ret;
}
467 
/* Lock the channel for exclusive access; pair with zbus_chan_finish(). */
int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout)
{
	_ZBUS_ASSERT(chan != NULL, "chan is required");
	_ZBUS_ASSERT(k_is_in_isr() ? K_TIMEOUT_EQ(timeout, K_NO_WAIT) : true,
		     "inside an ISR, the timeout must be K_NO_WAIT");

	/* ISRs must never block */
	if (k_is_in_isr()) {
		timeout = K_NO_WAIT;
	}

	return k_sem_take(&chan->data->sem, timeout);
}
486 
zbus_chan_finish(const struct zbus_channel * chan)487 int zbus_chan_finish(const struct zbus_channel *chan)
488 {
489 	_ZBUS_ASSERT(chan != NULL, "chan is required");
490 
491 	k_sem_give(&chan->data->sem);
492 
493 	return 0;
494 }
495 
/* Block until a channel reference arrives on the subscriber's message queue. */
int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan,
		  k_timeout_t timeout)
{
	_ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait cannot be used inside ISRs");
	_ZBUS_ASSERT(sub != NULL, "sub is required");
	_ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE, "sub must be a SUBSCRIBER");
	_ZBUS_ASSERT(sub->queue != NULL, "sub queue is required");
	_ZBUS_ASSERT(chan != NULL, "chan is required");

	/* The VDED enqueued the channel pointer itself; just dequeue it */
	return k_msgq_get(sub->queue, chan, timeout);
}
507 
508 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
509 
/*
 * Block until a cloned message buffer arrives for the message subscriber,
 * then extract the source channel and copy the message payload out.
 *
 * Returns 0 on success, -ENOMSG when no buffer arrives within @p timeout.
 */
int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg,
		      k_timeout_t timeout)
{
	_ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait_msg cannot be used inside ISRs");
	_ZBUS_ASSERT(sub != NULL, "sub is required");
	_ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE,
		     "sub must be a MSG_SUBSCRIBER");
	_ZBUS_ASSERT(sub->message_fifo != NULL, "sub message_fifo is required");
	_ZBUS_ASSERT(chan != NULL, "chan is required");
	_ZBUS_ASSERT(msg != NULL, "msg is required");

	struct net_buf *buf = k_fifo_get(sub->message_fifo, timeout);

	if (buf == NULL) {
		return -ENOMSG;
	}

	/* The VDED stored the source channel pointer in the buffer user data */
	*chan = *((struct zbus_channel **)net_buf_user_data(buf));

	const size_t msg_size = zbus_chan_msg_size(*chan);

	memcpy(msg, net_buf_remove_mem(buf, msg_size), msg_size);

	net_buf_unref(buf);

	return 0;
}
535 
536 #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
537 
/*
 * Mask or unmask delivery of @p chan notifications to @p obs.
 *
 * Returns 0 on success or -ESRCH when @p obs does not statically observe
 * @p chan.
 */
int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs,
					const struct zbus_channel *chan, bool masked)
{
	_ZBUS_ASSERT(obs != NULL, "obs is required");
	_ZBUS_ASSERT(chan != NULL, "chan is required");

	int ret = -ESRCH;

	struct zbus_channel_observation *observation;
	struct zbus_channel_observation_mask *observation_mask;

	K_SPINLOCK(&obs_slock) {
		int16_t idx = chan->data->observers_start_idx;
		const int16_t end = chan->data->observers_end_idx;

		for (; idx < end; ++idx) {
			STRUCT_SECTION_GET(zbus_channel_observation, idx, &observation);
			STRUCT_SECTION_GET(zbus_channel_observation_mask, idx, &observation_mask);

			__ASSERT(observation != NULL, "observation must be not NULL");

			if (observation->obs != obs) {
				continue;
			}

			if (observation_mask->enabled != masked) {
				observation_mask->enabled = masked;

				/* Masking changes which observers count for HOP */
				update_all_channels_hop(obs);
			}

			ret = 0;

			K_SPINLOCK_BREAK;
		}
	}

	return ret;
}
574 
/*
 * Query whether @p obs's notifications from @p chan are currently masked.
 *
 * Returns 0 and writes *masked on success, or -ESRCH when @p obs does not
 * statically observe @p chan.
 */
int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs,
					 const struct zbus_channel *chan, bool *masked)
{
	_ZBUS_ASSERT(obs != NULL, "obs is required");
	_ZBUS_ASSERT(chan != NULL, "chan is required");

	int ret = -ESRCH;

	struct zbus_channel_observation *observation;
	struct zbus_channel_observation_mask *observation_mask;

	K_SPINLOCK(&obs_slock) {
		int16_t idx = chan->data->observers_start_idx;
		const int16_t end = chan->data->observers_end_idx;

		for (; idx < end; ++idx) {
			STRUCT_SECTION_GET(zbus_channel_observation, idx, &observation);
			STRUCT_SECTION_GET(zbus_channel_observation_mask, idx, &observation_mask);

			__ASSERT(observation != NULL, "observation must be not NULL");

			if (observation->obs != obs) {
				continue;
			}

			*masked = observation_mask->enabled;
			ret = 0;

			K_SPINLOCK_BREAK;
		}
	}

	return ret;
}
607 
/* Enable or disable an observer, refreshing HOP caches only on change. */
int zbus_obs_set_enable(const struct zbus_observer *obs, bool enabled)
{
	_ZBUS_ASSERT(obs != NULL, "obs is required");

	K_SPINLOCK(&obs_slock) {
		if (obs->data->enabled == enabled) {
			/* No state change: nothing to propagate */
			K_SPINLOCK_BREAK;
		}

		obs->data->enabled = enabled;
		update_all_channels_hop(obs);
	}

	return 0;
}
622