1 /*
2 * Copyright (c) 2020 Nordic Semiconductor ASA
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 /**
8 * @file
9 *
10 * Second generation work queue implementation
11 */
12
13 #include <zephyr/kernel.h>
14 #include <zephyr/kernel_structs.h>
15 #include <wait_q.h>
16 #include <zephyr/spinlock.h>
17 #include <errno.h>
18 #include <ksched.h>
19 #include <zephyr/sys/printk.h>
20
/* Clear one bit of a flag word.
 *
 * @param flagp pointer to the flag word to modify
 * @param bit index of the bit to clear
 */
static inline void flag_clear(uint32_t *flagp,
			      uint32_t bit)
{
	*flagp = *flagp & ~BIT(bit);
}
26
/* Set one bit of a flag word.
 *
 * @param flagp pointer to the flag word to modify
 * @param bit index of the bit to set
 */
static inline void flag_set(uint32_t *flagp,
			    uint32_t bit)
{
	*flagp = *flagp | BIT(bit);
}
32
/* Report whether one bit of a flag word is set.
 *
 * @param flagp pointer to the flag word to inspect
 * @param bit index of the bit to test
 *
 * @return true if the bit is set, false otherwise
 */
static inline bool flag_test(const uint32_t *flagp,
			     uint32_t bit)
{
	bool is_set = ((*flagp & BIT(bit)) != 0U);

	return is_set;
}
38
/* Read one bit of a flag word and then clear it.
 *
 * The bit index uses the same unsigned type as flag_test() and
 * flag_clear(), to which it is forwarded, so no implicit
 * signed-to-unsigned conversion takes place.
 *
 * @param flagp pointer to the flag word to modify
 * @param bit index of the bit to test and clear
 *
 * @return true if the bit was set before it was cleared
 */
static inline bool flag_test_and_clear(uint32_t *flagp,
				       uint32_t bit)
{
	bool was_set = flag_test(flagp, bit);

	flag_clear(flagp, bit);

	return was_set;
}
48
/* Overwrite an entire flag word.
 *
 * @param flagp pointer to the flag word to overwrite
 * @param flags new value stored in the flag word
 */
static inline void flags_set(uint32_t *flagp,
			     uint32_t flags)
{
	flagp[0] = flags;
}
54
/* Read an entire flag word.
 *
 * @param flagp pointer to the flag word to read
 *
 * @return the current value of the flag word
 */
static inline uint32_t flags_get(const uint32_t *flagp)
{
	return flagp[0];
}
59
/* Single spinlock protecting the internal state of all work items and
 * all work queues, as well as the pending_cancels list. Helpers in this
 * file whose name ends in "_locked" are documented as being invoked with
 * this lock held.
 */
static struct k_spinlock lock;
64
65 /* Invoked by work thread */
handle_flush(struct k_work * work)66 static void handle_flush(struct k_work *work)
67 {
68 struct z_work_flusher *flusher
69 = CONTAINER_OF(work, struct z_work_flusher, work);
70
71 k_sem_give(&flusher->sem);
72 }
73
init_flusher(struct z_work_flusher * flusher)74 static inline void init_flusher(struct z_work_flusher *flusher)
75 {
76 k_sem_init(&flusher->sem, 0, 1);
77 k_work_init(&flusher->work, handle_flush);
78 }
79
/* List of pending cancellations. Canceller records are appended by
 * init_work_cancel() and removed by finalize_cancel_locked().
 */
static sys_slist_t pending_cancels;
82
83 /* Initialize a canceler record and add it to the list of pending
84 * cancels.
85 *
86 * Invoked with work lock held.
87 *
88 * @param canceler the structure used to notify a waiting process.
89 * @param work the work structure that is to be canceled
90 */
init_work_cancel(struct z_work_canceller * canceler,struct k_work * work)91 static inline void init_work_cancel(struct z_work_canceller *canceler,
92 struct k_work *work)
93 {
94 k_sem_init(&canceler->sem, 0, 1);
95 canceler->work = work;
96 sys_slist_append(&pending_cancels, &canceler->node);
97 }
98
99 /* Complete cancellation of a work item and unlock held lock.
100 *
101 * Invoked with work lock held.
102 *
103 * Invoked from a work queue thread.
104 *
105 * Reschedules.
106 *
107 * @param work the work structure that has completed cancellation
108 */
finalize_cancel_locked(struct k_work * work)109 static void finalize_cancel_locked(struct k_work *work)
110 {
111 struct z_work_canceller *wc, *tmp;
112 sys_snode_t *prev = NULL;
113
114 /* Clear this first, so released high-priority threads don't
115 * see it when doing things.
116 */
117 flag_clear(&work->flags, K_WORK_CANCELING_BIT);
118
119 /* Search for and remove the matching container, and release
120 * what's waiting for the completion. The same work item can
121 * appear multiple times in the list if multiple threads
122 * attempt to cancel it.
123 */
124 SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&pending_cancels, wc, tmp, node) {
125 if (wc->work == work) {
126 sys_slist_remove(&pending_cancels, prev, &wc->node);
127 k_sem_give(&wc->sem);
128 } else {
129 prev = &wc->node;
130 }
131 }
132 }
133
/* Initialize a work item, overwriting its whole content with the
 * static initializer for the given handler.
 *
 * @param work the work structure to initialize; must not be null
 * @param handler the function invoked for the work item; must not be null
 */
void k_work_init(struct k_work *work,
		  k_work_handler_t handler)
{
	__ASSERT_NO_MSG(work != NULL);
	__ASSERT_NO_MSG(handler != NULL);

	const struct k_work fresh = Z_WORK_INITIALIZER(handler);

	*work = fresh;

	SYS_PORT_TRACING_OBJ_INIT(k_work, work);
}
144
work_busy_get_locked(const struct k_work * work)145 static inline int work_busy_get_locked(const struct k_work *work)
146 {
147 return flags_get(&work->flags) & K_WORK_MASK;
148 }
149
k_work_busy_get(const struct k_work * work)150 int k_work_busy_get(const struct k_work *work)
151 {
152 k_spinlock_key_t key = k_spin_lock(&lock);
153 int ret = work_busy_get_locked(work);
154
155 k_spin_unlock(&lock, key);
156
157 return ret;
158 }
159
160 /* Add a flusher work item to the queue.
161 *
162 * Invoked with work lock held.
163 *
164 * Caller must notify queue of pending work.
165 *
166 * @param queue queue on which a work item may appear.
167 * @param work the work item that is either queued or running on @p
168 * queue
169 * @param flusher an uninitialized/unused flusher object
170 */
queue_flusher_locked(struct k_work_q * queue,struct k_work * work,struct z_work_flusher * flusher)171 static void queue_flusher_locked(struct k_work_q *queue,
172 struct k_work *work,
173 struct z_work_flusher *flusher)
174 {
175 bool in_list = false;
176 struct k_work *wn;
177
178 /* Determine whether the work item is still queued. */
179 SYS_SLIST_FOR_EACH_CONTAINER(&queue->pending, wn, node) {
180 if (wn == work) {
181 in_list = true;
182 break;
183 }
184 }
185
186 init_flusher(flusher);
187 if (in_list) {
188 sys_slist_insert(&queue->pending, &work->node,
189 &flusher->work.node);
190 } else {
191 sys_slist_prepend(&queue->pending, &flusher->work.node);
192 }
193 }
194
195 /* Try to remove a work item from the given queue.
196 *
197 * Invoked with work lock held.
198 *
199 * @param queue the queue from which the work should be removed
200 * @param work work that may be on the queue
201 */
queue_remove_locked(struct k_work_q * queue,struct k_work * work)202 static inline void queue_remove_locked(struct k_work_q *queue,
203 struct k_work *work)
204 {
205 if (flag_test_and_clear(&work->flags, K_WORK_QUEUED_BIT)) {
206 (void)sys_slist_find_and_remove(&queue->pending, &work->node);
207 }
208 }
209
210 /* Potentially notify a queue that it needs to look for pending work.
211 *
212 * This may make the work queue thread ready, but as the lock is held it
213 * will not be a reschedule point. Callers should yield after the lock is
214 * released where appropriate (generally if this returns true).
215 *
216 * @param queue to be notified. If this is null no notification is required.
217 *
218 * @return true if and only if the queue was notified and woken, i.e. a
219 * reschedule is pending.
220 */
notify_queue_locked(struct k_work_q * queue)221 static inline bool notify_queue_locked(struct k_work_q *queue)
222 {
223 bool rv = false;
224
225 if (queue != NULL) {
226 rv = z_sched_wake(&queue->notifyq, 0, NULL);
227 }
228
229 return rv;
230 }
231
232 /* Submit an work item to a queue if queue state allows new work.
233 *
234 * Submission is rejected if no queue is provided, or if the queue is
235 * draining and the work isn't being submitted from the queue's
236 * thread (chained submission).
237 *
238 * Invoked with work lock held.
239 * Conditionally notifies queue.
240 *
241 * @param queue the queue to which work should be submitted. This may
242 * be null, in which case the submission will fail.
243 *
244 * @param work to be submitted
245 *
246 * @retval 1 if successfully queued
247 * @retval -EINVAL if no queue is provided
248 * @retval -ENODEV if the queue is not started
249 * @retval -EBUSY if the submission was rejected (draining, plugged)
250 */
queue_submit_locked(struct k_work_q * queue,struct k_work * work)251 static inline int queue_submit_locked(struct k_work_q *queue,
252 struct k_work *work)
253 {
254 if (queue == NULL) {
255 return -EINVAL;
256 }
257
258 int ret = -EBUSY;
259 bool chained = (_current == &queue->thread) && !k_is_in_isr();
260 bool draining = flag_test(&queue->flags, K_WORK_QUEUE_DRAIN_BIT);
261 bool plugged = flag_test(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT);
262
263 /* Test for acceptability, in priority order:
264 *
265 * * -ENODEV if the queue isn't running.
266 * * -EBUSY if draining and not chained
267 * * -EBUSY if plugged and not draining
268 * * otherwise OK
269 */
270 if (!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT)) {
271 ret = -ENODEV;
272 } else if (draining && !chained) {
273 ret = -EBUSY;
274 } else if (plugged && !draining) {
275 ret = -EBUSY;
276 } else {
277 sys_slist_append(&queue->pending, &work->node);
278 ret = 1;
279 (void)notify_queue_locked(queue);
280 }
281
282 return ret;
283 }
284
285 /* Attempt to submit work to a queue.
286 *
287 * The submission can fail if:
288 * * the work is cancelling,
289 * * no candidate queue can be identified;
290 * * the candidate queue rejects the submission.
291 *
292 * Invoked with work lock held.
293 * Conditionally notifies queue.
294 *
295 * @param work the work structure to be submitted
296
297 * @param queuep pointer to a queue reference. On input this should
298 * dereference to the proposed queue (which may be null); after completion it
299 * will be null if the work was not submitted or if submitted will reference
300 * the queue it was submitted to. That may or may not be the queue provided
301 * on input.
302 *
303 * @retval 0 if work was already submitted to a queue
304 * @retval 1 if work was not submitted and has been queued to @p queue
305 * @retval 2 if work was running and has been queued to the queue that was
306 * running it
307 * @retval -EBUSY if canceling or submission was rejected by queue
308 * @retval -EINVAL if no queue is provided
309 * @retval -ENODEV if the queue is not started
310 */
submit_to_queue_locked(struct k_work * work,struct k_work_q ** queuep)311 static int submit_to_queue_locked(struct k_work *work,
312 struct k_work_q **queuep)
313 {
314 int ret = 0;
315
316 if (flag_test(&work->flags, K_WORK_CANCELING_BIT)) {
317 /* Disallowed */
318 ret = -EBUSY;
319 } else if (!flag_test(&work->flags, K_WORK_QUEUED_BIT)) {
320 /* Not currently queued */
321 ret = 1;
322
323 /* If no queue specified resubmit to last queue.
324 */
325 if (*queuep == NULL) {
326 *queuep = work->queue;
327 }
328
329 /* If the work is currently running we have to use the
330 * queue it's running on to prevent handler
331 * re-entrancy.
332 */
333 if (flag_test(&work->flags, K_WORK_RUNNING_BIT)) {
334 __ASSERT_NO_MSG(work->queue != NULL);
335 *queuep = work->queue;
336 ret = 2;
337 }
338
339 int rc = queue_submit_locked(*queuep, work);
340
341 if (rc < 0) {
342 ret = rc;
343 } else {
344 flag_set(&work->flags, K_WORK_QUEUED_BIT);
345 work->queue = *queuep;
346 }
347 } else {
348 /* Already queued, do nothing. */
349 }
350
351 if (ret <= 0) {
352 *queuep = NULL;
353 }
354
355 return ret;
356 }
357
358 /* Submit work to a queue but do not yield the current thread.
359 *
360 * Intended for internal use.
361 *
362 * See also submit_to_queue_locked().
363 *
364 * @param queuep pointer to a queue reference.
365 * @param work the work structure to be submitted
366 *
367 * @retval see submit_to_queue_locked()
368 */
z_work_submit_to_queue(struct k_work_q * queue,struct k_work * work)369 int z_work_submit_to_queue(struct k_work_q *queue,
370 struct k_work *work)
371 {
372 __ASSERT_NO_MSG(work != NULL);
373 __ASSERT_NO_MSG(work->handler != NULL);
374
375 k_spinlock_key_t key = k_spin_lock(&lock);
376
377 int ret = submit_to_queue_locked(work, &queue);
378
379 k_spin_unlock(&lock, key);
380
381 return ret;
382 }
383
k_work_submit_to_queue(struct k_work_q * queue,struct k_work * work)384 int k_work_submit_to_queue(struct k_work_q *queue,
385 struct k_work *work)
386 {
387 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, submit_to_queue, queue, work);
388
389 int ret = z_work_submit_to_queue(queue, work);
390
391 /* submit_to_queue_locked() won't reschedule on its own
392 * (really it should, otherwise this process will result in
393 * spurious calls to z_swap() due to the race), so do it here
394 * if the queue state changed.
395 */
396 if (ret > 0) {
397 z_reschedule_unlocked();
398 }
399
400 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, submit_to_queue, queue, work, ret);
401
402 return ret;
403 }
404
k_work_submit(struct k_work * work)405 int k_work_submit(struct k_work *work)
406 {
407 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, submit, work);
408
409 int ret = k_work_submit_to_queue(&k_sys_work_q, work);
410
411 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, submit, work, ret);
412
413 return ret;
414 }
415
416 /* Flush the work item if necessary.
417 *
418 * Flushing is necessary only if the work is either queued or running.
419 *
420 * Invoked with work lock held by key.
421 * Sleeps.
422 *
423 * @param work the work item that is to be flushed
424 * @param flusher state used to synchronize the flush
425 *
426 * @retval true if work is queued or running. If this happens the
427 * caller must take the flusher semaphore after releasing the lock.
428 *
429 * @retval false otherwise. No wait required.
430 */
work_flush_locked(struct k_work * work,struct z_work_flusher * flusher)431 static bool work_flush_locked(struct k_work *work,
432 struct z_work_flusher *flusher)
433 {
434 bool need_flush = (flags_get(&work->flags)
435 & (K_WORK_QUEUED | K_WORK_RUNNING)) != 0U;
436
437 if (need_flush) {
438 struct k_work_q *queue = work->queue;
439
440 __ASSERT_NO_MSG(queue != NULL);
441
442 queue_flusher_locked(queue, work, flusher);
443 notify_queue_locked(queue);
444 }
445
446 return need_flush;
447 }
448
k_work_flush(struct k_work * work,struct k_work_sync * sync)449 bool k_work_flush(struct k_work *work,
450 struct k_work_sync *sync)
451 {
452 __ASSERT_NO_MSG(work != NULL);
453 __ASSERT_NO_MSG(!flag_test(&work->flags, K_WORK_DELAYABLE_BIT));
454 __ASSERT_NO_MSG(!k_is_in_isr());
455 __ASSERT_NO_MSG(sync != NULL);
456 #ifdef CONFIG_KERNEL_COHERENCE
457 __ASSERT_NO_MSG(arch_mem_coherent(sync));
458 #endif
459
460 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, flush, work);
461
462 struct z_work_flusher *flusher = &sync->flusher;
463 k_spinlock_key_t key = k_spin_lock(&lock);
464
465 bool need_flush = work_flush_locked(work, flusher);
466
467 k_spin_unlock(&lock, key);
468
469 /* If necessary wait until the flusher item completes */
470 if (need_flush) {
471 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_work, flush, work, K_FOREVER);
472
473 k_sem_take(&flusher->sem, K_FOREVER);
474 }
475
476 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, flush, work, need_flush);
477
478 return need_flush;
479 }
480
481 /* Execute the non-waiting steps necessary to cancel a work item.
482 *
483 * Invoked with work lock held.
484 *
485 * @param work the work item to be canceled.
486 *
487 * @retval true if we need to wait for the work item to finish canceling
488 * @retval false if the work item is idle
489 *
490 * @return k_busy_wait() captured under lock
491 */
cancel_async_locked(struct k_work * work)492 static int cancel_async_locked(struct k_work *work)
493 {
494 /* If we haven't already started canceling, do it now. */
495 if (!flag_test(&work->flags, K_WORK_CANCELING_BIT)) {
496 /* Remove it from the queue, if it's queued. */
497 queue_remove_locked(work->queue, work);
498 }
499
500 /* If it's still busy after it's been dequeued, then flag it
501 * as canceling.
502 */
503 int ret = work_busy_get_locked(work);
504
505 if (ret != 0) {
506 flag_set(&work->flags, K_WORK_CANCELING_BIT);
507 ret = work_busy_get_locked(work);
508 }
509
510 return ret;
511 }
512
513 /* Complete cancellation necessary, release work lock, and wait if
514 * necessary.
515 *
516 * Invoked with work lock held by key.
517 * Sleeps.
518 *
519 * @param work work that is being canceled
520 * @param canceller state used to synchronize the cancellation
521 * @param key used by work lock
522 *
523 * @retval true if and only if the work was still active on entry. The caller
524 * must wait on the canceller semaphore after releasing the lock.
525 *
526 * @retval false if work was idle on entry. The caller need not wait.
527 */
cancel_sync_locked(struct k_work * work,struct z_work_canceller * canceller)528 static bool cancel_sync_locked(struct k_work *work,
529 struct z_work_canceller *canceller)
530 {
531 bool ret = flag_test(&work->flags, K_WORK_CANCELING_BIT);
532
533 /* If something's still running then we have to wait for
534 * completion, which is indicated when finish_cancel() gets
535 * invoked.
536 */
537 if (ret) {
538 init_work_cancel(canceller, work);
539 }
540
541 return ret;
542 }
543
k_work_cancel(struct k_work * work)544 int k_work_cancel(struct k_work *work)
545 {
546 __ASSERT_NO_MSG(work != NULL);
547 __ASSERT_NO_MSG(!flag_test(&work->flags, K_WORK_DELAYABLE_BIT));
548
549 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel, work);
550
551 k_spinlock_key_t key = k_spin_lock(&lock);
552 int ret = cancel_async_locked(work);
553
554 k_spin_unlock(&lock, key);
555
556 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel, work, ret);
557
558 return ret;
559 }
560
k_work_cancel_sync(struct k_work * work,struct k_work_sync * sync)561 bool k_work_cancel_sync(struct k_work *work,
562 struct k_work_sync *sync)
563 {
564 __ASSERT_NO_MSG(work != NULL);
565 __ASSERT_NO_MSG(sync != NULL);
566 __ASSERT_NO_MSG(!flag_test(&work->flags, K_WORK_DELAYABLE_BIT));
567 __ASSERT_NO_MSG(!k_is_in_isr());
568 #ifdef CONFIG_KERNEL_COHERENCE
569 __ASSERT_NO_MSG(arch_mem_coherent(sync));
570 #endif
571
572 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel_sync, work, sync);
573
574 struct z_work_canceller *canceller = &sync->canceller;
575 k_spinlock_key_t key = k_spin_lock(&lock);
576 bool pending = (work_busy_get_locked(work) != 0U);
577 bool need_wait = false;
578
579 if (pending) {
580 (void)cancel_async_locked(work);
581 need_wait = cancel_sync_locked(work, canceller);
582 }
583
584 k_spin_unlock(&lock, key);
585
586 if (need_wait) {
587 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_work, cancel_sync, work, sync);
588
589 k_sem_take(&canceller->sem, K_FOREVER);
590 }
591
592 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel_sync, work, sync, pending);
593 return pending;
594 }
595
/* Loop executed by a work queue thread.
 *
 * Each iteration takes the work lock and pops the head of the queue's
 * pending list. If an item was found its handler is invoked with the
 * lock released, and the item's and queue's state flags are updated
 * under the lock afterwards. If no item was found the thread sleeps on
 * the queue's notifyq, after first waking any threads waiting for a
 * drain to complete. The loop never terminates.
 *
 * @param workq_ptr pointer to the work queue structure
 * @param p2 unused
 * @param p3 unused
 */
static void work_queue_main(void *workq_ptr, void *p2, void *p3)
{
	ARG_UNUSED(p2);
	ARG_UNUSED(p3);

	struct k_work_q *queue = (struct k_work_q *)workq_ptr;

	while (true) {
		sys_snode_t *node;
		struct k_work *work = NULL;
		k_work_handler_t handler = NULL;
		k_spinlock_key_t key = k_spin_lock(&lock);
		bool yield;

		/* Check for and prepare any new work. */
		node = sys_slist_get(&queue->pending);
		if (node != NULL) {
			/* Mark that there's some work active that's
			 * not on the pending list.
			 */
			flag_set(&queue->flags, K_WORK_QUEUE_BUSY_BIT);
			work = CONTAINER_OF(node, struct k_work, node);
			flag_set(&work->flags, K_WORK_RUNNING_BIT);
			flag_clear(&work->flags, K_WORK_QUEUED_BIT);

			/* Static code analysis tools can raise a
			 * false-positive violation on the line below,
			 * claiming that 'work' is checked for null after
			 * being dereferenced.
			 *
			 * The work item is derived with CONTAINER_OF from a
			 * non-null node, so it could only be null if the
			 * enclosing struct k_work had been placed at address
			 * NULL, which should never happen. A non-null node
			 * therefore implies a non-null work item; the later
			 * 'work == NULL' test only covers the branches in
			 * which no node was found.
			 *
			 * The handler is captured while the lock is held.
			 */
			handler = work->handler;
		} else if (flag_test_and_clear(&queue->flags,
					       K_WORK_QUEUE_DRAIN_BIT)) {
			/* Not busy and draining: move threads waiting for
			 * drain to ready state. The held spinlock inhibits
			 * immediate reschedule; released threads get their
			 * chance when this invokes z_sched_wait() below.
			 *
			 * We don't touch K_WORK_QUEUE_PLUGGED, so getting
			 * here doesn't mean that the queue will allow new
			 * submissions.
			 */
			(void)z_sched_wake_all(&queue->drainq, 1, NULL);
		} else {
			/* No work is available and no queue state requires
			 * special handling.
			 */
			;
		}

		if (work == NULL) {
			/* Nothing's had a chance to add work since we took
			 * the lock, and we didn't find work nor got asked to
			 * stop. Just go to sleep: when something happens the
			 * work thread will be woken and we can check again.
			 *
			 * The lock and key are handed to z_sched_wait(), so
			 * there is no explicit unlock on this path.
			 */

			(void)z_sched_wait(&lock, key, &queue->notifyq,
					   K_FOREVER, NULL);
			continue;
		}

		k_spin_unlock(&lock, key);

		/* The handler runs without the work lock held. */
		__ASSERT_NO_MSG(handler != NULL);
		handler(work);

		/* Mark the work item as no longer running and deal
		 * with any cancellation issued while it was running.
		 * Clear the BUSY flag and optionally yield to prevent
		 * starving other threads.
		 *
		 * NOTE(review): 'work' is dereferenced again below, after
		 * the handler has returned. For a flusher item the handler
		 * (handle_flush) gives the semaphore that k_work_flush()
		 * waits on, so the waiter may already be runnable at this
		 * point. Confirm that the caller's struct k_work_sync is
		 * guaranteed to outlive these accesses.
		 */
		key = k_spin_lock(&lock);

		flag_clear(&work->flags, K_WORK_RUNNING_BIT);
		if (flag_test(&work->flags, K_WORK_CANCELING_BIT)) {
			finalize_cancel_locked(work);
		}

		flag_clear(&queue->flags, K_WORK_QUEUE_BUSY_BIT);
		yield = !flag_test(&queue->flags, K_WORK_QUEUE_NO_YIELD_BIT);
		k_spin_unlock(&lock, key);

		/* Optionally yield to prevent the work queue from
		 * starving other threads.
		 */
		if (yield) {
			k_yield();
		}
	}
}
698
k_work_queue_init(struct k_work_q * queue)699 void k_work_queue_init(struct k_work_q *queue)
700 {
701 __ASSERT_NO_MSG(queue != NULL);
702
703 *queue = (struct k_work_q) {
704 .flags = 0,
705 };
706
707 SYS_PORT_TRACING_OBJ_INIT(k_work_queue, queue);
708 }
709
/* Initialize the runtime state of a work queue and start its thread.
 *
 * The queue must not already be marked as started (asserted).
 *
 * @param queue the queue to start; must not be null
 * @param stack stack for the work queue thread; must not be null
 * @param stack_size size of @p stack as passed to k_thread_create()
 * @param prio priority of the work queue thread
 * @param cfg optional configuration; may be null. When provided,
 * no_yield selects K_WORK_QUEUE_NO_YIELD and a non-null name is applied
 * to the thread.
 */
void k_work_queue_start(struct k_work_q *queue,
			k_thread_stack_t *stack,
			size_t stack_size,
			int prio,
			const struct k_work_queue_config *cfg)
{
	__ASSERT_NO_MSG(queue);
	__ASSERT_NO_MSG(stack);
	__ASSERT_NO_MSG(!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT));
	uint32_t initial_flags = K_WORK_QUEUE_STARTED;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, start, queue);

	sys_slist_init(&queue->pending);
	z_waitq_init(&queue->notifyq);
	z_waitq_init(&queue->drainq);

	if (cfg != NULL) {
		if (cfg->no_yield) {
			initial_flags |= K_WORK_QUEUE_NO_YIELD;
		}
	}

	/* The thread doesn't exist yet, but with all state in place
	 * items can already be submitted; they are picked up as soon as
	 * the thread gets control.
	 */
	flags_set(&queue->flags, initial_flags);

	/* Created with a K_FOREVER delay so the name can be applied
	 * before the thread is started explicitly below.
	 */
	(void)k_thread_create(&queue->thread, stack, stack_size,
			      work_queue_main, queue, NULL, NULL,
			      prio, 0, K_FOREVER);

	if (cfg != NULL) {
		if (cfg->name != NULL) {
			(void)k_thread_name_set(&queue->thread, cfg->name);
		}
	}

	k_thread_start(&queue->thread);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, start, queue);
}
749
k_work_queue_drain(struct k_work_q * queue,bool plug)750 int k_work_queue_drain(struct k_work_q *queue,
751 bool plug)
752 {
753 __ASSERT_NO_MSG(queue);
754 __ASSERT_NO_MSG(!k_is_in_isr());
755
756 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, drain, queue);
757
758 int ret = 0;
759 k_spinlock_key_t key = k_spin_lock(&lock);
760
761 if (((flags_get(&queue->flags)
762 & (K_WORK_QUEUE_BUSY | K_WORK_QUEUE_DRAIN)) != 0U)
763 || plug
764 || !sys_slist_is_empty(&queue->pending)) {
765 flag_set(&queue->flags, K_WORK_QUEUE_DRAIN_BIT);
766 if (plug) {
767 flag_set(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT);
768 }
769
770 notify_queue_locked(queue);
771 ret = z_sched_wait(&lock, key, &queue->drainq,
772 K_FOREVER, NULL);
773 } else {
774 k_spin_unlock(&lock, key);
775 }
776
777 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, drain, queue, ret);
778
779 return ret;
780 }
781
k_work_queue_unplug(struct k_work_q * queue)782 int k_work_queue_unplug(struct k_work_q *queue)
783 {
784 __ASSERT_NO_MSG(queue);
785
786 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, unplug, queue);
787
788 int ret = -EALREADY;
789 k_spinlock_key_t key = k_spin_lock(&lock);
790
791 if (flag_test_and_clear(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT)) {
792 ret = 0;
793 }
794
795 k_spin_unlock(&lock, key);
796
797 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, unplug, queue, ret);
798
799 return ret;
800 }
801
802 #ifdef CONFIG_SYS_CLOCK_EXISTS
803
804 /* Timeout handler for delayable work.
805 *
806 * Invoked by timeout infrastructure.
807 * Takes and releases work lock.
808 * Conditionally reschedules.
809 */
work_timeout(struct _timeout * to)810 static void work_timeout(struct _timeout *to)
811 {
812 struct k_work_delayable *dw
813 = CONTAINER_OF(to, struct k_work_delayable, timeout);
814 struct k_work *wp = &dw->work;
815 k_spinlock_key_t key = k_spin_lock(&lock);
816 struct k_work_q *queue = NULL;
817
818 /* If the work is still marked delayed (should be) then clear that
819 * state and submit it to the queue. If successful the queue will be
820 * notified of new work at the next reschedule point.
821 *
822 * If not successful there is no notification that the work has been
823 * abandoned. Sorry.
824 */
825 if (flag_test_and_clear(&wp->flags, K_WORK_DELAYED_BIT)) {
826 queue = dw->queue;
827 (void)submit_to_queue_locked(wp, &queue);
828 }
829
830 k_spin_unlock(&lock, key);
831 }
832
/* Initialize a delayable work item.
 *
 * The whole structure is overwritten; the embedded work item gets the
 * given handler and the K_WORK_DELAYABLE flag, and the timeout is
 * initialized.
 *
 * @param dwork the delayable work structure; must not be null
 * @param handler the function invoked for the work item; must not be
 * null
 */
void k_work_init_delayable(struct k_work_delayable *dwork,
			   k_work_handler_t handler)
{
	__ASSERT_NO_MSG(dwork != NULL);
	__ASSERT_NO_MSG(handler != NULL);

	const struct k_work_delayable fresh = {
		.work = {
			.handler = handler,
			.flags = K_WORK_DELAYABLE,
		},
	};

	*dwork = fresh;
	z_init_timeout(&dwork->timeout);

	SYS_PORT_TRACING_OBJ_INIT(k_work_delayable, dwork);
}
849
work_delayable_busy_get_locked(const struct k_work_delayable * dwork)850 static inline int work_delayable_busy_get_locked(const struct k_work_delayable *dwork)
851 {
852 return flags_get(&dwork->work.flags) & K_WORK_MASK;
853 }
854
k_work_delayable_busy_get(const struct k_work_delayable * dwork)855 int k_work_delayable_busy_get(const struct k_work_delayable *dwork)
856 {
857 k_spinlock_key_t key = k_spin_lock(&lock);
858 int ret = work_delayable_busy_get_locked(dwork);
859
860 k_spin_unlock(&lock, key);
861 return ret;
862 }
863
864 /* Attempt to schedule a work item for future (maybe immediate)
865 * submission.
866 *
867 * Invoked with work lock held.
868 *
869 * See also submit_to_queue_locked(), which implements this for a no-wait
870 * delay.
871 *
872 * Invoked with work lock held.
873 *
874 * @param queuep pointer to a pointer to a queue. On input this
875 * should dereference to the proposed queue (which may be null); after
876 * completion it will be null if the work was not submitted or if
877 * submitted will reference the queue it was submitted to. That may
878 * or may not be the queue provided on input.
879 *
880 * @param dwork the delayed work structure
881 *
882 * @param delay the delay to use before scheduling.
883 *
884 * @retval from submit_to_queue_locked() if delay is K_NO_WAIT; otherwise
885 * @retval 1 to indicate successfully scheduled.
886 */
schedule_for_queue_locked(struct k_work_q ** queuep,struct k_work_delayable * dwork,k_timeout_t delay)887 static int schedule_for_queue_locked(struct k_work_q **queuep,
888 struct k_work_delayable *dwork,
889 k_timeout_t delay)
890 {
891 int ret = 1;
892 struct k_work *work = &dwork->work;
893
894 if (K_TIMEOUT_EQ(delay, K_NO_WAIT)) {
895 return submit_to_queue_locked(work, queuep);
896 }
897
898 flag_set(&work->flags, K_WORK_DELAYED_BIT);
899 dwork->queue = *queuep;
900
901 /* Add timeout */
902 z_add_timeout(&dwork->timeout, work_timeout, delay);
903
904 return ret;
905 }
906
907 /* Unschedule delayable work.
908 *
909 * If the work is delayed, cancel the timeout and clear the delayed
910 * flag.
911 *
912 * Invoked with work lock held.
913 *
914 * @param dwork pointer to delayable work structure.
915 *
916 * @return true if and only if work had been delayed so the timeout
917 * was cancelled.
918 */
unschedule_locked(struct k_work_delayable * dwork)919 static inline bool unschedule_locked(struct k_work_delayable *dwork)
920 {
921 bool ret = false;
922 struct k_work *work = &dwork->work;
923
924 /* If scheduled, try to cancel. If it fails, that means the
925 * callback has been dequeued and will inevitably run (or has
926 * already run), so treat that as "undelayed" and return
927 * false.
928 */
929 if (flag_test_and_clear(&work->flags, K_WORK_DELAYED_BIT)) {
930 ret = z_abort_timeout(&dwork->timeout) == 0;
931 }
932
933 return ret;
934 }
935
936 /* Full cancellation of a delayable work item.
937 *
938 * Unschedules the delayed part then delegates to standard work
939 * cancellation.
940 *
941 * Invoked with work lock held.
942 *
943 * @param dwork delayable work item
944 *
945 * @return k_work_busy_get() flags
946 */
cancel_delayable_async_locked(struct k_work_delayable * dwork)947 static int cancel_delayable_async_locked(struct k_work_delayable *dwork)
948 {
949 (void)unschedule_locked(dwork);
950
951 return cancel_async_locked(&dwork->work);
952 }
953
/* Schedule a delayable work item on a queue unless it is already
 * scheduled or queued.
 *
 * @param queue the proposed queue
 * @param dwork the delayable work item; must not be null
 * @param delay the delay before the item is submitted
 *
 * @return 0 if the item had any busy state other than running, so
 * nothing was done; otherwise the result of schedule_for_queue_locked()
 */
int k_work_schedule_for_queue(struct k_work_q *queue,
			      struct k_work_delayable *dwork,
			      k_timeout_t delay)
{
	__ASSERT_NO_MSG(dwork != NULL);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, schedule_for_queue, queue, dwork, delay);

	int ret = 0;
	k_spinlock_key_t key = k_spin_lock(&lock);
	const bool idle_or_running =
		(work_busy_get_locked(&dwork->work) & ~K_WORK_RUNNING) == 0U;

	/* Schedule the work item if it's idle or running. */
	if (idle_or_running) {
		ret = schedule_for_queue_locked(&queue, dwork, delay);
	}

	k_spin_unlock(&lock, key);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, schedule_for_queue, queue, dwork, delay, ret);

	return ret;
}
977
/* Schedule a delayable work item on the system work queue
 * (k_sys_work_q).
 *
 * @param dwork the delayable work item
 * @param delay the delay before the item is submitted
 *
 * @return the result of k_work_schedule_for_queue()
 */
int k_work_schedule(struct k_work_delayable *dwork,
		    k_timeout_t delay)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, schedule, dwork, delay);

	int ret = k_work_schedule_for_queue(&k_sys_work_q, dwork, delay);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, schedule, dwork, delay, ret);

	return ret;
}
989
/* Replace any active scheduling of a delayable work item with a new
 * one.
 *
 * @param queue the proposed queue
 * @param dwork the delayable work item; must not be null
 * @param delay the delay before the item is submitted
 *
 * @return the result of schedule_for_queue_locked()
 */
int k_work_reschedule_for_queue(struct k_work_q *queue,
				struct k_work_delayable *dwork,
				k_timeout_t delay)
{
	__ASSERT_NO_MSG(dwork != NULL);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, reschedule_for_queue, queue, dwork, delay);

	k_spinlock_key_t key = k_spin_lock(&lock);

	/* Drop whatever schedule is currently active... */
	(void)unschedule_locked(dwork);

	/* ...and install the new one. */
	int ret = schedule_for_queue_locked(&queue, dwork, delay);

	k_spin_unlock(&lock, key);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, reschedule_for_queue, queue, dwork, delay, ret);

	return ret;
}
1013
/* Reschedule a delayable work item on the system work queue
 * (k_sys_work_q).
 *
 * @param dwork the delayable work item
 * @param delay the delay before the item is submitted
 *
 * @return the result of k_work_reschedule_for_queue()
 */
int k_work_reschedule(struct k_work_delayable *dwork,
		      k_timeout_t delay)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, reschedule, dwork, delay);

	int ret = k_work_reschedule_for_queue(&k_sys_work_q, dwork, delay);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, reschedule, dwork, delay, ret);

	return ret;
}
1025
k_work_cancel_delayable(struct k_work_delayable * dwork)1026 int k_work_cancel_delayable(struct k_work_delayable *dwork)
1027 {
1028 __ASSERT_NO_MSG(dwork != NULL);
1029
1030 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel_delayable, dwork);
1031
1032 k_spinlock_key_t key = k_spin_lock(&lock);
1033 int ret = cancel_delayable_async_locked(dwork);
1034
1035 k_spin_unlock(&lock, key);
1036
1037 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel_delayable, dwork, ret);
1038
1039 return ret;
1040 }
1041
k_work_cancel_delayable_sync(struct k_work_delayable * dwork,struct k_work_sync * sync)1042 bool k_work_cancel_delayable_sync(struct k_work_delayable *dwork,
1043 struct k_work_sync *sync)
1044 {
1045 __ASSERT_NO_MSG(dwork != NULL);
1046 __ASSERT_NO_MSG(sync != NULL);
1047 __ASSERT_NO_MSG(!k_is_in_isr());
1048 #ifdef CONFIG_KERNEL_COHERENCE
1049 __ASSERT_NO_MSG(arch_mem_coherent(sync));
1050 #endif
1051
1052 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel_delayable_sync, dwork, sync);
1053
1054 struct z_work_canceller *canceller = &sync->canceller;
1055 k_spinlock_key_t key = k_spin_lock(&lock);
1056 bool pending = (work_delayable_busy_get_locked(dwork) != 0U);
1057 bool need_wait = false;
1058
1059 if (pending) {
1060 (void)cancel_delayable_async_locked(dwork);
1061 need_wait = cancel_sync_locked(&dwork->work, canceller);
1062 }
1063
1064 k_spin_unlock(&lock, key);
1065
1066 if (need_wait) {
1067 k_sem_take(&canceller->sem, K_FOREVER);
1068 }
1069
1070 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel_delayable_sync, dwork, sync, pending);
1071 return pending;
1072 }
1073
k_work_flush_delayable(struct k_work_delayable * dwork,struct k_work_sync * sync)1074 bool k_work_flush_delayable(struct k_work_delayable *dwork,
1075 struct k_work_sync *sync)
1076 {
1077 __ASSERT_NO_MSG(dwork != NULL);
1078 __ASSERT_NO_MSG(sync != NULL);
1079 __ASSERT_NO_MSG(!k_is_in_isr());
1080 #ifdef CONFIG_KERNEL_COHERENCE
1081 __ASSERT_NO_MSG(arch_mem_coherent(sync));
1082 #endif
1083
1084 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, flush_delayable, dwork, sync);
1085
1086 struct k_work *work = &dwork->work;
1087 struct z_work_flusher *flusher = &sync->flusher;
1088 k_spinlock_key_t key = k_spin_lock(&lock);
1089
1090 /* If it's idle release the lock and return immediately. */
1091 if (work_busy_get_locked(work) == 0U) {
1092 k_spin_unlock(&lock, key);
1093
1094 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, flush_delayable, dwork, sync, false);
1095
1096 return false;
1097 }
1098
1099 /* If unscheduling did something then submit it. Ignore a
1100 * failed submission (e.g. when cancelling).
1101 */
1102 if (unschedule_locked(dwork)) {
1103 struct k_work_q *queue = dwork->queue;
1104
1105 (void)submit_to_queue_locked(work, &queue);
1106 }
1107
1108 /* Wait for it to finish */
1109 bool need_flush = work_flush_locked(work, flusher);
1110
1111 k_spin_unlock(&lock, key);
1112
1113 /* If necessary wait until the flusher item completes */
1114 if (need_flush) {
1115 k_sem_take(&flusher->sem, K_FOREVER);
1116 }
1117
1118 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, flush_delayable, dwork, sync, need_flush);
1119
1120 return need_flush;
1121 }
1122
1123 #endif /* CONFIG_SYS_CLOCK_EXISTS */
1124