1 /*
2  * Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 
6 #include <zephyr/kernel.h>
7 #include <zephyr/init.h>
8 #include <zephyr/sys/iterable_sections.h>
9 #include <zephyr/logging/log.h>
10 #include <zephyr/sys/printk.h>
11 #include <zephyr/net_buf.h>
12 #include <zephyr/zbus/zbus.h>
13 LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL);
14 
#if defined(CONFIG_ZBUS_PRIORITY_BOOST)
/* Available only when the priority boost is enabled.
 * Taken in this file around every read and update of a channel's
 * highest_observer_priority.
 */
static struct k_spinlock _zbus_chan_slock;
#endif /* CONFIG_ZBUS_PRIORITY_BOOST */

/* Taken in this file around runtime changes to observer state: the observer's
 * enabled flag and priority, and the per-observation notification masks.
 */
static struct k_spinlock obs_slock;
21 
22 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
23 
24 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC)
25 
26 NET_BUF_POOL_HEAP_DEFINE(_zbus_msg_subscribers_pool, CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE,
27 			 sizeof(struct zbus_channel *), NULL);
28 
_zbus_create_net_buf(struct net_buf_pool * pool,size_t size,k_timeout_t timeout)29 static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size,
30 						   k_timeout_t timeout)
31 {
32 	return net_buf_alloc_len(pool, size, timeout);
33 }
34 
35 #else
36 
37 NET_BUF_POOL_FIXED_DEFINE(_zbus_msg_subscribers_pool,
38 			  (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE),
39 			  (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE),
40 			  sizeof(struct zbus_channel *), NULL);
41 
_zbus_create_net_buf(struct net_buf_pool * pool,size_t size,k_timeout_t timeout)42 static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size,
43 						   k_timeout_t timeout)
44 {
45 	__ASSERT(size <= CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE,
46 		 "CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE must be greater or equal to "
47 		 "%d",
48 		 (int)size);
49 	return net_buf_alloc(pool, timeout);
50 }
51 #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC */
52 
53 #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
54 
_zbus_init(void)55 int _zbus_init(void)
56 {
57 
58 	const struct zbus_channel *curr = NULL;
59 	const struct zbus_channel *prev = NULL;
60 
61 	STRUCT_SECTION_FOREACH(zbus_channel_observation, observation) {
62 		curr = observation->chan;
63 
64 		if (prev != curr) {
65 			if (prev == NULL) {
66 				curr->data->observers_start_idx = 0;
67 				curr->data->observers_end_idx = 0;
68 			} else {
69 				curr->data->observers_start_idx = prev->data->observers_end_idx;
70 				curr->data->observers_end_idx = prev->data->observers_end_idx;
71 			}
72 			prev = curr;
73 		}
74 
75 		++(curr->data->observers_end_idx);
76 	}
77 
78 #if defined(CONFIG_ZBUS_CHANNEL_ID)
79 	STRUCT_SECTION_FOREACH(zbus_channel, chan) {
80 		/* Check for duplicate channel IDs */
81 		if (chan->id == ZBUS_CHAN_ID_INVALID) {
82 			continue;
83 		}
84 		/* Iterate over all previous channels */
85 		STRUCT_SECTION_FOREACH(zbus_channel, chan_prev) {
86 			if (chan_prev == chan) {
87 				break;
88 			}
89 			if (chan->id == chan_prev->id) {
90 #if defined(CONFIG_ZBUS_CHANNEL_NAME)
91 				LOG_WRN("Channels %s and %s have matching IDs (%d)", chan->name,
92 					chan_prev->name, chan->id);
93 #else
94 				LOG_WRN("Channels %p and %p have matching IDs (%d)", chan,
95 					chan_prev, chan->id);
96 #endif /* CONFIG_ZBUS_CHANNEL_NAME */
97 			}
98 		}
99 	}
100 #endif /* CONFIG_ZBUS_CHANNEL_ID */
101 
102 	return 0;
103 }
104 SYS_INIT(_zbus_init, APPLICATION, CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY);
105 
106 #if defined(CONFIG_ZBUS_CHANNEL_ID)
107 
zbus_chan_from_id(uint32_t channel_id)108 const struct zbus_channel *zbus_chan_from_id(uint32_t channel_id)
109 {
110 	if (channel_id == ZBUS_CHAN_ID_INVALID) {
111 		return NULL;
112 	}
113 	STRUCT_SECTION_FOREACH(zbus_channel, chan) {
114 		if (chan->id == channel_id) {
115 			/* Found matching channel */
116 			return chan;
117 		}
118 	}
119 	/* No matching channel exists */
120 	return NULL;
121 }
122 
123 #endif /* CONFIG_ZBUS_CHANNEL_ID */
124 
/*
 * Delivers one notification about @p chan to @p obs, according to the observer
 * type:
 *  - listener: its callback is invoked in place;
 *  - subscriber: the channel pointer is put on its message queue, waiting until
 *    @p end_time at most;
 *  - message subscriber (CONFIG_ZBUS_MSG_SUBSCRIBER): a clone of @p buf is put on
 *    its FIFO.
 *
 * Returns 0 on success, the k_msgq_put() result for subscribers, or -ENOMEM when
 * @p buf could not be cloned.
 */
static inline int _zbus_notify_observer(const struct zbus_channel *chan,
					const struct zbus_observer *obs, k_timepoint_t end_time,
					struct net_buf *buf)
{
	int rc = 0;

	switch (obs->type) {
	case ZBUS_OBSERVER_LISTENER_TYPE:
		obs->callback(chan);
		break;

	case ZBUS_OBSERVER_SUBSCRIBER_TYPE:
		rc = k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time));
		break;

#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
	case ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE: {
		struct net_buf *copy = net_buf_clone(buf, sys_timepoint_timeout(end_time));

		if (copy != NULL) {
			k_fifo_put(obs->message_fifo, copy);
		} else {
			rc = -ENOMEM;
		}
		break;
	}
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */

	default:
		_ZBUS_ASSERT(false, "Unreachable");
		break;
	}

	return rc;
}
156 
/*
 * Runs the event dispatcher (VDED) for @p chan: notifies every enabled, unmasked
 * static observer of the channel and, with CONFIG_ZBUS_RUNTIME_OBSERVERS, every
 * enabled runtime observer. All deliveries share the deadline @p end_time.
 *
 * With CONFIG_ZBUS_MSG_SUBSCRIBER a buffer carrying the channel pointer (as user
 * data) and a copy of the current message is allocated up front; message
 * subscribers receive clones of it and the local reference is dropped before
 * returning.
 *
 * Returns 0 when every delivery succeeded, otherwise the last delivery error.
 * -ENOMEM is returned immediately when the message buffer cannot be allocated
 * or when a static observer's delivery reports -ENOMEM.
 */
static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t end_time)
{
	int err = 0;
	int last_error = 0;
	struct net_buf *buf = NULL;

	/* Static observer event dispatcher logic */
	struct zbus_channel_observation *observation;
	struct zbus_channel_observation_mask *observation_mask;

#if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
	struct net_buf_pool *pool =
		COND_CODE_1(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION,
			    (chan->data->msg_subscriber_pool), (&_zbus_msg_subscribers_pool));

	buf = _zbus_create_net_buf(pool, zbus_chan_msg_size(chan), sys_timepoint_timeout(end_time));

	_ZBUS_ASSERT(buf != NULL, "net_buf zbus_msg_subscribers_pool is "
				  "unavailable or heap is full");

	/* The assertion above may be compiled out; a failed allocation must never
	 * reach the accesses to the buffer below.
	 */
	if (buf == NULL) {
		return -ENOMEM;
	}

	/* The user data carries the channel pointer so the receiver can tell where
	 * the message came from.
	 */
	memcpy(net_buf_user_data(buf), &chan, sizeof(struct zbus_channel *));

	net_buf_add_mem(buf, zbus_chan_msg(chan), zbus_chan_msg_size(chan));
#endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */

	LOG_DBG("Notifing %s's observers. Starting VDED:", _ZBUS_CHAN_NAME(chan));

	int __maybe_unused index = 0;

	for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx;
	     i < limit; ++i) {
		STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
		STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);

		_ZBUS_ASSERT(observation != NULL, "observation must be not NULL");

		const struct zbus_observer *obs = observation->obs;

		/* Skip observers that are disabled or masked for this channel */
		if (!obs->data->enabled || observation_mask->enabled) {
			continue;
		}

		err = _zbus_notify_observer(chan, obs, end_time, buf);

		if (err) {
			last_error = err;
			LOG_ERR("could not deliver notification to observer %s. Error code %d",
				_ZBUS_OBS_NAME(obs), err);
			if (err == -ENOMEM) {
				/* Out of buffers: give up, dropping the local reference */
				if (IS_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER)) {
					net_buf_unref(buf);
				}
				return err;
			}
		}

		LOG_DBG(" %d -> %s", index++, _ZBUS_OBS_NAME(obs));
	}

#if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS)
	/* Dynamic observer event dispatcher logic */
	struct zbus_observer_node *obs_nd, *tmp;

	SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {
		const struct zbus_observer *obs = obs_nd->obs;

		if (!obs->data->enabled) {
			continue;
		}

		err = _zbus_notify_observer(chan, obs, end_time, buf);

		if (err) {
			last_error = err;
		}
	}
#endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */

	IF_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER, (net_buf_unref(buf);))

	return last_error;
}
239 
240 #if defined(CONFIG_ZBUS_PRIORITY_BOOST)
241 
chan_update_hop(const struct zbus_channel * chan)242 static inline void chan_update_hop(const struct zbus_channel *chan)
243 {
244 	struct zbus_channel_observation *observation;
245 	struct zbus_channel_observation_mask *observation_mask;
246 
247 	int chan_highest_observer_priority = ZBUS_MIN_THREAD_PRIORITY;
248 
249 	K_SPINLOCK(&_zbus_chan_slock) {
250 		const int limit = chan->data->observers_end_idx;
251 
252 		for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) {
253 			STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
254 			STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);
255 
256 			__ASSERT(observation != NULL, "observation must be not NULL");
257 
258 			const struct zbus_observer *obs = observation->obs;
259 
260 			if (!obs->data->enabled || observation_mask->enabled) {
261 				continue;
262 			}
263 
264 			if (chan_highest_observer_priority > obs->data->priority) {
265 				chan_highest_observer_priority = obs->data->priority;
266 			}
267 		}
268 		chan->data->highest_observer_priority = chan_highest_observer_priority;
269 	}
270 }
271 
update_all_channels_hop(const struct zbus_observer * obs)272 static inline void update_all_channels_hop(const struct zbus_observer *obs)
273 {
274 	STRUCT_SECTION_FOREACH(zbus_channel_observation, observation) {
275 		if (obs != observation->obs) {
276 			continue;
277 		}
278 
279 		chan_update_hop(observation->chan);
280 	}
281 }
282 
zbus_obs_attach_to_thread(const struct zbus_observer * obs)283 int zbus_obs_attach_to_thread(const struct zbus_observer *obs)
284 {
285 	_ZBUS_ASSERT(!k_is_in_isr(), "cannot attach to an ISR");
286 	_ZBUS_ASSERT(obs != NULL, "obs is required");
287 
288 	int current_thread_priority = k_thread_priority_get(k_current_get());
289 
290 	K_SPINLOCK(&obs_slock) {
291 		if (obs->data->priority != current_thread_priority) {
292 			obs->data->priority = current_thread_priority;
293 
294 			update_all_channels_hop(obs);
295 		}
296 	}
297 
298 	return 0;
299 }
300 
zbus_obs_detach_from_thread(const struct zbus_observer * obs)301 int zbus_obs_detach_from_thread(const struct zbus_observer *obs)
302 {
303 	_ZBUS_ASSERT(!k_is_in_isr(), "cannot detach from an ISR");
304 	_ZBUS_ASSERT(obs != NULL, "obs is required");
305 
306 	K_SPINLOCK(&obs_slock) {
307 		obs->data->priority = ZBUS_MIN_THREAD_PRIORITY;
308 
309 		update_all_channels_hop(obs);
310 	}
311 
312 	return 0;
313 }
314 
315 #else
316 
/* Priority boost is disabled: there is no highest observer priority to keep up
 * to date, so this is a no-op.
 */
static inline void update_all_channels_hop(const struct zbus_observer *obs)
{
	(void)obs;
}
320 
321 #endif /* CONFIG_ZBUS_PRIORITY_BOOST */
322 
/*
 * Takes the channel semaphore, waiting up to @p timeout.
 *
 * With CONFIG_ZBUS_PRIORITY_BOOST and when called from a thread, the caller's
 * current priority is first stored in @p prio; if that value is numerically
 * greater than the channel's highest observer priority, the caller is moved to
 * one level below that value (clamped at 0) before waiting. If the semaphore
 * cannot be taken, a boost applied here is undone.
 *
 * Returns 0 on success, otherwise the k_sem_take() error.
 */
static inline int chan_lock(const struct zbus_channel *chan, k_timeout_t timeout, int *prio)
{
	bool boosting = false;

#if defined(CONFIG_ZBUS_PRIORITY_BOOST)
	if (!k_is_in_isr()) {
		*prio = k_thread_priority_get(k_current_get());

		K_SPINLOCK(&_zbus_chan_slock) {
			const int hop = chan->data->highest_observer_priority;

			if (*prio > hop) {
				/* Elevating priority since the highest_observer_priority is
				 * greater than the current thread
				 */
				k_thread_priority_set(k_current_get(), MAX(hop - 1, 0));

				boosting = true;
			}
		}
	}
#endif /* CONFIG_ZBUS_PRIORITY_BOOST */

	const int rc = k_sem_take(&chan->data->sem, timeout);

	/* When the priority boost is disabled, boosting is always false and this
	 * branch is optimized out.
	 */
	if ((rc != 0) && boosting) {
		/* Restoring thread priority since the semaphore is not available */
		k_thread_priority_set(k_current_get(), *prio);
	}

	return rc;
}
362 
chan_unlock(const struct zbus_channel * chan,int prio)363 static inline void chan_unlock(const struct zbus_channel *chan, int prio)
364 {
365 	k_sem_give(&chan->data->sem);
366 
367 #if defined(CONFIG_ZBUS_PRIORITY_BOOST)
368 	/* During the unlock phase, with the priority boost enabled, the priority must be
369 	 * restored to the original value in case it was elevated
370 	 */
371 	if (prio < ZBUS_MIN_THREAD_PRIORITY) {
372 		k_thread_priority_set(k_current_get(), prio);
373 	}
374 #endif /* CONFIG_ZBUS_PRIORITY_BOOST */
375 }
376 
/*
 * Publishes @p msg on @p chan and notifies the channel's observers.
 *
 * In an ISR the timeout is forced to K_NO_WAIT. The message is checked by the
 * channel validator (if any) before the channel is locked; once locked, the
 * message is copied into the channel and the event dispatcher runs with the
 * deadline derived from @p timeout.
 *
 * Returns 0 on success, -ENOMSG when the validator rejects the message, the
 * chan_lock() error when the channel cannot be locked, or the dispatcher error.
 */
int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout)
{
	_ZBUS_ASSERT(chan != NULL, "chan is required");
	_ZBUS_ASSERT(msg != NULL, "msg is required");

	if (k_is_in_isr()) {
		timeout = K_NO_WAIT;
	}

	/* The deadline covers the whole operation, dispatching included */
	const k_timepoint_t deadline = sys_timepoint_calc(timeout);

	if ((chan->validator != NULL) && !chan->validator(msg, chan->message_size)) {
		return -ENOMSG;
	}

	int caller_prio = ZBUS_MIN_THREAD_PRIORITY;
	int rc = chan_lock(chan, timeout, &caller_prio);

	if (rc != 0) {
		return rc;
	}

#if defined(CONFIG_ZBUS_CHANNEL_PUBLISH_STATS)
	chan->data->publish_timestamp = k_uptime_ticks();
	chan->data->publish_count += 1;
#endif /* CONFIG_ZBUS_CHANNEL_PUBLISH_STATS */

	memcpy(chan->message, msg, chan->message_size);

	rc = _zbus_vded_exec(chan, deadline);

	chan_unlock(chan, caller_prio);

	return rc;
}
414 
/*
 * Copies the current message of @p chan into @p msg while holding the channel
 * semaphore. In an ISR the timeout is forced to K_NO_WAIT.
 *
 * Returns 0 on success or the k_sem_take() error when the channel could not be
 * locked within @p timeout.
 */
int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout)
{
	_ZBUS_ASSERT(chan != NULL, "chan is required");
	_ZBUS_ASSERT(msg != NULL, "msg is required");

	if (k_is_in_isr()) {
		timeout = K_NO_WAIT;
	}

	struct k_sem *lock = &chan->data->sem;
	const int rc = k_sem_take(lock, timeout);

	if (rc == 0) {
		memcpy(msg, chan->message, chan->message_size);
		k_sem_give(lock);
	}

	return rc;
}
435 
/*
 * Notifies the observers of @p chan without changing the channel message.
 *
 * In an ISR the timeout is forced to K_NO_WAIT. The channel is locked, the
 * event dispatcher runs with the deadline derived from @p timeout, and the
 * channel is unlocked again.
 *
 * Returns 0 on success, the chan_lock() error when the channel cannot be locked,
 * or the dispatcher error.
 */
int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout)
{
	_ZBUS_ASSERT(chan != NULL, "chan is required");

	if (k_is_in_isr()) {
		timeout = K_NO_WAIT;
	}

	const k_timepoint_t deadline = sys_timepoint_calc(timeout);
	int caller_prio = ZBUS_MIN_THREAD_PRIORITY;
	int rc = chan_lock(chan, timeout, &caller_prio);

	if (rc != 0) {
		return rc;
	}

	rc = _zbus_vded_exec(chan, deadline);

	chan_unlock(chan, caller_prio);

	return rc;
}
461 
/*
 * Takes the semaphore of @p chan and keeps it; the caller releases it with
 * zbus_chan_finish(). In an ISR the timeout is forced to K_NO_WAIT.
 *
 * Returns 0 on success or the k_sem_take() error.
 */
int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout)
{
	_ZBUS_ASSERT(chan != NULL, "chan is required");

	if (k_is_in_isr()) {
		timeout = K_NO_WAIT;
	}

	/* k_sem_take() already yields 0 on success, so its result is the result */
	return k_sem_take(&chan->data->sem, timeout);
}
478 
zbus_chan_finish(const struct zbus_channel * chan)479 int zbus_chan_finish(const struct zbus_channel *chan)
480 {
481 	_ZBUS_ASSERT(chan != NULL, "chan is required");
482 
483 	k_sem_give(&chan->data->sem);
484 
485 	return 0;
486 }
487 
/*
 * Waits up to @p timeout for a notification on the message queue of subscriber
 * @p sub and stores the notifying channel in @p chan.
 *
 * Must not be called from an ISR. Returns the k_msgq_get() result.
 */
int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan,
		  k_timeout_t timeout)
{
	_ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait cannot be used inside ISRs");
	_ZBUS_ASSERT(sub != NULL, "sub is required");
	_ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE, "sub must be a SUBSCRIBER");
	_ZBUS_ASSERT(sub->queue != NULL, "sub queue is required");
	_ZBUS_ASSERT(chan != NULL, "chan is required");

	const int rc = k_msgq_get(sub->queue, chan, timeout);

	return rc;
}
499 
500 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
501 
/*
 * Waits up to @p timeout for a message on the FIFO of message subscriber @p sub.
 *
 * On success the originating channel (read from the buffer's user data) is
 * stored in @p chan, the message bytes are copied into @p msg, and the buffer is
 * released. @p msg must be large enough for the message of that channel.
 *
 * Must not be called from an ISR. Returns 0 on success or -ENOMSG when no buffer
 * arrived within @p timeout.
 */
int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg,
		      k_timeout_t timeout)
{
	_ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait_msg cannot be used inside ISRs");
	_ZBUS_ASSERT(sub != NULL, "sub is required");
	_ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE,
		     "sub must be a MSG_SUBSCRIBER");
	_ZBUS_ASSERT(sub->message_fifo != NULL, "sub message_fifo is required");
	_ZBUS_ASSERT(chan != NULL, "chan is required");
	_ZBUS_ASSERT(msg != NULL, "msg is required");

	struct net_buf *incoming = k_fifo_get(sub->message_fifo, timeout);

	if (incoming == NULL) {
		return -ENOMSG;
	}

	/* The publisher side stored the channel pointer in the user data */
	*chan = *((struct zbus_channel **)net_buf_user_data(incoming));

	const size_t msg_size = zbus_chan_msg_size(*chan);

	memcpy(msg, net_buf_remove_mem(incoming, msg_size), msg_size);

	net_buf_unref(incoming);

	return 0;
}
527 
528 #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
529 
zbus_obs_set_chan_notification_mask(const struct zbus_observer * obs,const struct zbus_channel * chan,bool masked)530 int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs,
531 					const struct zbus_channel *chan, bool masked)
532 {
533 	_ZBUS_ASSERT(obs != NULL, "obs is required");
534 	_ZBUS_ASSERT(chan != NULL, "chan is required");
535 
536 	int err = -ESRCH;
537 
538 	struct zbus_channel_observation *observation;
539 	struct zbus_channel_observation_mask *observation_mask;
540 
541 	K_SPINLOCK(&obs_slock) {
542 		for (int16_t i = chan->data->observers_start_idx,
543 			     limit = chan->data->observers_end_idx;
544 		     i < limit; ++i) {
545 			STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
546 			STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);
547 
548 			__ASSERT(observation != NULL, "observation must be not NULL");
549 
550 			if (observation->obs == obs) {
551 				if (observation_mask->enabled != masked) {
552 					observation_mask->enabled = masked;
553 
554 					update_all_channels_hop(obs);
555 				}
556 
557 				err = 0;
558 
559 				K_SPINLOCK_BREAK;
560 			}
561 		}
562 	}
563 
564 	return err;
565 }
566 
zbus_obs_is_chan_notification_masked(const struct zbus_observer * obs,const struct zbus_channel * chan,bool * masked)567 int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs,
568 					 const struct zbus_channel *chan, bool *masked)
569 {
570 	_ZBUS_ASSERT(obs != NULL, "obs is required");
571 	_ZBUS_ASSERT(chan != NULL, "chan is required");
572 
573 	int err = -ESRCH;
574 
575 	struct zbus_channel_observation *observation;
576 	struct zbus_channel_observation_mask *observation_mask;
577 
578 	K_SPINLOCK(&obs_slock) {
579 		const int limit = chan->data->observers_end_idx;
580 
581 		for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) {
582 			STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
583 			STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);
584 
585 			__ASSERT(observation != NULL, "observation must be not NULL");
586 
587 			if (observation->obs == obs) {
588 				*masked = observation_mask->enabled;
589 
590 				err = 0;
591 
592 				K_SPINLOCK_BREAK;
593 			}
594 		}
595 	}
596 
597 	return err;
598 }
599 
zbus_obs_set_enable(const struct zbus_observer * obs,bool enabled)600 int zbus_obs_set_enable(const struct zbus_observer *obs, bool enabled)
601 {
602 	_ZBUS_ASSERT(obs != NULL, "obs is required");
603 
604 	K_SPINLOCK(&obs_slock) {
605 		if (obs->data->enabled != enabled) {
606 			obs->data->enabled = enabled;
607 
608 			update_all_channels_hop(obs);
609 		}
610 	}
611 
612 	return 0;
613 }
614