Lines Matching +full:data +full:- +full:timeout
3 * SPDX-License-Identifier: Apache-2.0
30 k_timeout_t timeout) in _zbus_create_net_buf() argument
32 return net_buf_alloc_len(pool, size, timeout); in _zbus_create_net_buf()
43 k_timeout_t timeout) in _zbus_create_net_buf() argument
49 return net_buf_alloc(pool, timeout); in _zbus_create_net_buf()
62 curr = observation->chan; in _zbus_init()
66 curr->data->observers_start_idx = 0; in _zbus_init()
67 curr->data->observers_end_idx = 0; in _zbus_init()
69 curr->data->observers_start_idx = prev->data->observers_end_idx; in _zbus_init()
70 curr->data->observers_end_idx = prev->data->observers_end_idx; in _zbus_init()
75 ++(curr->data->observers_end_idx); in _zbus_init()
81 if (chan->id == ZBUS_CHAN_ID_INVALID) { in _zbus_init()
89 if (chan->id == chan_prev->id) { in _zbus_init()
91 LOG_WRN("Channels %s and %s have matching IDs (%d)", chan->name, in _zbus_init()
92 chan_prev->name, chan->id); in _zbus_init()
95 chan_prev, chan->id); in _zbus_init()
114 if (chan->id == channel_id) { in zbus_chan_from_id()
129 switch (obs->type) { in _zbus_notify_observer()
131 obs->callback(chan); in _zbus_notify_observer()
135 return k_msgq_put(obs->queue, &chan, sys_timepoint_timeout(end_time)); in _zbus_notify_observer()
142 return -ENOMEM; in _zbus_notify_observer()
145 k_fifo_put(obs->message_fifo, cloned_buf); in _zbus_notify_observer()
170 (chan->data->msg_subscriber_pool), (&_zbus_msg_subscribers_pool)); in _zbus_vded_exec()
186 for (int16_t i = chan->data->observers_start_idx, limit = chan->data->observers_end_idx; in _zbus_vded_exec()
193 const struct zbus_observer *obs = observation->obs; in _zbus_vded_exec()
195 if (!obs->data->enabled || observation_mask->enabled) { in _zbus_vded_exec()
205 if (err == -ENOMEM) { in _zbus_vded_exec()
213 LOG_DBG(" %d -> %s", index++, _ZBUS_OBS_NAME(obs)); in _zbus_vded_exec()
220 SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&chan->data->observers, obs_nd, tmp, node) { in _zbus_vded_exec()
221 const struct zbus_observer *obs = obs_nd->obs; in _zbus_vded_exec()
223 if (!obs->data->enabled) { in _zbus_vded_exec()
250 const int limit = chan->data->observers_end_idx; in chan_update_hop()
252 for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) { in chan_update_hop()
258 const struct zbus_observer *obs = observation->obs; in chan_update_hop()
260 if (!obs->data->enabled || observation_mask->enabled) { in chan_update_hop()
264 if (chan_highest_observer_priority > obs->data->priority) { in chan_update_hop()
265 chan_highest_observer_priority = obs->data->priority; in chan_update_hop()
268 chan->data->highest_observer_priority = chan_highest_observer_priority; in chan_update_hop()
275 if (obs != observation->obs) { in update_all_channels_hop()
279 chan_update_hop(observation->chan); in update_all_channels_hop()
291 if (obs->data->priority != current_thread_priority) { in zbus_obs_attach_to_thread()
292 obs->data->priority = current_thread_priority; in zbus_obs_attach_to_thread()
307 obs->data->priority = ZBUS_MIN_THREAD_PRIORITY; in zbus_obs_detach_from_thread()
323 static inline int chan_lock(const struct zbus_channel *chan, k_timeout_t timeout, int *prio) in chan_lock() argument
332 if (*prio > chan->data->highest_observer_priority) { in chan_lock()
333 int new_prio = chan->data->highest_observer_priority - 1; in chan_lock()
348 int err = k_sem_take(&chan->data->sem, timeout); in chan_lock()
365 k_sem_give(&chan->data->sem); in chan_unlock()
377 int zbus_chan_pub(const struct zbus_channel *chan, const void *msg, k_timeout_t timeout) in zbus_chan_pub() argument
385 timeout = K_NO_WAIT; in zbus_chan_pub()
388 k_timepoint_t end_time = sys_timepoint_calc(timeout); in zbus_chan_pub()
390 if (chan->validator != NULL && !chan->validator(msg, chan->message_size)) { in zbus_chan_pub()
391 return -ENOMSG; in zbus_chan_pub()
396 err = chan_lock(chan, timeout, &context_priority); in zbus_chan_pub()
402 chan->data->publish_timestamp = k_uptime_ticks(); in zbus_chan_pub()
403 chan->data->publish_count += 1; in zbus_chan_pub()
406 memcpy(chan->message, msg, chan->message_size); in zbus_chan_pub()
415 int zbus_chan_read(const struct zbus_channel *chan, void *msg, k_timeout_t timeout) in zbus_chan_read() argument
421 timeout = K_NO_WAIT; in zbus_chan_read()
424 int err = k_sem_take(&chan->data->sem, timeout); in zbus_chan_read()
429 memcpy(msg, chan->message, chan->message_size); in zbus_chan_read()
431 k_sem_give(&chan->data->sem); in zbus_chan_read()
436 int zbus_chan_notify(const struct zbus_channel *chan, k_timeout_t timeout) in zbus_chan_notify() argument
443 timeout = K_NO_WAIT; in zbus_chan_notify()
446 k_timepoint_t end_time = sys_timepoint_calc(timeout); in zbus_chan_notify()
450 err = chan_lock(chan, timeout, &context_priority); in zbus_chan_notify()
462 int zbus_chan_claim(const struct zbus_channel *chan, k_timeout_t timeout) in zbus_chan_claim() argument
467 timeout = K_NO_WAIT; in zbus_chan_claim()
470 int err = k_sem_take(&chan->data->sem, timeout); in zbus_chan_claim()
483 k_sem_give(&chan->data->sem); in zbus_chan_finish()
489 k_timeout_t timeout) in zbus_sub_wait() argument
493 _ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_SUBSCRIBER_TYPE, "sub must be a SUBSCRIBER"); in zbus_sub_wait()
494 _ZBUS_ASSERT(sub->queue != NULL, "sub queue is required"); in zbus_sub_wait()
497 return k_msgq_get(sub->queue, chan, timeout); in zbus_sub_wait()
503 k_timeout_t timeout) in zbus_sub_wait_msg() argument
507 _ZBUS_ASSERT(sub->type == ZBUS_OBSERVER_MSG_SUBSCRIBER_TYPE, in zbus_sub_wait_msg()
509 _ZBUS_ASSERT(sub->message_fifo != NULL, "sub message_fifo is required"); in zbus_sub_wait_msg()
513 struct net_buf *buf = k_fifo_get(sub->message_fifo, timeout); in zbus_sub_wait_msg()
516 return -ENOMSG; in zbus_sub_wait_msg()
536 int err = -ESRCH; in zbus_obs_set_chan_notification_mask()
542 for (int16_t i = chan->data->observers_start_idx, in zbus_obs_set_chan_notification_mask()
543 limit = chan->data->observers_end_idx; in zbus_obs_set_chan_notification_mask()
550 if (observation->obs == obs) { in zbus_obs_set_chan_notification_mask()
551 if (observation_mask->enabled != masked) { in zbus_obs_set_chan_notification_mask()
552 observation_mask->enabled = masked; in zbus_obs_set_chan_notification_mask()
573 int err = -ESRCH; in zbus_obs_is_chan_notification_masked()
579 const int limit = chan->data->observers_end_idx; in zbus_obs_is_chan_notification_masked()
581 for (int16_t i = chan->data->observers_start_idx; i < limit; ++i) { in zbus_obs_is_chan_notification_masked()
587 if (observation->obs == obs) { in zbus_obs_is_chan_notification_masked()
588 *masked = observation_mask->enabled; in zbus_obs_is_chan_notification_masked()
605 if (obs->data->enabled != enabled) { in zbus_obs_set_enable()
606 obs->data->enabled = enabled; in zbus_obs_set_enable()