1 /*
2 * Copyright (c) 2020 Nordic Semiconductor ASA
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 /**
8 * @file
9 *
10 * Second generation work queue implementation
11 */
12
13 #include <zephyr/kernel.h>
14 #include <zephyr/kernel_structs.h>
15 #include <wait_q.h>
16 #include <zephyr/spinlock.h>
17 #include <errno.h>
18 #include <ksched.h>
19 #include <zephyr/sys/printk.h>
20
flag_clear(uint32_t * flagp,uint32_t bit)21 static inline void flag_clear(uint32_t *flagp,
22 uint32_t bit)
23 {
24 *flagp &= ~BIT(bit);
25 }
26
flag_set(uint32_t * flagp,uint32_t bit)27 static inline void flag_set(uint32_t *flagp,
28 uint32_t bit)
29 {
30 *flagp |= BIT(bit);
31 }
32
flag_test(const uint32_t * flagp,uint32_t bit)33 static inline bool flag_test(const uint32_t *flagp,
34 uint32_t bit)
35 {
36 return (*flagp & BIT(bit)) != 0U;
37 }
38
flag_test_and_clear(uint32_t * flagp,int bit)39 static inline bool flag_test_and_clear(uint32_t *flagp,
40 int bit)
41 {
42 bool ret = flag_test(flagp, bit);
43
44 flag_clear(flagp, bit);
45
46 return ret;
47 }
48
flags_set(uint32_t * flagp,uint32_t flags)49 static inline void flags_set(uint32_t *flagp,
50 uint32_t flags)
51 {
52 *flagp = flags;
53 }
54
flags_get(const uint32_t * flagp)55 static inline uint32_t flags_get(const uint32_t *flagp)
56 {
57 return *flagp;
58 }
59
60 /* Lock to protect the internal state of all work items, work queues,
61 * and pending_cancels.
62 */
63 static struct k_spinlock lock;
64
65 /* Invoked by work thread */
handle_flush(struct k_work * work)66 static void handle_flush(struct k_work *work) { }
67
init_flusher(struct z_work_flusher * flusher)68 static inline void init_flusher(struct z_work_flusher *flusher)
69 {
70 struct k_work *work = &flusher->work;
71 k_sem_init(&flusher->sem, 0, 1);
72 k_work_init(&flusher->work, handle_flush);
73 flag_set(&work->flags, K_WORK_FLUSHING_BIT);
74 }
75
76 /* List of pending cancellations. */
77 static sys_slist_t pending_cancels;
78
79 /* Initialize a canceler record and add it to the list of pending
80 * cancels.
81 *
82 * Invoked with work lock held.
83 *
84 * @param canceler the structure used to notify a waiting process.
85 * @param work the work structure that is to be canceled
86 */
init_work_cancel(struct z_work_canceller * canceler,struct k_work * work)87 static inline void init_work_cancel(struct z_work_canceller *canceler,
88 struct k_work *work)
89 {
90 k_sem_init(&canceler->sem, 0, 1);
91 canceler->work = work;
92 sys_slist_append(&pending_cancels, &canceler->node);
93 }
94
95 /* Complete flushing of a work item.
96 *
97 * Invoked with work lock held.
98 *
99 * Invoked from a work queue thread.
100 *
101 * Reschedules.
102 *
103 * @param work the work structure that has completed flushing.
104 */
finalize_flush_locked(struct k_work * work)105 static void finalize_flush_locked(struct k_work *work)
106 {
107 struct z_work_flusher *flusher
108 = CONTAINER_OF(work, struct z_work_flusher, work);
109
110 flag_clear(&work->flags, K_WORK_FLUSHING_BIT);
111
112 k_sem_give(&flusher->sem);
113 };
114
115 /* Complete cancellation of a work item and unlock held lock.
116 *
117 * Invoked with work lock held.
118 *
119 * Invoked from a work queue thread.
120 *
121 * Reschedules.
122 *
123 * @param work the work structure that has completed cancellation
124 */
finalize_cancel_locked(struct k_work * work)125 static void finalize_cancel_locked(struct k_work *work)
126 {
127 struct z_work_canceller *wc, *tmp;
128 sys_snode_t *prev = NULL;
129
130 /* Clear this first, so released high-priority threads don't
131 * see it when doing things.
132 */
133 flag_clear(&work->flags, K_WORK_CANCELING_BIT);
134
135 /* Search for and remove the matching container, and release
136 * what's waiting for the completion. The same work item can
137 * appear multiple times in the list if multiple threads
138 * attempt to cancel it.
139 */
140 SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&pending_cancels, wc, tmp, node) {
141 if (wc->work == work) {
142 sys_slist_remove(&pending_cancels, prev, &wc->node);
143 k_sem_give(&wc->sem);
144 break;
145 }
146 prev = &wc->node;
147 }
148 }
149
k_work_init(struct k_work * work,k_work_handler_t handler)150 void k_work_init(struct k_work *work,
151 k_work_handler_t handler)
152 {
153 __ASSERT_NO_MSG(work != NULL);
154 __ASSERT_NO_MSG(handler != NULL);
155
156 *work = (struct k_work)Z_WORK_INITIALIZER(handler);
157
158 SYS_PORT_TRACING_OBJ_INIT(k_work, work);
159 }
160
work_busy_get_locked(const struct k_work * work)161 static inline int work_busy_get_locked(const struct k_work *work)
162 {
163 return flags_get(&work->flags) & K_WORK_MASK;
164 }
165
k_work_busy_get(const struct k_work * work)166 int k_work_busy_get(const struct k_work *work)
167 {
168 k_spinlock_key_t key = k_spin_lock(&lock);
169 int ret = work_busy_get_locked(work);
170
171 k_spin_unlock(&lock, key);
172
173 return ret;
174 }
175
176 /* Add a flusher work item to the queue.
177 *
178 * Invoked with work lock held.
179 *
180 * Caller must notify queue of pending work.
181 *
182 * @param queue queue on which a work item may appear.
183 * @param work the work item that is either queued or running on @p
184 * queue
185 * @param flusher an uninitialized/unused flusher object
186 */
queue_flusher_locked(struct k_work_q * queue,struct k_work * work,struct z_work_flusher * flusher)187 static void queue_flusher_locked(struct k_work_q *queue,
188 struct k_work *work,
189 struct z_work_flusher *flusher)
190 {
191 init_flusher(flusher);
192
193 if ((flags_get(&work->flags) & K_WORK_QUEUED) != 0U) {
194 sys_slist_insert(&queue->pending, &work->node,
195 &flusher->work.node);
196 } else {
197 sys_slist_prepend(&queue->pending, &flusher->work.node);
198 }
199 }
200
201 /* Try to remove a work item from the given queue.
202 *
203 * Invoked with work lock held.
204 *
205 * @param queue the queue from which the work should be removed
206 * @param work work that may be on the queue
207 */
queue_remove_locked(struct k_work_q * queue,struct k_work * work)208 static inline void queue_remove_locked(struct k_work_q *queue,
209 struct k_work *work)
210 {
211 if (flag_test_and_clear(&work->flags, K_WORK_QUEUED_BIT)) {
212 (void)sys_slist_find_and_remove(&queue->pending, &work->node);
213 }
214 }
215
216 /* Potentially notify a queue that it needs to look for pending work.
217 *
218 * This may make the work queue thread ready, but as the lock is held it
219 * will not be a reschedule point. Callers should yield after the lock is
220 * released where appropriate (generally if this returns true).
221 *
222 * @param queue to be notified. If this is null no notification is required.
223 *
224 * @return true if and only if the queue was notified and woken, i.e. a
225 * reschedule is pending.
226 */
notify_queue_locked(struct k_work_q * queue)227 static inline bool notify_queue_locked(struct k_work_q *queue)
228 {
229 bool rv = false;
230
231 if (queue != NULL) {
232 rv = z_sched_wake(&queue->notifyq, 0, NULL);
233 }
234
235 return rv;
236 }
237
238 /* Submit an work item to a queue if queue state allows new work.
239 *
240 * Submission is rejected if no queue is provided, or if the queue is
241 * draining and the work isn't being submitted from the queue's
242 * thread (chained submission).
243 *
244 * Invoked with work lock held.
245 * Conditionally notifies queue.
246 *
247 * @param queue the queue to which work should be submitted. This may
248 * be null, in which case the submission will fail.
249 *
250 * @param work to be submitted
251 *
252 * @retval 1 if successfully queued
253 * @retval -EINVAL if no queue is provided
254 * @retval -ENODEV if the queue is not started
255 * @retval -EBUSY if the submission was rejected (draining, plugged)
256 */
queue_submit_locked(struct k_work_q * queue,struct k_work * work)257 static inline int queue_submit_locked(struct k_work_q *queue,
258 struct k_work *work)
259 {
260 if (queue == NULL) {
261 return -EINVAL;
262 }
263
264 int ret;
265 bool chained = (arch_current_thread() == &queue->thread) && !k_is_in_isr();
266 bool draining = flag_test(&queue->flags, K_WORK_QUEUE_DRAIN_BIT);
267 bool plugged = flag_test(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT);
268
269 /* Test for acceptability, in priority order:
270 *
271 * * -ENODEV if the queue isn't running.
272 * * -EBUSY if draining and not chained
273 * * -EBUSY if plugged and not draining
274 * * otherwise OK
275 */
276 if (!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT)) {
277 ret = -ENODEV;
278 } else if (draining && !chained) {
279 ret = -EBUSY;
280 } else if (plugged && !draining) {
281 ret = -EBUSY;
282 } else {
283 sys_slist_append(&queue->pending, &work->node);
284 ret = 1;
285 (void)notify_queue_locked(queue);
286 }
287
288 return ret;
289 }
290
291 /* Attempt to submit work to a queue.
292 *
293 * The submission can fail if:
294 * * the work is cancelling,
295 * * no candidate queue can be identified;
296 * * the candidate queue rejects the submission.
297 *
298 * Invoked with work lock held.
299 * Conditionally notifies queue.
300 *
301 * @param work the work structure to be submitted
302
303 * @param queuep pointer to a queue reference. On input this should
304 * dereference to the proposed queue (which may be null); after completion it
305 * will be null if the work was not submitted or if submitted will reference
306 * the queue it was submitted to. That may or may not be the queue provided
307 * on input.
308 *
309 * @retval 0 if work was already submitted to a queue
310 * @retval 1 if work was not submitted and has been queued to @p queue
311 * @retval 2 if work was running and has been queued to the queue that was
312 * running it
313 * @retval -EBUSY if canceling or submission was rejected by queue
314 * @retval -EINVAL if no queue is provided
315 * @retval -ENODEV if the queue is not started
316 */
submit_to_queue_locked(struct k_work * work,struct k_work_q ** queuep)317 static int submit_to_queue_locked(struct k_work *work,
318 struct k_work_q **queuep)
319 {
320 int ret = 0;
321
322 if (flag_test(&work->flags, K_WORK_CANCELING_BIT)) {
323 /* Disallowed */
324 ret = -EBUSY;
325 } else if (!flag_test(&work->flags, K_WORK_QUEUED_BIT)) {
326 /* Not currently queued */
327 ret = 1;
328
329 /* If no queue specified resubmit to last queue.
330 */
331 if (*queuep == NULL) {
332 *queuep = work->queue;
333 }
334
335 /* If the work is currently running we have to use the
336 * queue it's running on to prevent handler
337 * re-entrancy.
338 */
339 if (flag_test(&work->flags, K_WORK_RUNNING_BIT)) {
340 __ASSERT_NO_MSG(work->queue != NULL);
341 *queuep = work->queue;
342 ret = 2;
343 }
344
345 int rc = queue_submit_locked(*queuep, work);
346
347 if (rc < 0) {
348 ret = rc;
349 } else {
350 flag_set(&work->flags, K_WORK_QUEUED_BIT);
351 work->queue = *queuep;
352 }
353 } else {
354 /* Already queued, do nothing. */
355 }
356
357 if (ret <= 0) {
358 *queuep = NULL;
359 }
360
361 return ret;
362 }
363
364 /* Submit work to a queue but do not yield the current thread.
365 *
366 * Intended for internal use.
367 *
368 * See also submit_to_queue_locked().
369 *
370 * @param queuep pointer to a queue reference.
371 * @param work the work structure to be submitted
372 *
373 * @retval see submit_to_queue_locked()
374 */
z_work_submit_to_queue(struct k_work_q * queue,struct k_work * work)375 int z_work_submit_to_queue(struct k_work_q *queue,
376 struct k_work *work)
377 {
378 __ASSERT_NO_MSG(work != NULL);
379 __ASSERT_NO_MSG(work->handler != NULL);
380
381 k_spinlock_key_t key = k_spin_lock(&lock);
382
383 int ret = submit_to_queue_locked(work, &queue);
384
385 k_spin_unlock(&lock, key);
386
387 return ret;
388 }
389
k_work_submit_to_queue(struct k_work_q * queue,struct k_work * work)390 int k_work_submit_to_queue(struct k_work_q *queue,
391 struct k_work *work)
392 {
393 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, submit_to_queue, queue, work);
394
395 int ret = z_work_submit_to_queue(queue, work);
396
397 /* submit_to_queue_locked() won't reschedule on its own
398 * (really it should, otherwise this process will result in
399 * spurious calls to z_swap() due to the race), so do it here
400 * if the queue state changed.
401 */
402 if (ret > 0) {
403 z_reschedule_unlocked();
404 }
405
406 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, submit_to_queue, queue, work, ret);
407
408 return ret;
409 }
410
k_work_submit(struct k_work * work)411 int k_work_submit(struct k_work *work)
412 {
413 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, submit, work);
414
415 int ret = k_work_submit_to_queue(&k_sys_work_q, work);
416
417 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, submit, work, ret);
418
419 return ret;
420 }
421
422 /* Flush the work item if necessary.
423 *
424 * Flushing is necessary only if the work is either queued or running.
425 *
426 * Invoked with work lock held by key.
427 * Sleeps.
428 *
429 * @param work the work item that is to be flushed
430 * @param flusher state used to synchronize the flush
431 *
432 * @retval true if work is queued or running. If this happens the
433 * caller must take the flusher semaphore after releasing the lock.
434 *
435 * @retval false otherwise. No wait required.
436 */
work_flush_locked(struct k_work * work,struct z_work_flusher * flusher)437 static bool work_flush_locked(struct k_work *work,
438 struct z_work_flusher *flusher)
439 {
440 bool need_flush = (flags_get(&work->flags)
441 & (K_WORK_QUEUED | K_WORK_RUNNING)) != 0U;
442
443 if (need_flush) {
444 struct k_work_q *queue = work->queue;
445
446 __ASSERT_NO_MSG(queue != NULL);
447
448 queue_flusher_locked(queue, work, flusher);
449 notify_queue_locked(queue);
450 }
451
452 return need_flush;
453 }
454
k_work_flush(struct k_work * work,struct k_work_sync * sync)455 bool k_work_flush(struct k_work *work,
456 struct k_work_sync *sync)
457 {
458 __ASSERT_NO_MSG(work != NULL);
459 __ASSERT_NO_MSG(!flag_test(&work->flags, K_WORK_DELAYABLE_BIT));
460 __ASSERT_NO_MSG(!k_is_in_isr());
461 __ASSERT_NO_MSG(sync != NULL);
462 #ifdef CONFIG_KERNEL_COHERENCE
463 __ASSERT_NO_MSG(arch_mem_coherent(sync));
464 #endif /* CONFIG_KERNEL_COHERENCE */
465
466 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, flush, work);
467
468 struct z_work_flusher *flusher = &sync->flusher;
469 k_spinlock_key_t key = k_spin_lock(&lock);
470
471 bool need_flush = work_flush_locked(work, flusher);
472
473 k_spin_unlock(&lock, key);
474
475 /* If necessary wait until the flusher item completes */
476 if (need_flush) {
477 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_work, flush, work, K_FOREVER);
478
479 k_sem_take(&flusher->sem, K_FOREVER);
480 }
481
482 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, flush, work, need_flush);
483
484 return need_flush;
485 }
486
487 /* Execute the non-waiting steps necessary to cancel a work item.
488 *
489 * Invoked with work lock held.
490 *
491 * @param work the work item to be canceled.
492 *
493 * @retval true if we need to wait for the work item to finish canceling
494 * @retval false if the work item is idle
495 *
496 * @return k_busy_wait() captured under lock
497 */
cancel_async_locked(struct k_work * work)498 static int cancel_async_locked(struct k_work *work)
499 {
500 /* If we haven't already started canceling, do it now. */
501 if (!flag_test(&work->flags, K_WORK_CANCELING_BIT)) {
502 /* Remove it from the queue, if it's queued. */
503 queue_remove_locked(work->queue, work);
504 }
505
506 /* If it's still busy after it's been dequeued, then flag it
507 * as canceling.
508 */
509 int ret = work_busy_get_locked(work);
510
511 if (ret != 0) {
512 flag_set(&work->flags, K_WORK_CANCELING_BIT);
513 ret = work_busy_get_locked(work);
514 }
515
516 return ret;
517 }
518
519 /* Complete cancellation necessary, release work lock, and wait if
520 * necessary.
521 *
522 * Invoked with work lock held by key.
523 * Sleeps.
524 *
525 * @param work work that is being canceled
526 * @param canceller state used to synchronize the cancellation
527 * @param key used by work lock
528 *
529 * @retval true if and only if the work was still active on entry. The caller
530 * must wait on the canceller semaphore after releasing the lock.
531 *
532 * @retval false if work was idle on entry. The caller need not wait.
533 */
cancel_sync_locked(struct k_work * work,struct z_work_canceller * canceller)534 static bool cancel_sync_locked(struct k_work *work,
535 struct z_work_canceller *canceller)
536 {
537 bool ret = flag_test(&work->flags, K_WORK_CANCELING_BIT);
538
539 /* If something's still running then we have to wait for
540 * completion, which is indicated when finish_cancel() gets
541 * invoked.
542 */
543 if (ret) {
544 init_work_cancel(canceller, work);
545 }
546
547 return ret;
548 }
549
k_work_cancel(struct k_work * work)550 int k_work_cancel(struct k_work *work)
551 {
552 __ASSERT_NO_MSG(work != NULL);
553 __ASSERT_NO_MSG(!flag_test(&work->flags, K_WORK_DELAYABLE_BIT));
554
555 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel, work);
556
557 k_spinlock_key_t key = k_spin_lock(&lock);
558 int ret = cancel_async_locked(work);
559
560 k_spin_unlock(&lock, key);
561
562 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel, work, ret);
563
564 return ret;
565 }
566
k_work_cancel_sync(struct k_work * work,struct k_work_sync * sync)567 bool k_work_cancel_sync(struct k_work *work,
568 struct k_work_sync *sync)
569 {
570 __ASSERT_NO_MSG(work != NULL);
571 __ASSERT_NO_MSG(sync != NULL);
572 __ASSERT_NO_MSG(!flag_test(&work->flags, K_WORK_DELAYABLE_BIT));
573 __ASSERT_NO_MSG(!k_is_in_isr());
574 #ifdef CONFIG_KERNEL_COHERENCE
575 __ASSERT_NO_MSG(arch_mem_coherent(sync));
576 #endif /* CONFIG_KERNEL_COHERENCE */
577
578 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel_sync, work, sync);
579
580 struct z_work_canceller *canceller = &sync->canceller;
581 k_spinlock_key_t key = k_spin_lock(&lock);
582 bool pending = (work_busy_get_locked(work) != 0U);
583 bool need_wait = false;
584
585 if (pending) {
586 (void)cancel_async_locked(work);
587 need_wait = cancel_sync_locked(work, canceller);
588 }
589
590 k_spin_unlock(&lock, key);
591
592 if (need_wait) {
593 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_work, cancel_sync, work, sync);
594
595 k_sem_take(&canceller->sem, K_FOREVER);
596 }
597
598 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel_sync, work, sync, pending);
599 return pending;
600 }
601
602 /* Loop executed by a work queue thread.
603 *
604 * @param workq_ptr pointer to the work queue structure
605 */
work_queue_main(void * workq_ptr,void * p2,void * p3)606 static void work_queue_main(void *workq_ptr, void *p2, void *p3)
607 {
608 ARG_UNUSED(p2);
609 ARG_UNUSED(p3);
610
611 struct k_work_q *queue = (struct k_work_q *)workq_ptr;
612
613 while (true) {
614 sys_snode_t *node;
615 struct k_work *work = NULL;
616 k_work_handler_t handler = NULL;
617 k_spinlock_key_t key = k_spin_lock(&lock);
618 bool yield;
619
620 /* Check for and prepare any new work. */
621 node = sys_slist_get(&queue->pending);
622 if (node != NULL) {
623 /* Mark that there's some work active that's
624 * not on the pending list.
625 */
626 flag_set(&queue->flags, K_WORK_QUEUE_BUSY_BIT);
627 work = CONTAINER_OF(node, struct k_work, node);
628 flag_set(&work->flags, K_WORK_RUNNING_BIT);
629 flag_clear(&work->flags, K_WORK_QUEUED_BIT);
630
631 /* Static code analysis tool can raise a false-positive violation
632 * in the line below that 'work' is checked for null after being
633 * dereferenced.
634 *
635 * The work is figured out by CONTAINER_OF, as a container
636 * of type struct k_work that contains the node.
637 * The only way for it to be NULL is if node would be a member
638 * of struct k_work object that has been placed at address NULL,
639 * which should never happen, even line 'if (work != NULL)'
640 * ensures that.
641 * This means that if node is not NULL, then work will not be NULL.
642 */
643 handler = work->handler;
644 } else if (flag_test_and_clear(&queue->flags,
645 K_WORK_QUEUE_DRAIN_BIT)) {
646 /* Not busy and draining: move threads waiting for
647 * drain to ready state. The held spinlock inhibits
648 * immediate reschedule; released threads get their
649 * chance when this invokes z_sched_wait() below.
650 *
651 * We don't touch K_WORK_QUEUE_PLUGGABLE, so getting
652 * here doesn't mean that the queue will allow new
653 * submissions.
654 */
655 (void)z_sched_wake_all(&queue->drainq, 1, NULL);
656 } else if (flag_test(&queue->flags, K_WORK_QUEUE_STOP_BIT)) {
657 /* User has requested that the queue stop. Clear the status flags and exit.
658 */
659 flags_set(&queue->flags, 0);
660 k_spin_unlock(&lock, key);
661 return;
662 } else {
663 /* No work is available and no queue state requires
664 * special handling.
665 */
666 ;
667 }
668
669 if (work == NULL) {
670 /* Nothing's had a chance to add work since we took
671 * the lock, and we didn't find work nor got asked to
672 * stop. Just go to sleep: when something happens the
673 * work thread will be woken and we can check again.
674 */
675
676 (void)z_sched_wait(&lock, key, &queue->notifyq,
677 K_FOREVER, NULL);
678 continue;
679 }
680
681 k_spin_unlock(&lock, key);
682
683 __ASSERT_NO_MSG(handler != NULL);
684 handler(work);
685
686 /* Mark the work item as no longer running and deal
687 * with any cancellation and flushing issued while it
688 * was running. Clear the BUSY flag and optionally
689 * yield to prevent starving other threads.
690 */
691 key = k_spin_lock(&lock);
692
693 flag_clear(&work->flags, K_WORK_RUNNING_BIT);
694 if (flag_test(&work->flags, K_WORK_FLUSHING_BIT)) {
695 finalize_flush_locked(work);
696 }
697 if (flag_test(&work->flags, K_WORK_CANCELING_BIT)) {
698 finalize_cancel_locked(work);
699 }
700
701 flag_clear(&queue->flags, K_WORK_QUEUE_BUSY_BIT);
702 yield = !flag_test(&queue->flags, K_WORK_QUEUE_NO_YIELD_BIT);
703 k_spin_unlock(&lock, key);
704
705 /* Optionally yield to prevent the work queue from
706 * starving other threads.
707 */
708 if (yield) {
709 k_yield();
710 }
711 }
712 }
713
k_work_queue_init(struct k_work_q * queue)714 void k_work_queue_init(struct k_work_q *queue)
715 {
716 __ASSERT_NO_MSG(queue != NULL);
717
718 *queue = (struct k_work_q) {
719 .flags = 0,
720 };
721
722 SYS_PORT_TRACING_OBJ_INIT(k_work_queue, queue);
723 }
724
k_work_queue_start(struct k_work_q * queue,k_thread_stack_t * stack,size_t stack_size,int prio,const struct k_work_queue_config * cfg)725 void k_work_queue_start(struct k_work_q *queue,
726 k_thread_stack_t *stack,
727 size_t stack_size,
728 int prio,
729 const struct k_work_queue_config *cfg)
730 {
731 __ASSERT_NO_MSG(queue);
732 __ASSERT_NO_MSG(stack);
733 __ASSERT_NO_MSG(!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT));
734 uint32_t flags = K_WORK_QUEUE_STARTED;
735
736 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, start, queue);
737
738 sys_slist_init(&queue->pending);
739 z_waitq_init(&queue->notifyq);
740 z_waitq_init(&queue->drainq);
741
742 if ((cfg != NULL) && cfg->no_yield) {
743 flags |= K_WORK_QUEUE_NO_YIELD;
744 }
745
746 /* It hasn't actually been started yet, but all the state is in place
747 * so we can submit things and once the thread gets control it's ready
748 * to roll.
749 */
750 flags_set(&queue->flags, flags);
751
752 (void)k_thread_create(&queue->thread, stack, stack_size,
753 work_queue_main, queue, NULL, NULL,
754 prio, 0, K_FOREVER);
755
756 if ((cfg != NULL) && (cfg->name != NULL)) {
757 k_thread_name_set(&queue->thread, cfg->name);
758 }
759
760 if ((cfg != NULL) && (cfg->essential)) {
761 queue->thread.base.user_options |= K_ESSENTIAL;
762 }
763
764 k_thread_start(&queue->thread);
765
766 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, start, queue);
767 }
768
k_work_queue_drain(struct k_work_q * queue,bool plug)769 int k_work_queue_drain(struct k_work_q *queue,
770 bool plug)
771 {
772 __ASSERT_NO_MSG(queue);
773 __ASSERT_NO_MSG(!k_is_in_isr());
774
775 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, drain, queue);
776
777 int ret = 0;
778 k_spinlock_key_t key = k_spin_lock(&lock);
779
780 if (((flags_get(&queue->flags)
781 & (K_WORK_QUEUE_BUSY | K_WORK_QUEUE_DRAIN)) != 0U)
782 || plug
783 || !sys_slist_is_empty(&queue->pending)) {
784 flag_set(&queue->flags, K_WORK_QUEUE_DRAIN_BIT);
785 if (plug) {
786 flag_set(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT);
787 }
788
789 notify_queue_locked(queue);
790 ret = z_sched_wait(&lock, key, &queue->drainq,
791 K_FOREVER, NULL);
792 } else {
793 k_spin_unlock(&lock, key);
794 }
795
796 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, drain, queue, ret);
797
798 return ret;
799 }
800
k_work_queue_unplug(struct k_work_q * queue)801 int k_work_queue_unplug(struct k_work_q *queue)
802 {
803 __ASSERT_NO_MSG(queue);
804
805 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, unplug, queue);
806
807 int ret = -EALREADY;
808 k_spinlock_key_t key = k_spin_lock(&lock);
809
810 if (flag_test_and_clear(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT)) {
811 ret = 0;
812 }
813
814 k_spin_unlock(&lock, key);
815
816 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, unplug, queue, ret);
817
818 return ret;
819 }
820
k_work_queue_stop(struct k_work_q * queue,k_timeout_t timeout)821 int k_work_queue_stop(struct k_work_q *queue, k_timeout_t timeout)
822 {
823 __ASSERT_NO_MSG(queue);
824
825 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, stop, queue, timeout);
826 k_spinlock_key_t key = k_spin_lock(&lock);
827
828 if (!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT)) {
829 k_spin_unlock(&lock, key);
830 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, stop, queue, timeout, -EALREADY);
831 return -EALREADY;
832 }
833
834 if (!flag_test(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT)) {
835 k_spin_unlock(&lock, key);
836 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, stop, queue, timeout, -EBUSY);
837 return -EBUSY;
838 }
839
840 flag_set(&queue->flags, K_WORK_QUEUE_STOP_BIT);
841 notify_queue_locked(queue);
842 k_spin_unlock(&lock, key);
843 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_work_queue, stop, queue, timeout);
844 if (k_thread_join(&queue->thread, timeout)) {
845 key = k_spin_lock(&lock);
846 flag_clear(&queue->flags, K_WORK_QUEUE_STOP_BIT);
847 k_spin_unlock(&lock, key);
848 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, stop, queue, timeout, -ETIMEDOUT);
849 return -ETIMEDOUT;
850 }
851
852 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, stop, queue, timeout, 0);
853 return 0;
854 }
855
856 #ifdef CONFIG_SYS_CLOCK_EXISTS
857
858 /* Timeout handler for delayable work.
859 *
860 * Invoked by timeout infrastructure.
861 * Takes and releases work lock.
862 * Conditionally reschedules.
863 */
work_timeout(struct _timeout * to)864 static void work_timeout(struct _timeout *to)
865 {
866 struct k_work_delayable *dw
867 = CONTAINER_OF(to, struct k_work_delayable, timeout);
868 struct k_work *wp = &dw->work;
869 k_spinlock_key_t key = k_spin_lock(&lock);
870 struct k_work_q *queue = NULL;
871
872 /* If the work is still marked delayed (should be) then clear that
873 * state and submit it to the queue. If successful the queue will be
874 * notified of new work at the next reschedule point.
875 *
876 * If not successful there is no notification that the work has been
877 * abandoned. Sorry.
878 */
879 if (flag_test_and_clear(&wp->flags, K_WORK_DELAYED_BIT)) {
880 queue = dw->queue;
881 (void)submit_to_queue_locked(wp, &queue);
882 }
883
884 k_spin_unlock(&lock, key);
885 }
886
k_work_init_delayable(struct k_work_delayable * dwork,k_work_handler_t handler)887 void k_work_init_delayable(struct k_work_delayable *dwork,
888 k_work_handler_t handler)
889 {
890 __ASSERT_NO_MSG(dwork != NULL);
891 __ASSERT_NO_MSG(handler != NULL);
892
893 *dwork = (struct k_work_delayable){
894 .work = {
895 .handler = handler,
896 .flags = K_WORK_DELAYABLE,
897 },
898 };
899 z_init_timeout(&dwork->timeout);
900
901 SYS_PORT_TRACING_OBJ_INIT(k_work_delayable, dwork);
902 }
903
work_delayable_busy_get_locked(const struct k_work_delayable * dwork)904 static inline int work_delayable_busy_get_locked(const struct k_work_delayable *dwork)
905 {
906 return flags_get(&dwork->work.flags) & K_WORK_MASK;
907 }
908
k_work_delayable_busy_get(const struct k_work_delayable * dwork)909 int k_work_delayable_busy_get(const struct k_work_delayable *dwork)
910 {
911 __ASSERT_NO_MSG(dwork != NULL);
912
913 k_spinlock_key_t key = k_spin_lock(&lock);
914 int ret = work_delayable_busy_get_locked(dwork);
915
916 k_spin_unlock(&lock, key);
917 return ret;
918 }
919
920 /* Attempt to schedule a work item for future (maybe immediate)
921 * submission.
922 *
923 * Invoked with work lock held.
924 *
925 * See also submit_to_queue_locked(), which implements this for a no-wait
926 * delay.
927 *
928 * Invoked with work lock held.
929 *
930 * @param queuep pointer to a pointer to a queue. On input this
931 * should dereference to the proposed queue (which may be null); after
932 * completion it will be null if the work was not submitted or if
933 * submitted will reference the queue it was submitted to. That may
934 * or may not be the queue provided on input.
935 *
936 * @param dwork the delayed work structure
937 *
938 * @param delay the delay to use before scheduling.
939 *
940 * @retval from submit_to_queue_locked() if delay is K_NO_WAIT; otherwise
941 * @retval 1 to indicate successfully scheduled.
942 */
schedule_for_queue_locked(struct k_work_q ** queuep,struct k_work_delayable * dwork,k_timeout_t delay)943 static int schedule_for_queue_locked(struct k_work_q **queuep,
944 struct k_work_delayable *dwork,
945 k_timeout_t delay)
946 {
947 int ret = 1;
948 struct k_work *work = &dwork->work;
949
950 if (K_TIMEOUT_EQ(delay, K_NO_WAIT)) {
951 return submit_to_queue_locked(work, queuep);
952 }
953
954 flag_set(&work->flags, K_WORK_DELAYED_BIT);
955 dwork->queue = *queuep;
956
957 /* Add timeout */
958 z_add_timeout(&dwork->timeout, work_timeout, delay);
959
960 return ret;
961 }
962
963 /* Unschedule delayable work.
964 *
965 * If the work is delayed, cancel the timeout and clear the delayed
966 * flag.
967 *
968 * Invoked with work lock held.
969 *
970 * @param dwork pointer to delayable work structure.
971 *
972 * @return true if and only if work had been delayed so the timeout
973 * was cancelled.
974 */
unschedule_locked(struct k_work_delayable * dwork)975 static inline bool unschedule_locked(struct k_work_delayable *dwork)
976 {
977 bool ret = false;
978 struct k_work *work = &dwork->work;
979
980 /* If scheduled, try to cancel. If it fails, that means the
981 * callback has been dequeued and will inevitably run (or has
982 * already run), so treat that as "undelayed" and return
983 * false.
984 */
985 if (flag_test_and_clear(&work->flags, K_WORK_DELAYED_BIT)) {
986 ret = z_abort_timeout(&dwork->timeout) == 0;
987 }
988
989 return ret;
990 }
991
992 /* Full cancellation of a delayable work item.
993 *
994 * Unschedules the delayed part then delegates to standard work
995 * cancellation.
996 *
997 * Invoked with work lock held.
998 *
999 * @param dwork delayable work item
1000 *
1001 * @return k_work_busy_get() flags
1002 */
cancel_delayable_async_locked(struct k_work_delayable * dwork)1003 static int cancel_delayable_async_locked(struct k_work_delayable *dwork)
1004 {
1005 (void)unschedule_locked(dwork);
1006
1007 return cancel_async_locked(&dwork->work);
1008 }
1009
k_work_schedule_for_queue(struct k_work_q * queue,struct k_work_delayable * dwork,k_timeout_t delay)1010 int k_work_schedule_for_queue(struct k_work_q *queue, struct k_work_delayable *dwork,
1011 k_timeout_t delay)
1012 {
1013 __ASSERT_NO_MSG(queue != NULL);
1014 __ASSERT_NO_MSG(dwork != NULL);
1015
1016 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, schedule_for_queue, queue, dwork, delay);
1017
1018 struct k_work *work = &dwork->work;
1019 int ret = 0;
1020 k_spinlock_key_t key = k_spin_lock(&lock);
1021
1022 /* Schedule the work item if it's idle or running. */
1023 if ((work_busy_get_locked(work) & ~K_WORK_RUNNING) == 0U) {
1024 ret = schedule_for_queue_locked(&queue, dwork, delay);
1025 }
1026
1027 k_spin_unlock(&lock, key);
1028
1029 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, schedule_for_queue, queue, dwork, delay, ret);
1030
1031 return ret;
1032 }
1033
k_work_schedule(struct k_work_delayable * dwork,k_timeout_t delay)1034 int k_work_schedule(struct k_work_delayable *dwork, k_timeout_t delay)
1035 {
1036 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, schedule, dwork, delay);
1037
1038 int ret = k_work_schedule_for_queue(&k_sys_work_q, dwork, delay);
1039
1040 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, schedule, dwork, delay, ret);
1041
1042 return ret;
1043 }
1044
k_work_reschedule_for_queue(struct k_work_q * queue,struct k_work_delayable * dwork,k_timeout_t delay)1045 int k_work_reschedule_for_queue(struct k_work_q *queue, struct k_work_delayable *dwork,
1046 k_timeout_t delay)
1047 {
1048 __ASSERT_NO_MSG(queue != NULL);
1049 __ASSERT_NO_MSG(dwork != NULL);
1050
1051 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, reschedule_for_queue, queue, dwork, delay);
1052
1053 int ret;
1054 k_spinlock_key_t key = k_spin_lock(&lock);
1055
1056 /* Remove any active scheduling. */
1057 (void)unschedule_locked(dwork);
1058
1059 /* Schedule the work item with the new parameters. */
1060 ret = schedule_for_queue_locked(&queue, dwork, delay);
1061
1062 k_spin_unlock(&lock, key);
1063
1064 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, reschedule_for_queue, queue, dwork, delay, ret);
1065
1066 return ret;
1067 }
1068
k_work_reschedule(struct k_work_delayable * dwork,k_timeout_t delay)1069 int k_work_reschedule(struct k_work_delayable *dwork, k_timeout_t delay)
1070 {
1071 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, reschedule, dwork, delay);
1072
1073 int ret = k_work_reschedule_for_queue(&k_sys_work_q, dwork, delay);
1074
1075 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, reschedule, dwork, delay, ret);
1076
1077 return ret;
1078 }
1079
k_work_cancel_delayable(struct k_work_delayable * dwork)1080 int k_work_cancel_delayable(struct k_work_delayable *dwork)
1081 {
1082 __ASSERT_NO_MSG(dwork != NULL);
1083
1084 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel_delayable, dwork);
1085
1086 k_spinlock_key_t key = k_spin_lock(&lock);
1087 int ret = cancel_delayable_async_locked(dwork);
1088
1089 k_spin_unlock(&lock, key);
1090
1091 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel_delayable, dwork, ret);
1092
1093 return ret;
1094 }
1095
k_work_cancel_delayable_sync(struct k_work_delayable * dwork,struct k_work_sync * sync)1096 bool k_work_cancel_delayable_sync(struct k_work_delayable *dwork,
1097 struct k_work_sync *sync)
1098 {
1099 __ASSERT_NO_MSG(dwork != NULL);
1100 __ASSERT_NO_MSG(sync != NULL);
1101 __ASSERT_NO_MSG(!k_is_in_isr());
1102 #ifdef CONFIG_KERNEL_COHERENCE
1103 __ASSERT_NO_MSG(arch_mem_coherent(sync));
1104 #endif /* CONFIG_KERNEL_COHERENCE */
1105
1106 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel_delayable_sync, dwork, sync);
1107
1108 struct z_work_canceller *canceller = &sync->canceller;
1109 k_spinlock_key_t key = k_spin_lock(&lock);
1110 bool pending = (work_delayable_busy_get_locked(dwork) != 0U);
1111 bool need_wait = false;
1112
1113 if (pending) {
1114 (void)cancel_delayable_async_locked(dwork);
1115 need_wait = cancel_sync_locked(&dwork->work, canceller);
1116 }
1117
1118 k_spin_unlock(&lock, key);
1119
1120 if (need_wait) {
1121 k_sem_take(&canceller->sem, K_FOREVER);
1122 }
1123
1124 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel_delayable_sync, dwork, sync, pending);
1125 return pending;
1126 }
1127
k_work_flush_delayable(struct k_work_delayable * dwork,struct k_work_sync * sync)1128 bool k_work_flush_delayable(struct k_work_delayable *dwork,
1129 struct k_work_sync *sync)
1130 {
1131 __ASSERT_NO_MSG(dwork != NULL);
1132 __ASSERT_NO_MSG(sync != NULL);
1133 __ASSERT_NO_MSG(!k_is_in_isr());
1134 #ifdef CONFIG_KERNEL_COHERENCE
1135 __ASSERT_NO_MSG(arch_mem_coherent(sync));
1136 #endif /* CONFIG_KERNEL_COHERENCE */
1137
1138 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, flush_delayable, dwork, sync);
1139
1140 struct k_work *work = &dwork->work;
1141 struct z_work_flusher *flusher = &sync->flusher;
1142 k_spinlock_key_t key = k_spin_lock(&lock);
1143
1144 /* If it's idle release the lock and return immediately. */
1145 if (work_busy_get_locked(work) == 0U) {
1146 k_spin_unlock(&lock, key);
1147
1148 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, flush_delayable, dwork, sync, false);
1149
1150 return false;
1151 }
1152
1153 /* If unscheduling did something then submit it. Ignore a
1154 * failed submission (e.g. when cancelling).
1155 */
1156 if (unschedule_locked(dwork)) {
1157 struct k_work_q *queue = dwork->queue;
1158
1159 (void)submit_to_queue_locked(work, &queue);
1160 }
1161
1162 /* Wait for it to finish */
1163 bool need_flush = work_flush_locked(work, flusher);
1164
1165 k_spin_unlock(&lock, key);
1166
1167 /* If necessary wait until the flusher item completes */
1168 if (need_flush) {
1169 k_sem_take(&flusher->sem, K_FOREVER);
1170 }
1171
1172 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, flush_delayable, dwork, sync, need_flush);
1173
1174 return need_flush;
1175 }
1176
1177 #endif /* CONFIG_SYS_CLOCK_EXISTS */
1178