Lines Matching full:queue

10  * Second generation work queue implementation
99 * Invoked from a work queue thread.
119 * Invoked from a work queue thread.
176 /* Add a flusher work item to the queue.
180 * Caller must notify queue of pending work.
182 * @param queue queue on which a work item may appear.
184 * queue
187 static void queue_flusher_locked(struct k_work_q *queue, in queue_flusher_locked() argument
194 sys_slist_insert(&queue->pending, &work->node, in queue_flusher_locked()
197 sys_slist_prepend(&queue->pending, &flusher->work.node); in queue_flusher_locked()
201 /* Try to remove a work item from the given queue.
205 * @param queue the queue from which the work should be removed
206 * @param work work that may be on the queue
208 static inline void queue_remove_locked(struct k_work_q *queue, in queue_remove_locked() argument
212 (void)sys_slist_find_and_remove(&queue->pending, &work->node); in queue_remove_locked()
216 /* Potentially notify a queue that it needs to look for pending work.
218 * This may make the work queue thread ready, but as the lock is held it
222 * @param queue to be notified. If this is null no notification is required.
224 * @return true if and only if the queue was notified and woken, i.e. a
227 static inline bool notify_queue_locked(struct k_work_q *queue) in notify_queue_locked() argument
231 if (queue != NULL) { in notify_queue_locked()
232 rv = z_sched_wake(&queue->notifyq, 0, NULL); in notify_queue_locked()
238 /* Submit a work item to a queue if queue state allows new work.
240 * Submission is rejected if no queue is provided, or if the queue is
241 * draining and the work isn't being submitted from the queue's
245 * Conditionally notifies queue.
247 * @param queue the queue to which work should be submitted. This may
253 * @retval -EINVAL if no queue is provided
254 * @retval -ENODEV if the queue is not started
257 static inline int queue_submit_locked(struct k_work_q *queue, in queue_submit_locked() argument
260 if (queue == NULL) { in queue_submit_locked()
265 bool chained = (arch_current_thread() == &queue->thread) && !k_is_in_isr(); in queue_submit_locked()
266 bool draining = flag_test(&queue->flags, K_WORK_QUEUE_DRAIN_BIT); in queue_submit_locked()
267 bool plugged = flag_test(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT); in queue_submit_locked()
271 * * -ENODEV if the queue isn't running. in queue_submit_locked()
276 if (!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT)) { in queue_submit_locked()
283 sys_slist_append(&queue->pending, &work->node); in queue_submit_locked()
285 (void)notify_queue_locked(queue); in queue_submit_locked()
291 /* Attempt to submit work to a queue.
295 * * no candidate queue can be identified;
296 * * the candidate queue rejects the submission.
299 * Conditionally notifies queue.
303 * @param queuep pointer to a queue reference. On input this should
304 * dereference to the proposed queue (which may be null); after completion it
306 * the queue it was submitted to. That may or may not be the queue provided
309 * @retval 0 if work was already submitted to a queue
310 * @retval 1 if work was not submitted and has been queued to @p queue
311 * @retval 2 if work was running and has been queued to the queue that was
313 * @retval -EBUSY if canceling or submission was rejected by queue
314 * @retval -EINVAL if no queue is provided
315 * @retval -ENODEV if the queue is not started
329 /* If no queue specified resubmit to last queue. in submit_to_queue_locked()
332 *queuep = work->queue; in submit_to_queue_locked()
336 * queue it's running on to prevent handler in submit_to_queue_locked()
340 __ASSERT_NO_MSG(work->queue != NULL); in submit_to_queue_locked()
341 *queuep = work->queue; in submit_to_queue_locked()
351 work->queue = *queuep; in submit_to_queue_locked()
364 /* Submit work to a queue but do not yield the current thread.
370 * @param queuep pointer to a queue reference.
375 int z_work_submit_to_queue(struct k_work_q *queue, in z_work_submit_to_queue() argument
383 int ret = submit_to_queue_locked(work, &queue); in z_work_submit_to_queue()
390 int k_work_submit_to_queue(struct k_work_q *queue, in k_work_submit_to_queue() argument
393 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, submit_to_queue, queue, work); in k_work_submit_to_queue()
395 int ret = z_work_submit_to_queue(queue, work); in k_work_submit_to_queue()
400 * if the queue state changed. in k_work_submit_to_queue()
406 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, submit_to_queue, queue, work, ret); in k_work_submit_to_queue()
444 struct k_work_q *queue = work->queue; in work_flush_locked() local
446 __ASSERT_NO_MSG(queue != NULL); in work_flush_locked()
448 queue_flusher_locked(queue, work, flusher); in work_flush_locked()
449 notify_queue_locked(queue); in work_flush_locked()
502 /* Remove it from the queue, if it's queued. */ in cancel_async_locked()
503 queue_remove_locked(work->queue, work); in cancel_async_locked()
602 /* Loop executed by a work queue thread.
604 * @param workq_ptr pointer to the work queue structure
611 struct k_work_q *queue = (struct k_work_q *)workq_ptr; in work_queue_main() local
621 node = sys_slist_get(&queue->pending); in work_queue_main()
626 flag_set(&queue->flags, K_WORK_QUEUE_BUSY_BIT); in work_queue_main()
644 } else if (flag_test_and_clear(&queue->flags, in work_queue_main()
652 * here doesn't mean that the queue will allow new in work_queue_main()
655 (void)z_sched_wake_all(&queue->drainq, 1, NULL); in work_queue_main()
656 } else if (flag_test(&queue->flags, K_WORK_QUEUE_STOP_BIT)) { in work_queue_main()
657 /* User has requested that the queue stop. Clear the status flags and exit. in work_queue_main()
659 flags_set(&queue->flags, 0); in work_queue_main()
663 /* No work is available and no queue state requires in work_queue_main()
676 (void)z_sched_wait(&lock, key, &queue->notifyq, in work_queue_main()
701 flag_clear(&queue->flags, K_WORK_QUEUE_BUSY_BIT); in work_queue_main()
702 yield = !flag_test(&queue->flags, K_WORK_QUEUE_NO_YIELD_BIT); in work_queue_main()
705 /* Optionally yield to prevent the work queue from in work_queue_main()
714 void k_work_queue_init(struct k_work_q *queue) in k_work_queue_init() argument
716 __ASSERT_NO_MSG(queue != NULL); in k_work_queue_init()
718 *queue = (struct k_work_q) { in k_work_queue_init()
722 SYS_PORT_TRACING_OBJ_INIT(k_work_queue, queue); in k_work_queue_init()
725 void k_work_queue_start(struct k_work_q *queue, in k_work_queue_start() argument
731 __ASSERT_NO_MSG(queue); in k_work_queue_start()
733 __ASSERT_NO_MSG(!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT)); in k_work_queue_start()
736 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, start, queue); in k_work_queue_start()
738 sys_slist_init(&queue->pending); in k_work_queue_start()
739 z_waitq_init(&queue->notifyq); in k_work_queue_start()
740 z_waitq_init(&queue->drainq); in k_work_queue_start()
750 flags_set(&queue->flags, flags); in k_work_queue_start()
752 (void)k_thread_create(&queue->thread, stack, stack_size, in k_work_queue_start()
753 work_queue_main, queue, NULL, NULL, in k_work_queue_start()
757 k_thread_name_set(&queue->thread, cfg->name); in k_work_queue_start()
761 queue->thread.base.user_options |= K_ESSENTIAL; in k_work_queue_start()
764 k_thread_start(&queue->thread); in k_work_queue_start()
766 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, start, queue); in k_work_queue_start()
769 int k_work_queue_drain(struct k_work_q *queue, in k_work_queue_drain() argument
772 __ASSERT_NO_MSG(queue); in k_work_queue_drain()
775 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, drain, queue); in k_work_queue_drain()
780 if (((flags_get(&queue->flags) in k_work_queue_drain()
783 || !sys_slist_is_empty(&queue->pending)) { in k_work_queue_drain()
784 flag_set(&queue->flags, K_WORK_QUEUE_DRAIN_BIT); in k_work_queue_drain()
786 flag_set(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT); in k_work_queue_drain()
789 notify_queue_locked(queue); in k_work_queue_drain()
790 ret = z_sched_wait(&lock, key, &queue->drainq, in k_work_queue_drain()
796 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, drain, queue, ret); in k_work_queue_drain()
801 int k_work_queue_unplug(struct k_work_q *queue) in k_work_queue_unplug() argument
803 __ASSERT_NO_MSG(queue); in k_work_queue_unplug()
805 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, unplug, queue); in k_work_queue_unplug()
810 if (flag_test_and_clear(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT)) { in k_work_queue_unplug()
816 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, unplug, queue, ret); in k_work_queue_unplug()
821 int k_work_queue_stop(struct k_work_q *queue, k_timeout_t timeout) in k_work_queue_stop() argument
823 __ASSERT_NO_MSG(queue); in k_work_queue_stop()
825 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, stop, queue, timeout); in k_work_queue_stop()
828 if (!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT)) { in k_work_queue_stop()
830 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, stop, queue, timeout, -EALREADY); in k_work_queue_stop()
834 if (!flag_test(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT)) { in k_work_queue_stop()
836 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, stop, queue, timeout, -EBUSY); in k_work_queue_stop()
840 flag_set(&queue->flags, K_WORK_QUEUE_STOP_BIT); in k_work_queue_stop()
841 notify_queue_locked(queue); in k_work_queue_stop()
843 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_work_queue, stop, queue, timeout); in k_work_queue_stop()
844 if (k_thread_join(&queue->thread, timeout)) { in k_work_queue_stop()
846 flag_clear(&queue->flags, K_WORK_QUEUE_STOP_BIT); in k_work_queue_stop()
848 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, stop, queue, timeout, -ETIMEDOUT); in k_work_queue_stop()
852 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, stop, queue, timeout, 0); in k_work_queue_stop()
870 struct k_work_q *queue = NULL; in work_timeout() local
873 * state and submit it to the queue. If successful the queue will be in work_timeout()
880 queue = dw->queue; in work_timeout()
881 (void)submit_to_queue_locked(wp, &queue); in work_timeout()
930 * @param queuep pointer to a pointer to a queue. On input this
931 * should dereference to the proposed queue (which may be null); after
933 * submitted will reference the queue it was submitted to. That may
934 * or may not be the queue provided on input.
955 dwork->queue = *queuep; in schedule_for_queue_locked()
1010 int k_work_schedule_for_queue(struct k_work_q *queue, struct k_work_delayable *dwork, in k_work_schedule_for_queue() argument
1013 __ASSERT_NO_MSG(queue != NULL); in k_work_schedule_for_queue()
1016 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, schedule_for_queue, queue, dwork, delay); in k_work_schedule_for_queue()
1024 ret = schedule_for_queue_locked(&queue, dwork, delay); in k_work_schedule_for_queue()
1029 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, schedule_for_queue, queue, dwork, delay, ret); in k_work_schedule_for_queue()
1045 int k_work_reschedule_for_queue(struct k_work_q *queue, struct k_work_delayable *dwork, in k_work_reschedule_for_queue() argument
1048 __ASSERT_NO_MSG(queue != NULL); in k_work_reschedule_for_queue()
1051 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, reschedule_for_queue, queue, dwork, delay); in k_work_reschedule_for_queue()
1060 ret = schedule_for_queue_locked(&queue, dwork, delay); in k_work_reschedule_for_queue()
1064 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, reschedule_for_queue, queue, dwork, delay, ret); in k_work_reschedule_for_queue()
1157 struct k_work_q *queue = dwork->queue; in k_work_flush_delayable() local
1159 (void)submit_to_queue_locked(work, &queue); in k_work_flush_delayable()