1 /*
2 * Copyright (c) 2022 Rodrigo Peixoto <rodrigopex@gmail.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 #include <zephyr/kernel.h>
7 #include <zephyr/init.h>
8 #include <zephyr/sys/iterable_sections.h>
9 #include <zephyr/logging/log.h>
10 #include <zephyr/sys/printk.h>
11 #include <zephyr/net_buf.h>
12 #include <zephyr/zbus/zbus.h>
13 LOG_MODULE_REGISTER(zbus, CONFIG_ZBUS_LOG_LEVEL);
14
15 #if defined(CONFIG_ZBUS_PRIORITY_BOOST)
16 /* Available only when the priority boost is enabled */
17 static struct k_spinlock _zbus_chan_slock;
18 #endif /* CONFIG_ZBUS_PRIORITY_BOOST */
19
20 static struct k_spinlock obs_slock;
21
22 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
23
24 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC)
25
26 NET_BUF_POOL_HEAP_DEFINE(_zbus_msg_subscribers_pool, CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE,
27 sizeof(struct zbus_channel *), NULL);
28
_zbus_create_net_buf(struct net_buf_pool * pool,size_t size,k_timeout_t timeout)29 static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size,
30 k_timeout_t timeout)
31 {
32 return net_buf_alloc_len(pool, size, timeout);
33 }
34
35 #else
36
37 NET_BUF_POOL_FIXED_DEFINE(_zbus_msg_subscribers_pool,
38 (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_SIZE),
39 (CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE),
40 sizeof(struct zbus_channel *), NULL);
41
_zbus_create_net_buf(struct net_buf_pool * pool,size_t size,k_timeout_t timeout)42 static inline struct net_buf *_zbus_create_net_buf(struct net_buf_pool *pool, size_t size,
43 k_timeout_t timeout)
44 {
45 __ASSERT(size <= CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE,
46 "CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_STATIC_DATA_SIZE must be greater or equal to "
47 "%d",
48 (int)size);
49 return net_buf_alloc(pool, timeout);
50 }
51 #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER_BUF_ALLOC_DYNAMIC */
52
53 #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
54
_zbus_init(void)55 int _zbus_init(void)
56 {
57
58 const struct zbus_channel *curr = NULL;
59 const struct zbus_channel *prev = NULL;
60
61 STRUCT_SECTION_FOREACH(zbus_channel_observation, observation) {
62 curr = observation->chan;
63
64 if (prev != curr) {
65 if (prev == NULL) {
66 curr->data->observers_start_idx = 0;
67 curr->data->observers_end_idx = 0;
68 } else {
69 curr->data->observers_start_idx = prev->data->observers_end_idx;
70 curr->data->observers_end_idx = prev->data->observers_end_idx;
71 }
72 prev = curr;
73 }
74
75 ++(curr->data->observers_end_idx);
76 }
77
78 #if defined(CONFIG_ZBUS_CHANNEL_ID)
79 STRUCT_SECTION_FOREACH(zbus_channel, chan) {
80 /* Check for duplicate channel IDs */
81 if (chan->id == ZBUS_CHAN_ID_INVALID) {
82 continue;
83 }
84 /* Iterate over all previous channels */
85 STRUCT_SECTION_FOREACH(zbus_channel, chan_prev) {
86 if (chan_prev == chan) {
87 break;
88 }
89 if (chan->id == chan_prev->id) {
90 #if defined(CONFIG_ZBUS_CHANNEL_NAME)
91 LOG_WRN("Channels %s and %s have matching IDs (%d)", chan->name,
92 chan_prev->name, chan->id);
93 #else
94 LOG_WRN("Channels %p and %p have matching IDs (%d)", chan,
95 chan_prev, chan->id);
96 #endif /* CONFIG_ZBUS_CHANNEL_NAME */
97 }
98 }
99 }
100 #endif /* CONFIG_ZBUS_CHANNEL_ID */
101
102 return 0;
103 }
104 SYS_INIT(_zbus_init, APPLICATION, CONFIG_ZBUS_CHANNELS_SYS_INIT_PRIORITY);
105
106 #if defined(CONFIG_ZBUS_CHANNEL_ID)
107
zbus_chan_from_id(uint32_t channel_id)108 const struct zbus_channel *zbus_chan_from_id(uint32_t channel_id)
109 {
110 if (channel_id == ZBUS_CHAN_ID_INVALID) {
111 return NULL;
112 }
113 STRUCT_SECTION_FOREACH(zbus_channel, chan) {
114 if (chan->id == channel_id) {
115 /* Found matching channel */
116 return chan;
117 }
118 }
119 /* No matching channel exists */
120 return NULL;
121 }
122
123 #endif /* CONFIG_ZBUS_CHANNEL_ID */
124
_zbus_notify_observer(const struct zbus_channel * chan,const struct zbus_observer * obs,k_timepoint_t end_time,struct net_buf * buf)125 static inline int _zbus_notify_observer(const struct zbus_channel *chan,
126 const struct zbus_observer *obs, k_timepoint_t end_time,
127 struct net_buf *buf)
128 {
129 switch (obs->type) {
130 case ZBUS_OBSERVER_LISTENER_TYPE: {
131 obs->callback(chan);
132 break;
133 }
134 case ZBUS_OBSERVER_SUBSCRIBER_TYPE: {
135 return k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time));
136 }
137 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
138 case ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE: {
139 struct net_buf *cloned_buf = net_buf_clone(buf, sys_timepoint_timeout(end_time));
140
141 if (cloned_buf == NULL) {
142 return -ENOMEM;
143 }
144
145 k_fifo_put(obs->message_fifo, cloned_buf);
146
147 break;
148 }
149 #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
150
151 default:
152 _ZBUS_ASSERT(false, "Unreachable");
153 }
154 return 0;
155 }
156
_zbus_vded_exec(const struct zbus_channel * chan,k_timepoint_t end_time)157 static inline int _zbus_vded_exec(const struct zbus_channel *chan, k_timepoint_t end_time)
158 {
159 int err = 0;
160 int last_error = 0;
161 struct net_buf *buf = NULL;
162
163 /* Static observer event dispatcher logic */
164 struct zbus_channel_observation *observation;
165 struct zbus_channel_observation_mask *observation_mask;
166
167 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
168 struct net_buf_pool *pool =
169 COND_CODE_1(CONFIG_ZBUS_MSG_SUBSCRIBER_NET_BUF_POOL_ISOLATION,
170 (chan->data->msg_subscriber_pool), (&_zbus_msg_subscribers_pool));
171
172 buf = _zbus_create_net_buf(pool, zbus_chan_msg_size(chan), sys_timepoint_timeout(end_time));
173
174 _ZBUS_ASSERT(buf != NULL, "net_buf zbus_msg_subscribers_pool is "
175 "unavailable or heap is full");
176
177 memcpy(net_buf_user_data(buf), &chan, sizeof(struct zbus_channel *));
178
179 net_buf_add_mem(buf, zbus_chan_msg(chan), zbus_chan_msg_size(chan));
180 #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
181
182 LOG_DBG("Notifing %s's observers. Starting VDED:", _ZBUS_CHAN_NAME(chan));
183
184 int __maybe_unused index = 0;
185
186 for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx;
187 i < limit; ++i) {
188 STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
189 STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);
190
191 _ZBUS_ASSERT(observation != NULL, "observation must be not NULL");
192
193 const struct zbus_observer *obs = observation->obs;
194
195 if (!obs->data->enabled || observation_mask->enabled) {
196 continue;
197 }
198
199 err = _zbus_notify_observer(chan, obs, end_time, buf);
200
201 if (err) {
202 last_error = err;
203 LOG_ERR("could not deliver notification to observer %s. Error code %d",
204 _ZBUS_OBS_NAME(obs), err);
205 if (err == -ENOMEM) {
206 if (IS_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER)) {
207 net_buf_unref(buf);
208 }
209 return err;
210 }
211 }
212
213 LOG_DBG(" %d -> %s", index++, _ZBUS_OBS_NAME(obs));
214 }
215
216 #if defined(CONFIG_ZBUS_RUNTIME_OBSERVERS)
217 /* Dynamic observer event dispatcher logic */
218 struct zbus_observer_node *obs_nd, *tmp;
219
220 SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) {
221 const struct zbus_observer *obs = obs_nd->obs;
222
223 if (!obs->data->enabled) {
224 continue;
225 }
226
227 err = _zbus_notify_observer(chan, obs, end_time, buf);
228
229 if (err) {
230 last_error = err;
231 }
232 }
233 #endif /* CONFIG_ZBUS_RUNTIME_OBSERVERS */
234
235 IF_ENABLED(CONFIG_ZBUS_MSG_SUBSCRIBER, (net_buf_unref(buf);))
236
237 return last_error;
238 }
239
240 #if defined(CONFIG_ZBUS_PRIORITY_BOOST)
241
chan_update_hop(const struct zbus_channel * chan)242 static inline void chan_update_hop(const struct zbus_channel *chan)
243 {
244 struct zbus_channel_observation *observation;
245 struct zbus_channel_observation_mask *observation_mask;
246
247 int chan_highest_observer_priority = ZBUS_MIN_THREAD_PRIORITY;
248
249 K_SPINLOCK(&_zbus_chan_slock) {
250 const int limit = chan->data->observers_end_idx;
251
252 for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) {
253 STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
254 STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);
255
256 __ASSERT(observation != NULL, "observation must be not NULL");
257
258 const struct zbus_observer *obs = observation->obs;
259
260 if (!obs->data->enabled || observation_mask->enabled) {
261 continue;
262 }
263
264 if (chan_highest_observer_priority > obs->data->priority) {
265 chan_highest_observer_priority = obs->data->priority;
266 }
267 }
268 chan->data->highest_observer_priority = chan_highest_observer_priority;
269 }
270 }
271
update_all_channels_hop(const struct zbus_observer * obs)272 static inline void update_all_channels_hop(const struct zbus_observer *obs)
273 {
274 STRUCT_SECTION_FOREACH(zbus_channel_observation, observation) {
275 if (obs != observation->obs) {
276 continue;
277 }
278
279 chan_update_hop(observation->chan);
280 }
281 }
282
zbus_obs_attach_to_thread(const struct zbus_observer * obs)283 int zbus_obs_attach_to_thread(const struct zbus_observer *obs)
284 {
285 _ZBUS_ASSERT(!k_is_in_isr(), "cannot attach to an ISR");
286 _ZBUS_ASSERT(obs != NULL, "obs is required");
287
288 int current_thread_priority = k_thread_priority_get(k_current_get());
289
290 K_SPINLOCK(&obs_slock) {
291 if (obs->data->priority != current_thread_priority) {
292 obs->data->priority = current_thread_priority;
293
294 update_all_channels_hop(obs);
295 }
296 }
297
298 return 0;
299 }
300
zbus_obs_detach_from_thread(const struct zbus_observer * obs)301 int zbus_obs_detach_from_thread(const struct zbus_observer *obs)
302 {
303 _ZBUS_ASSERT(!k_is_in_isr(), "cannot detach from an ISR");
304 _ZBUS_ASSERT(obs != NULL, "obs is required");
305
306 K_SPINLOCK(&obs_slock) {
307 obs->data->priority = ZBUS_MIN_THREAD_PRIORITY;
308
309 update_all_channels_hop(obs);
310 }
311
312 return 0;
313 }
314
315 #else
316
update_all_channels_hop(const struct zbus_observer * obs)317 static inline void update_all_channels_hop(const struct zbus_observer *obs)
318 {
319 }
320
321 #endif /* CONFIG_ZBUS_PRIORITY_BOOST */
322
chan_lock(const struct zbus_channel * chan,k_timeout_t timeout,int * prio)323 static inline int chan_lock(const struct zbus_channel *chan, k_timeout_t timeout, int *prio)
324 {
325 bool boosting = false;
326
327 #if defined(CONFIG_ZBUS_PRIORITY_BOOST)
328 if (!k_is_in_isr()) {
329 *prio = k_thread_priority_get(k_current_get());
330
331 K_SPINLOCK(&_zbus_chan_slock) {
332 if (*prio > chan->data->highest_observer_priority) {
333 int new_prio = chan->data->highest_observer_priority - 1;
334
335 new_prio = MAX(new_prio, 0);
336
337 /* Elevating priority since the highest_observer_priority is
338 * greater than the current thread
339 */
340 k_thread_priority_set(k_current_get(), new_prio);
341
342 boosting = true;
343 }
344 }
345 }
346 #endif /* CONFIG_ZBUS_PRIORITY_BOOST */
347
348 int err = k_sem_take(&chan->data->sem, timeout);
349
350 if (err) {
351 /* When the priority boost is disabled, this IF will be optimized out. */
352 if (boosting) {
353 /* Restoring thread priority since the semaphore is not available */
354 k_thread_priority_set(k_current_get(), *prio);
355 }
356
357 return err;
358 }
359
360 return 0;
361 }
362
chan_unlock(const struct zbus_channel * chan,int prio)363 static inline void chan_unlock(const struct zbus_channel *chan, int prio)
364 {
365 k_sem_give(&chan->data->sem);
366
367 #if defined(CONFIG_ZBUS_PRIORITY_BOOST)
368 /* During the unlock phase, with the priority boost enabled, the priority must be
369 * restored to the original value in case it was elevated
370 */
371 if (prio < ZBUS_MIN_THREAD_PRIORITY) {
372 k_thread_priority_set(k_current_get(), prio);
373 }
374 #endif /* CONFIG_ZBUS_PRIORITY_BOOST */
375 }
376
zbus_chan_pub(const struct zbus_channel * chan,const void * msg,k_timeout_t timeout)377 int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout)
378 {
379 int err;
380
381 _ZBUS_ASSERT(chan != NULL, "chan is required");
382 _ZBUS_ASSERT(msg != NULL, "msg is required");
383 _ZBUS_ASSERT(k_is_in_isr() ? K_TIMEOUT_EQ(timeout, K_NO_WAIT) : true,
384 "inside an ISR, the timeout must be K_NO_WAIT");
385
386 if (k_is_in_isr()) {
387 timeout = K_NO_WAIT;
388 }
389
390 k_timepoint_t end_time = sys_timepoint_calc(timeout);
391
392 if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) {
393 return -ENOMSG;
394 }
395
396 int context_priority = ZBUS_MIN_THREAD_PRIORITY;
397
398 err = chan_lock(chan, timeout, &context_priority);
399 if (err) {
400 return err;
401 }
402
403 #if defined(CONFIG_ZBUS_CHANNEL_PUBLISH_STATS)
404 chan->data->publish_timestamp = k_uptime_ticks();
405 chan->data->publish_count += 1;
406 #endif /* CONFIG_ZBUS_CHANNEL_PUBLISH_STATS */
407
408 memcpy(chan->message, msg, chan->message_size);
409
410 err = _zbus_vded_exec(chan, end_time);
411
412 chan_unlock(chan, context_priority);
413
414 return err;
415 }
416
zbus_chan_read(const struct zbus_channel * chan,void * msg,k_timeout_t timeout)417 int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout)
418 {
419 _ZBUS_ASSERT(chan != NULL, "chan is required");
420 _ZBUS_ASSERT(msg != NULL, "msg is required");
421 _ZBUS_ASSERT(k_is_in_isr() ? K_TIMEOUT_EQ(timeout, K_NO_WAIT) : true,
422 "inside an ISR, the timeout must be K_NO_WAIT");
423
424 if (k_is_in_isr()) {
425 timeout = K_NO_WAIT;
426 }
427
428 int err = k_sem_take(&chan->data->sem, timeout);
429 if (err) {
430 return err;
431 }
432
433 memcpy(msg, chan->message, chan->message_size);
434
435 k_sem_give(&chan->data->sem);
436
437 return 0;
438 }
439
zbus_chan_notify(const struct zbus_channel * chan,k_timeout_t timeout)440 int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout)
441 {
442 int err;
443
444 _ZBUS_ASSERT(chan != NULL, "chan is required");
445 _ZBUS_ASSERT(k_is_in_isr() ? K_TIMEOUT_EQ(timeout, K_NO_WAIT) : true,
446 "inside an ISR, the timeout must be K_NO_WAIT");
447
448 if (k_is_in_isr()) {
449 timeout = K_NO_WAIT;
450 }
451
452 k_timepoint_t end_time = sys_timepoint_calc(timeout);
453
454 int context_priority = ZBUS_MIN_THREAD_PRIORITY;
455
456 err = chan_lock(chan, timeout, &context_priority);
457 if (err) {
458 return err;
459 }
460
461 err = _zbus_vded_exec(chan, end_time);
462
463 chan_unlock(chan, context_priority);
464
465 return err;
466 }
467
zbus_chan_claim(const struct zbus_channel * chan,k_timeout_t timeout)468 int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout)
469 {
470 _ZBUS_ASSERT(chan != NULL, "chan is required");
471 _ZBUS_ASSERT(k_is_in_isr() ? K_TIMEOUT_EQ(timeout, K_NO_WAIT) : true,
472 "inside an ISR, the timeout must be K_NO_WAIT");
473
474 if (k_is_in_isr()) {
475 timeout = K_NO_WAIT;
476 }
477
478 int err = k_sem_take(&chan->data->sem, timeout);
479
480 if (err) {
481 return err;
482 }
483
484 return 0;
485 }
486
zbus_chan_finish(const struct zbus_channel * chan)487 int zbus_chan_finish(const struct zbus_channel *chan)
488 {
489 _ZBUS_ASSERT(chan != NULL, "chan is required");
490
491 k_sem_give(&chan->data->sem);
492
493 return 0;
494 }
495
zbus_sub_wait(const struct zbus_observer * sub,const struct zbus_channel ** chan,k_timeout_t timeout)496 int zbus_sub_wait(const struct zbus_observer *sub, const struct zbus_channel **chan,
497 k_timeout_t timeout)
498 {
499 _ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait cannot be used inside ISRs");
500 _ZBUS_ASSERT(sub != NULL, "sub is required");
501 _ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE, "sub must be a SUBSCRIBER");
502 _ZBUS_ASSERT(sub->queue != NULL, "sub queue is required");
503 _ZBUS_ASSERT(chan != NULL, "chan is required");
504
505 return k_msgq_get(sub->queue, chan, timeout);
506 }
507
508 #if defined(CONFIG_ZBUS_MSG_SUBSCRIBER)
509
zbus_sub_wait_msg(const struct zbus_observer * sub,const struct zbus_channel ** chan,void * msg,k_timeout_t timeout)510 int zbus_sub_wait_msg(const struct zbus_observer *sub, const struct zbus_channel **chan, void *msg,
511 k_timeout_t timeout)
512 {
513 _ZBUS_ASSERT(!k_is_in_isr(), "zbus_sub_wait_msg cannot be used inside ISRs");
514 _ZBUS_ASSERT(sub != NULL, "sub is required");
515 _ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE,
516 "sub must be a MSG_SUBSCRIBER");
517 _ZBUS_ASSERT(sub->message_fifo != NULL, "sub message_fifo is required");
518 _ZBUS_ASSERT(chan != NULL, "chan is required");
519 _ZBUS_ASSERT(msg != NULL, "msg is required");
520
521 struct net_buf *buf = k_fifo_get(sub->message_fifo, timeout);
522
523 if (buf == NULL) {
524 return -ENOMSG;
525 }
526
527 *chan = *((struct zbus_channel **)net_buf_user_data(buf));
528
529 memcpy(msg, net_buf_remove_mem(buf, zbus_chan_msg_size(*chan)), zbus_chan_msg_size(*chan));
530
531 net_buf_unref(buf);
532
533 return 0;
534 }
535
536 #endif /* CONFIG_ZBUS_MSG_SUBSCRIBER */
537
zbus_obs_set_chan_notification_mask(const struct zbus_observer * obs,const struct zbus_channel * chan,bool masked)538 int zbus_obs_set_chan_notification_mask(const struct zbus_observer *obs,
539 const struct zbus_channel *chan, bool masked)
540 {
541 _ZBUS_ASSERT(obs != NULL, "obs is required");
542 _ZBUS_ASSERT(chan != NULL, "chan is required");
543
544 int err = -ESRCH;
545
546 struct zbus_channel_observation *observation;
547 struct zbus_channel_observation_mask *observation_mask;
548
549 K_SPINLOCK(&obs_slock) {
550 for (int16_t i = chan->data->observers_start_idx,
551 limit = chan->data->observers_end_idx;
552 i < limit; ++i) {
553 STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
554 STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);
555
556 __ASSERT(observation != NULL, "observation must be not NULL");
557
558 if (observation->obs == obs) {
559 if (observation_mask->enabled != masked) {
560 observation_mask->enabled = masked;
561
562 update_all_channels_hop(obs);
563 }
564
565 err = 0;
566
567 K_SPINLOCK_BREAK;
568 }
569 }
570 }
571
572 return err;
573 }
574
zbus_obs_is_chan_notification_masked(const struct zbus_observer * obs,const struct zbus_channel * chan,bool * masked)575 int zbus_obs_is_chan_notification_masked(const struct zbus_observer *obs,
576 const struct zbus_channel *chan, bool *masked)
577 {
578 _ZBUS_ASSERT(obs != NULL, "obs is required");
579 _ZBUS_ASSERT(chan != NULL, "chan is required");
580
581 int err = -ESRCH;
582
583 struct zbus_channel_observation *observation;
584 struct zbus_channel_observation_mask *observation_mask;
585
586 K_SPINLOCK(&obs_slock) {
587 const int limit = chan->data->observers_end_idx;
588
589 for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) {
590 STRUCT_SECTION_GET(zbus_channel_observation, i, &observation);
591 STRUCT_SECTION_GET(zbus_channel_observation_mask, i, &observation_mask);
592
593 __ASSERT(observation != NULL, "observation must be not NULL");
594
595 if (observation->obs == obs) {
596 *masked = observation_mask->enabled;
597
598 err = 0;
599
600 K_SPINLOCK_BREAK;
601 }
602 }
603 }
604
605 return err;
606 }
607
zbus_obs_set_enable(const struct zbus_observer * obs,bool enabled)608 int zbus_obs_set_enable(const struct zbus_observer *obs, bool enabled)
609 {
610 _ZBUS_ASSERT(obs != NULL, "obs is required");
611
612 K_SPINLOCK(&obs_slock) {
613 if (obs->data->enabled != enabled) {
614 obs->data->enabled = enabled;
615
616 update_all_channels_hop(obs);
617 }
618 }
619
620 return 0;
621 }
622