1 /*
2 * Copyright (c) 2020 Nordic Semiconductor ASA
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 /**
8 * @file
9 *
10 * Second generation work queue implementation
11 */
12
13 #include <zephyr/kernel.h>
14 #include <zephyr/kernel_structs.h>
15 #include <zephyr/wait_q.h>
16 #include <zephyr/spinlock.h>
17 #include <errno.h>
18 #include <ksched.h>
19 #include <zephyr/sys/printk.h>
20
/* Clear bit number @p bit in the flag word referenced by @p flagp. */
static inline void flag_clear(uint32_t *flagp, uint32_t bit)
{
	*flagp = *flagp & ~BIT(bit);
}
26
/* Set bit number @p bit in the flag word referenced by @p flagp. */
static inline void flag_set(uint32_t *flagp, uint32_t bit)
{
	*flagp = *flagp | BIT(bit);
}
32
/* Report whether bit number @p bit is set in the flag word referenced
 * by @p flagp.
 *
 * @return true if the bit is set, false otherwise.
 */
static inline bool flag_test(const uint32_t *flagp, uint32_t bit)
{
	if ((*flagp & BIT(bit)) == 0U) {
		return false;
	}

	return true;
}
38
/* Read bit number @p bit of the flag word referenced by @p flagp and
 * then clear it.
 *
 * The bit index is a uint32_t, matching flag_test() and flag_clear()
 * to which it is forwarded, so no signed-to-unsigned conversion occurs.
 *
 * @return the state of the bit before it was cleared.
 */
static inline bool flag_test_and_clear(uint32_t *flagp, uint32_t bit)
{
	bool was_set = flag_test(flagp, bit);

	flag_clear(flagp, bit);

	return was_set;
}
48
/* Overwrite the whole flag word referenced by @p flagp with @p flags. */
static inline void flags_set(uint32_t *flagp, uint32_t flags)
{
	flagp[0] = flags;
}
54
/* Return the current value of the flag word referenced by @p flagp. */
static inline uint32_t flags_get(const uint32_t *flagp)
{
	uint32_t snapshot = flagp[0];

	return snapshot;
}
59
60 /* Lock to protect the internal state of all work items, work queues,
61 * and pending_cancels.
62 */
63 static struct k_spinlock lock;
64
65 /* Invoked by work thread */
handle_flush(struct k_work * work)66 static void handle_flush(struct k_work *work)
67 {
68 struct z_work_flusher *flusher
69 = CONTAINER_OF(work, struct z_work_flusher, work);
70
71 k_sem_give(&flusher->sem);
72 }
73
init_flusher(struct z_work_flusher * flusher)74 static inline void init_flusher(struct z_work_flusher *flusher)
75 {
76 k_sem_init(&flusher->sem, 0, 1);
77 k_work_init(&flusher->work, handle_flush);
78 }
79
80 /* List of pending cancellations. */
81 static sys_slist_t pending_cancels;
82
83 /* Initialize a canceler record and add it to the list of pending
84 * cancels.
85 *
86 * Invoked with work lock held.
87 *
88 * @param canceler the structure used to notify a waiting process.
89 * @param work the work structure that is to be canceled
90 */
init_work_cancel(struct z_work_canceller * canceler,struct k_work * work)91 static inline void init_work_cancel(struct z_work_canceller *canceler,
92 struct k_work *work)
93 {
94 k_sem_init(&canceler->sem, 0, 1);
95 canceler->work = work;
96 sys_slist_append(&pending_cancels, &canceler->node);
97 }
98
99 /* Complete cancellation of a work item and unlock held lock.
100 *
101 * Invoked with work lock held.
102 *
103 * Invoked from a work queue thread.
104 *
105 * Reschedules.
106 *
107 * @param work the work structure that has completed cancellation
108 */
finalize_cancel_locked(struct k_work * work)109 static void finalize_cancel_locked(struct k_work *work)
110 {
111 struct z_work_canceller *wc, *tmp;
112 sys_snode_t *prev = NULL;
113
114 /* Clear this first, so released high-priority threads don't
115 * see it when doing things.
116 */
117 flag_clear(&work->flags, K_WORK_CANCELING_BIT);
118
119 /* Search for and remove the matching container, and release
120 * what's waiting for the completion. The same work item can
121 * appear multiple times in the list if multiple threads
122 * attempt to cancel it.
123 */
124 SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&pending_cancels, wc, tmp, node) {
125 if (wc->work == work) {
126 sys_slist_remove(&pending_cancels, prev, &wc->node);
127 k_sem_give(&wc->sem);
128 } else {
129 prev = &wc->node;
130 }
131 }
132 }
133
/* Initialize a work item.
 *
 * Overwrites @p work with Z_WORK_INITIALIZER(handler) and then emits
 * the object-init tracing hook. Both arguments are asserted non-NULL.
 *
 * @param work the work structure to initialize
 * @param handler the function stored as the item's handler
 */
void k_work_init(struct k_work *work,
		 k_work_handler_t handler)
{
	__ASSERT_NO_MSG(work != NULL);
	__ASSERT_NO_MSG(handler != NULL);

	const struct k_work fresh = Z_WORK_INITIALIZER(handler);

	*work = fresh;

	SYS_PORT_TRACING_OBJ_INIT(k_work, work);
}
144
work_busy_get_locked(const struct k_work * work)145 static inline int work_busy_get_locked(const struct k_work *work)
146 {
147 return flags_get(&work->flags) & K_WORK_MASK;
148 }
149
k_work_busy_get(const struct k_work * work)150 int k_work_busy_get(const struct k_work *work)
151 {
152 k_spinlock_key_t key = k_spin_lock(&lock);
153 int ret = work_busy_get_locked(work);
154
155 k_spin_unlock(&lock, key);
156
157 return ret;
158 }
159
160 /* Add a flusher work item to the queue.
161 *
162 * Invoked with work lock held.
163 *
164 * Caller must notify queue of pending work.
165 *
166 * @param queue queue on which a work item may appear.
167 * @param work the work item that is either queued or running on @p
168 * queue
169 * @param flusher an uninitialized/unused flusher object
170 */
queue_flusher_locked(struct k_work_q * queue,struct k_work * work,struct z_work_flusher * flusher)171 static void queue_flusher_locked(struct k_work_q *queue,
172 struct k_work *work,
173 struct z_work_flusher *flusher)
174 {
175 bool in_list = false;
176 struct k_work *wn;
177
178 /* Determine whether the work item is still queued. */
179 SYS_SLIST_FOR_EACH_CONTAINER(&queue->pending, wn, node) {
180 if (wn == work) {
181 in_list = true;
182 break;
183 }
184 }
185
186 init_flusher(flusher);
187 if (in_list) {
188 sys_slist_insert(&queue->pending, &work->node,
189 &flusher->work.node);
190 } else {
191 sys_slist_prepend(&queue->pending, &flusher->work.node);
192 }
193 }
194
195 /* Try to remove a work item from the given queue.
196 *
197 * Invoked with work lock held.
198 *
199 * @param queue the queue from which the work should be removed
200 * @param work work that may be on the queue
201 */
queue_remove_locked(struct k_work_q * queue,struct k_work * work)202 static inline void queue_remove_locked(struct k_work_q *queue,
203 struct k_work *work)
204 {
205 if (flag_test_and_clear(&work->flags, K_WORK_QUEUED_BIT)) {
206 (void)sys_slist_find_and_remove(&queue->pending, &work->node);
207 }
208 }
209
210 /* Potentially notify a queue that it needs to look for pending work.
211 *
212 * This may make the work queue thread ready, but as the lock is held it
213 * will not be a reschedule point. Callers should yield after the lock is
214 * released where appropriate (generally if this returns true).
215 *
216 * @param queue to be notified. If this is null no notification is required.
217 *
218 * @return true if and only if the queue was notified and woken, i.e. a
219 * reschedule is pending.
220 */
notify_queue_locked(struct k_work_q * queue)221 static inline bool notify_queue_locked(struct k_work_q *queue)
222 {
223 bool rv = false;
224
225 if (queue != NULL) {
226 rv = z_sched_wake(&queue->notifyq, 0, NULL);
227 }
228
229 return rv;
230 }
231
232 /* Submit an work item to a queue if queue state allows new work.
233 *
234 * Submission is rejected if no queue is provided, or if the queue is
235 * draining and the work isn't being submitted from the queue's
236 * thread (chained submission).
237 *
238 * Invoked with work lock held.
239 * Conditionally notifies queue.
240 *
241 * @param queue the queue to which work should be submitted. This may
242 * be null, in which case the submission will fail.
243 *
244 * @param work to be submitted
245 *
246 * @retval 1 if successfully queued
247 * @retval -EINVAL if no queue is provided
248 * @retval -ENODEV if the queue is not started
249 * @retval -EBUSY if the submission was rejected (draining, plugged)
250 */
queue_submit_locked(struct k_work_q * queue,struct k_work * work)251 static inline int queue_submit_locked(struct k_work_q *queue,
252 struct k_work *work)
253 {
254 if (queue == NULL) {
255 return -EINVAL;
256 }
257
258 int ret = -EBUSY;
259 bool chained = (_current == &queue->thread) && !k_is_in_isr();
260 bool draining = flag_test(&queue->flags, K_WORK_QUEUE_DRAIN_BIT);
261 bool plugged = flag_test(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT);
262
263 /* Test for acceptability, in priority order:
264 *
265 * * -ENODEV if the queue isn't running.
266 * * -EBUSY if draining and not chained
267 * * -EBUSY if plugged and not draining
268 * * otherwise OK
269 */
270 if (!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT)) {
271 ret = -ENODEV;
272 } else if (draining && !chained) {
273 ret = -EBUSY;
274 } else if (plugged && !draining) {
275 ret = -EBUSY;
276 } else {
277 sys_slist_append(&queue->pending, &work->node);
278 ret = 1;
279 (void)notify_queue_locked(queue);
280 }
281
282 return ret;
283 }
284
285 /* Attempt to submit work to a queue.
286 *
287 * The submission can fail if:
288 * * the work is cancelling,
289 * * no candidate queue can be identified;
290 * * the candidate queue rejects the submission.
291 *
292 * Invoked with work lock held.
293 * Conditionally notifies queue.
294 *
295 * @param work the work structure to be submitted
296
297 * @param queuep pointer to a queue reference. On input this should
298 * dereference to the proposed queue (which may be null); after completion it
299 * will be null if the work was not submitted or if submitted will reference
300 * the queue it was submitted to. That may or may not be the queue provided
301 * on input.
302 *
303 * @retval 0 if work was already submitted to a queue
304 * @retval 1 if work was not submitted and has been queued to @p queue
305 * @retval 2 if work was running and has been queued to the queue that was
306 * running it
307 * @retval -EBUSY if canceling or submission was rejected by queue
308 * @retval -EINVAL if no queue is provided
309 * @retval -ENODEV if the queue is not started
310 */
submit_to_queue_locked(struct k_work * work,struct k_work_q ** queuep)311 static int submit_to_queue_locked(struct k_work *work,
312 struct k_work_q **queuep)
313 {
314 int ret = 0;
315
316 if (flag_test(&work->flags, K_WORK_CANCELING_BIT)) {
317 /* Disallowed */
318 ret = -EBUSY;
319 } else if (!flag_test(&work->flags, K_WORK_QUEUED_BIT)) {
320 /* Not currently queued */
321 ret = 1;
322
323 /* If no queue specified resubmit to last queue.
324 */
325 if (*queuep == NULL) {
326 *queuep = work->queue;
327 }
328
329 /* If the work is currently running we have to use the
330 * queue it's running on to prevent handler
331 * re-entrancy.
332 */
333 if (flag_test(&work->flags, K_WORK_RUNNING_BIT)) {
334 __ASSERT_NO_MSG(work->queue != NULL);
335 *queuep = work->queue;
336 ret = 2;
337 }
338
339 int rc = queue_submit_locked(*queuep, work);
340
341 if (rc < 0) {
342 ret = rc;
343 } else {
344 flag_set(&work->flags, K_WORK_QUEUED_BIT);
345 work->queue = *queuep;
346 }
347 } else {
348 /* Already queued, do nothing. */
349 }
350
351 if (ret <= 0) {
352 *queuep = NULL;
353 }
354
355 return ret;
356 }
357
358 /* Submit work to a queue but do not yield the current thread.
359 *
360 * Intended for internal use.
361 *
362 * See also submit_to_queue_locked().
363 *
364 * @param queuep pointer to a queue reference.
365 * @param work the work structure to be submitted
366 *
367 * @retval see submit_to_queue_locked()
368 */
z_work_submit_to_queue(struct k_work_q * queue,struct k_work * work)369 int z_work_submit_to_queue(struct k_work_q *queue,
370 struct k_work *work)
371 {
372 __ASSERT_NO_MSG(work != NULL);
373
374 k_spinlock_key_t key = k_spin_lock(&lock);
375
376 int ret = submit_to_queue_locked(work, &queue);
377
378 k_spin_unlock(&lock, key);
379
380 return ret;
381 }
382
k_work_submit_to_queue(struct k_work_q * queue,struct k_work * work)383 int k_work_submit_to_queue(struct k_work_q *queue,
384 struct k_work *work)
385 {
386 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, submit_to_queue, queue, work);
387
388 int ret = z_work_submit_to_queue(queue, work);
389
390 /* submit_to_queue_locked() won't reschedule on its own
391 * (really it should, otherwise this process will result in
392 * spurious calls to z_swap() due to the race), so do it here
393 * if the queue state changed.
394 */
395 if (ret > 0) {
396 z_reschedule_unlocked();
397 }
398
399 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, submit_to_queue, queue, work, ret);
400
401 return ret;
402 }
403
k_work_submit(struct k_work * work)404 int k_work_submit(struct k_work *work)
405 {
406 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, submit, work);
407
408 int ret = k_work_submit_to_queue(&k_sys_work_q, work);
409
410 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, submit, work, ret);
411
412 return ret;
413 }
414
415 /* Flush the work item if necessary.
416 *
417 * Flushing is necessary only if the work is either queued or running.
418 *
419 * Invoked with work lock held by key.
420 * Sleeps.
421 *
422 * @param work the work item that is to be flushed
423 * @param flusher state used to synchronize the flush
424 *
425 * @retval true if work is queued or running. If this happens the
426 * caller must take the flusher semaphore after releasing the lock.
427 *
428 * @retval false otherwise. No wait required.
429 */
work_flush_locked(struct k_work * work,struct z_work_flusher * flusher)430 static bool work_flush_locked(struct k_work *work,
431 struct z_work_flusher *flusher)
432 {
433 bool need_flush = (flags_get(&work->flags)
434 & (K_WORK_QUEUED | K_WORK_RUNNING)) != 0U;
435
436 if (need_flush) {
437 struct k_work_q *queue = work->queue;
438
439 __ASSERT_NO_MSG(queue != NULL);
440
441 queue_flusher_locked(queue, work, flusher);
442 notify_queue_locked(queue);
443 }
444
445 return need_flush;
446 }
447
k_work_flush(struct k_work * work,struct k_work_sync * sync)448 bool k_work_flush(struct k_work *work,
449 struct k_work_sync *sync)
450 {
451 __ASSERT_NO_MSG(work != NULL);
452 __ASSERT_NO_MSG(!flag_test(&work->flags, K_WORK_DELAYABLE_BIT));
453 __ASSERT_NO_MSG(!k_is_in_isr());
454 __ASSERT_NO_MSG(sync != NULL);
455 #ifdef CONFIG_KERNEL_COHERENCE
456 __ASSERT_NO_MSG(arch_mem_coherent(sync));
457 #endif
458
459 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, flush, work);
460
461 struct z_work_flusher *flusher = &sync->flusher;
462 k_spinlock_key_t key = k_spin_lock(&lock);
463
464 bool need_flush = work_flush_locked(work, flusher);
465
466 k_spin_unlock(&lock, key);
467
468 /* If necessary wait until the flusher item completes */
469 if (need_flush) {
470 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_work, flush, work, K_FOREVER);
471
472 k_sem_take(&flusher->sem, K_FOREVER);
473 }
474
475 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, flush, work, need_flush);
476
477 return need_flush;
478 }
479
480 /* Execute the non-waiting steps necessary to cancel a work item.
481 *
482 * Invoked with work lock held.
483 *
484 * @param work the work item to be canceled.
485 *
486 * @retval true if we need to wait for the work item to finish canceling
487 * @retval false if the work item is idle
488 *
489 * @return k_busy_wait() captured under lock
490 */
cancel_async_locked(struct k_work * work)491 static int cancel_async_locked(struct k_work *work)
492 {
493 /* If we haven't already started canceling, do it now. */
494 if (!flag_test(&work->flags, K_WORK_CANCELING_BIT)) {
495 /* Remove it from the queue, if it's queued. */
496 queue_remove_locked(work->queue, work);
497 }
498
499 /* If it's still busy after it's been dequeued, then flag it
500 * as canceling.
501 */
502 int ret = work_busy_get_locked(work);
503
504 if (ret != 0) {
505 flag_set(&work->flags, K_WORK_CANCELING_BIT);
506 ret = work_busy_get_locked(work);
507 }
508
509 return ret;
510 }
511
512 /* Complete cancellation necessary, release work lock, and wait if
513 * necessary.
514 *
515 * Invoked with work lock held by key.
516 * Sleeps.
517 *
518 * @param work work that is being canceled
519 * @param canceller state used to synchronize the cancellation
520 * @param key used by work lock
521 *
522 * @retval true if and only if the work was still active on entry. The caller
523 * must wait on the canceller semaphore after releasing the lock.
524 *
525 * @retval false if work was idle on entry. The caller need not wait.
526 */
cancel_sync_locked(struct k_work * work,struct z_work_canceller * canceller)527 static bool cancel_sync_locked(struct k_work *work,
528 struct z_work_canceller *canceller)
529 {
530 bool ret = flag_test(&work->flags, K_WORK_CANCELING_BIT);
531
532 /* If something's still running then we have to wait for
533 * completion, which is indicated when finish_cancel() gets
534 * invoked.
535 */
536 if (ret) {
537 init_work_cancel(canceller, work);
538 }
539
540 return ret;
541 }
542
k_work_cancel(struct k_work * work)543 int k_work_cancel(struct k_work *work)
544 {
545 __ASSERT_NO_MSG(work != NULL);
546 __ASSERT_NO_MSG(!flag_test(&work->flags, K_WORK_DELAYABLE_BIT));
547
548 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel, work);
549
550 k_spinlock_key_t key = k_spin_lock(&lock);
551 int ret = cancel_async_locked(work);
552
553 k_spin_unlock(&lock, key);
554
555 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel, work, ret);
556
557 return ret;
558 }
559
k_work_cancel_sync(struct k_work * work,struct k_work_sync * sync)560 bool k_work_cancel_sync(struct k_work *work,
561 struct k_work_sync *sync)
562 {
563 __ASSERT_NO_MSG(work != NULL);
564 __ASSERT_NO_MSG(sync != NULL);
565 __ASSERT_NO_MSG(!flag_test(&work->flags, K_WORK_DELAYABLE_BIT));
566 __ASSERT_NO_MSG(!k_is_in_isr());
567 #ifdef CONFIG_KERNEL_COHERENCE
568 __ASSERT_NO_MSG(arch_mem_coherent(sync));
569 #endif
570
571 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel_sync, work, sync);
572
573 struct z_work_canceller *canceller = &sync->canceller;
574 k_spinlock_key_t key = k_spin_lock(&lock);
575 bool pending = (work_busy_get_locked(work) != 0U);
576 bool need_wait = false;
577
578 if (pending) {
579 (void)cancel_async_locked(work);
580 need_wait = cancel_sync_locked(work, canceller);
581 }
582
583 k_spin_unlock(&lock, key);
584
585 if (need_wait) {
586 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_work, cancel_sync, work, sync);
587
588 k_sem_take(&canceller->sem, K_FOREVER);
589 }
590
591 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel_sync, work, sync, pending);
592 return pending;
593 }
594
595 /* Loop executed by a work queue thread.
596 *
597 * @param workq_ptr pointer to the work queue structure
598 */
work_queue_main(void * workq_ptr,void * p2,void * p3)599 static void work_queue_main(void *workq_ptr, void *p2, void *p3)
600 {
601 struct k_work_q *queue = (struct k_work_q *)workq_ptr;
602
603 while (true) {
604 sys_snode_t *node;
605 struct k_work *work = NULL;
606 k_work_handler_t handler = NULL;
607 k_spinlock_key_t key = k_spin_lock(&lock);
608 bool yield;
609
610 /* Check for and prepare any new work. */
611 node = sys_slist_get(&queue->pending);
612 if (node != NULL) {
613 /* Mark that there's some work active that's
614 * not on the pending list.
615 */
616 flag_set(&queue->flags, K_WORK_QUEUE_BUSY_BIT);
617 work = CONTAINER_OF(node, struct k_work, node);
618 flag_set(&work->flags, K_WORK_RUNNING_BIT);
619 flag_clear(&work->flags, K_WORK_QUEUED_BIT);
620
621 /* Static code analysis tool can raise a false-positive violation
622 * in the line below that 'work' is checked for null after being
623 * dereferenced.
624 *
625 * The work is figured out by CONTAINER_OF, as a container
626 * of type struct k_work that contains the node.
627 * The only way for it to be NULL is if node would be a member
628 * of struct k_work object that has been placed at address NULL,
629 * which should never happen, even line 'if (work != NULL)'
630 * ensures that.
631 * This means that if node is not NULL, then work will not be NULL.
632 */
633 handler = work->handler;
634 } else if (flag_test_and_clear(&queue->flags,
635 K_WORK_QUEUE_DRAIN_BIT)) {
636 /* Not busy and draining: move threads waiting for
637 * drain to ready state. The held spinlock inhibits
638 * immediate reschedule; released threads get their
639 * chance when this invokes z_sched_wait() below.
640 *
641 * We don't touch K_WORK_QUEUE_PLUGGABLE, so getting
642 * here doesn't mean that the queue will allow new
643 * submissions.
644 */
645 (void)z_sched_wake_all(&queue->drainq, 1, NULL);
646 } else {
647 /* No work is available and no queue state requires
648 * special handling.
649 */
650 ;
651 }
652
653 if (work == NULL) {
654 /* Nothing's had a chance to add work since we took
655 * the lock, and we didn't find work nor got asked to
656 * stop. Just go to sleep: when something happens the
657 * work thread will be woken and we can check again.
658 */
659
660 (void)z_sched_wait(&lock, key, &queue->notifyq,
661 K_FOREVER, NULL);
662 continue;
663 }
664
665 k_spin_unlock(&lock, key);
666
667 __ASSERT_NO_MSG(handler != NULL);
668 handler(work);
669
670 /* Mark the work item as no longer running and deal
671 * with any cancellation issued while it was running.
672 * Clear the BUSY flag and optionally yield to prevent
673 * starving other threads.
674 */
675 key = k_spin_lock(&lock);
676
677 flag_clear(&work->flags, K_WORK_RUNNING_BIT);
678 if (flag_test(&work->flags, K_WORK_CANCELING_BIT)) {
679 finalize_cancel_locked(work);
680 }
681
682 flag_clear(&queue->flags, K_WORK_QUEUE_BUSY_BIT);
683 yield = !flag_test(&queue->flags, K_WORK_QUEUE_NO_YIELD_BIT);
684 k_spin_unlock(&lock, key);
685
686 /* Optionally yield to prevent the work queue from
687 * starving other threads.
688 */
689 if (yield) {
690 k_yield();
691 }
692 }
693 }
694
k_work_queue_init(struct k_work_q * queue)695 void k_work_queue_init(struct k_work_q *queue)
696 {
697 __ASSERT_NO_MSG(queue != NULL);
698
699 *queue = (struct k_work_q) {
700 .flags = 0,
701 };
702
703 SYS_PORT_TRACING_OBJ_INIT(k_work_queue, queue);
704 }
705
/* Start a work queue.
 *
 * Initializes the pending list and both wait queues, publishes the
 * queue flags, then creates and starts the thread that runs
 * work_queue_main() for this queue. Asserts that the queue has not
 * been started already.
 *
 * @param queue the queue to start
 * @param stack stack for the queue thread
 * @param stack_size size of @p stack, passed to k_thread_create()
 * @param prio thread priority, passed to k_thread_create()
 * @param cfg optional configuration; may be null. When provided,
 * no_yield selects K_WORK_QUEUE_NO_YIELD and a non-null name is applied
 * to the thread.
 */
void k_work_queue_start(struct k_work_q *queue,
			k_thread_stack_t *stack,
			size_t stack_size,
			int prio,
			const struct k_work_queue_config *cfg)
{
	__ASSERT_NO_MSG(queue);
	__ASSERT_NO_MSG(stack);
	__ASSERT_NO_MSG(!flag_test(&queue->flags, K_WORK_QUEUE_STARTED_BIT));

	const bool have_cfg = (cfg != NULL);
	uint32_t initial = K_WORK_QUEUE_STARTED;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, start, queue);

	sys_slist_init(&queue->pending);
	z_waitq_init(&queue->notifyq);
	z_waitq_init(&queue->drainq);

	if (have_cfg && cfg->no_yield) {
		initial |= K_WORK_QUEUE_NO_YIELD;
	}

	/* The thread hasn't actually been started yet, but all the state
	 * is in place so things can be submitted, and once the thread
	 * gets control it's ready to roll.
	 */
	flags_set(&queue->flags, initial);

	/* Created with a K_FOREVER delay and started explicitly below,
	 * after the optional name has been applied.
	 */
	(void)k_thread_create(&queue->thread, stack, stack_size,
			      work_queue_main, queue, NULL, NULL,
			      prio, 0, K_FOREVER);

	if (have_cfg && (cfg->name != NULL)) {
		k_thread_name_set(&queue->thread, cfg->name);
	}

	k_thread_start(&queue->thread);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, start, queue);
}
745
k_work_queue_drain(struct k_work_q * queue,bool plug)746 int k_work_queue_drain(struct k_work_q *queue,
747 bool plug)
748 {
749 __ASSERT_NO_MSG(queue);
750 __ASSERT_NO_MSG(!k_is_in_isr());
751
752 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, drain, queue);
753
754 int ret = 0;
755 k_spinlock_key_t key = k_spin_lock(&lock);
756
757 if (((flags_get(&queue->flags)
758 & (K_WORK_QUEUE_BUSY | K_WORK_QUEUE_DRAIN)) != 0U)
759 || plug
760 || !sys_slist_is_empty(&queue->pending)) {
761 flag_set(&queue->flags, K_WORK_QUEUE_DRAIN_BIT);
762 if (plug) {
763 flag_set(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT);
764 }
765
766 notify_queue_locked(queue);
767 ret = z_sched_wait(&lock, key, &queue->drainq,
768 K_FOREVER, NULL);
769 } else {
770 k_spin_unlock(&lock, key);
771 }
772
773 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, drain, queue, ret);
774
775 return ret;
776 }
777
k_work_queue_unplug(struct k_work_q * queue)778 int k_work_queue_unplug(struct k_work_q *queue)
779 {
780 __ASSERT_NO_MSG(queue);
781
782 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work_queue, unplug, queue);
783
784 int ret = -EALREADY;
785 k_spinlock_key_t key = k_spin_lock(&lock);
786
787 if (flag_test_and_clear(&queue->flags, K_WORK_QUEUE_PLUGGED_BIT)) {
788 ret = 0;
789 }
790
791 k_spin_unlock(&lock, key);
792
793 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work_queue, unplug, queue, ret);
794
795 return ret;
796 }
797
798 #ifdef CONFIG_SYS_CLOCK_EXISTS
799
800 /* Timeout handler for delayable work.
801 *
802 * Invoked by timeout infrastructure.
803 * Takes and releases work lock.
804 * Conditionally reschedules.
805 */
work_timeout(struct _timeout * to)806 static void work_timeout(struct _timeout *to)
807 {
808 struct k_work_delayable *dw
809 = CONTAINER_OF(to, struct k_work_delayable, timeout);
810 struct k_work *wp = &dw->work;
811 k_spinlock_key_t key = k_spin_lock(&lock);
812 struct k_work_q *queue = NULL;
813
814 /* If the work is still marked delayed (should be) then clear that
815 * state and submit it to the queue. If successful the queue will be
816 * notified of new work at the next reschedule point.
817 *
818 * If not successful there is no notification that the work has been
819 * abandoned. Sorry.
820 */
821 if (flag_test_and_clear(&wp->flags, K_WORK_DELAYED_BIT)) {
822 queue = dw->queue;
823 (void)submit_to_queue_locked(wp, &queue);
824 }
825
826 k_spin_unlock(&lock, key);
827 }
828
/* Initialize a delayable work item.
 *
 * Overwrites @p dwork so that its work member holds @p handler and the
 * K_WORK_DELAYABLE flag (all other members zero initialized), then
 * initializes its timeout and emits the object-init tracing hook.
 *
 * @param dwork the delayable work structure to initialize
 * @param handler the function stored as the item's handler
 */
void k_work_init_delayable(struct k_work_delayable *dwork,
			   k_work_handler_t handler)
{
	__ASSERT_NO_MSG(dwork != NULL);
	__ASSERT_NO_MSG(handler != NULL);

	const struct k_work_delayable fresh = {
		.work = {
			.handler = handler,
			.flags = K_WORK_DELAYABLE,
		},
	};

	*dwork = fresh;
	z_init_timeout(&dwork->timeout);

	SYS_PORT_TRACING_OBJ_INIT(k_work_delayable, dwork);
}
845
work_delayable_busy_get_locked(const struct k_work_delayable * dwork)846 static inline int work_delayable_busy_get_locked(const struct k_work_delayable *dwork)
847 {
848 return flags_get(&dwork->work.flags) & K_WORK_MASK;
849 }
850
k_work_delayable_busy_get(const struct k_work_delayable * dwork)851 int k_work_delayable_busy_get(const struct k_work_delayable *dwork)
852 {
853 k_spinlock_key_t key = k_spin_lock(&lock);
854 int ret = work_delayable_busy_get_locked(dwork);
855
856 k_spin_unlock(&lock, key);
857 return ret;
858 }
859
860 /* Attempt to schedule a work item for future (maybe immediate)
861 * submission.
862 *
863 * Invoked with work lock held.
864 *
865 * See also submit_to_queue_locked(), which implements this for a no-wait
866 * delay.
867 *
868 * Invoked with work lock held.
869 *
870 * @param queuep pointer to a pointer to a queue. On input this
871 * should dereference to the proposed queue (which may be null); after
872 * completion it will be null if the work was not submitted or if
873 * submitted will reference the queue it was submitted to. That may
874 * or may not be the queue provided on input.
875 *
876 * @param dwork the delayed work structure
877 *
878 * @param delay the delay to use before scheduling.
879 *
880 * @retval from submit_to_queue_locked() if delay is K_NO_WAIT; otherwise
881 * @retval 1 to indicate successfully scheduled.
882 */
schedule_for_queue_locked(struct k_work_q ** queuep,struct k_work_delayable * dwork,k_timeout_t delay)883 static int schedule_for_queue_locked(struct k_work_q **queuep,
884 struct k_work_delayable *dwork,
885 k_timeout_t delay)
886 {
887 int ret = 1;
888 struct k_work *work = &dwork->work;
889
890 if (K_TIMEOUT_EQ(delay, K_NO_WAIT)) {
891 return submit_to_queue_locked(work, queuep);
892 }
893
894 flag_set(&work->flags, K_WORK_DELAYED_BIT);
895 dwork->queue = *queuep;
896
897 /* Add timeout */
898 z_add_timeout(&dwork->timeout, work_timeout, delay);
899
900 return ret;
901 }
902
903 /* Unschedule delayable work.
904 *
905 * If the work is delayed, cancel the timeout and clear the delayed
906 * flag.
907 *
908 * Invoked with work lock held.
909 *
910 * @param dwork pointer to delayable work structure.
911 *
912 * @return true if and only if work had been delayed so the timeout
913 * was cancelled.
914 */
unschedule_locked(struct k_work_delayable * dwork)915 static inline bool unschedule_locked(struct k_work_delayable *dwork)
916 {
917 bool ret = false;
918 struct k_work *work = &dwork->work;
919
920 /* If scheduled, try to cancel. If it fails, that means the
921 * callback has been dequeued and will inevitably run (or has
922 * already run), so treat that as "undelayed" and return
923 * false.
924 */
925 if (flag_test_and_clear(&work->flags, K_WORK_DELAYED_BIT)) {
926 ret = z_abort_timeout(&dwork->timeout) == 0;
927 }
928
929 return ret;
930 }
931
932 /* Full cancellation of a delayable work item.
933 *
934 * Unschedules the delayed part then delegates to standard work
935 * cancellation.
936 *
937 * Invoked with work lock held.
938 *
939 * @param dwork delayable work item
940 *
941 * @return k_work_busy_get() flags
942 */
cancel_delayable_async_locked(struct k_work_delayable * dwork)943 static int cancel_delayable_async_locked(struct k_work_delayable *dwork)
944 {
945 (void)unschedule_locked(dwork);
946
947 return cancel_async_locked(&dwork->work);
948 }
949
/* Schedule delayable work on a queue unless it is already scheduled,
 * queued or canceling.
 *
 * @param queue the proposed queue
 * @param dwork the delayable work to schedule
 * @param delay the delay before submission
 *
 * @return 0 if nothing was done because the work has busy flags other
 * than K_WORK_RUNNING set; otherwise the value of
 * schedule_for_queue_locked().
 */
int k_work_schedule_for_queue(struct k_work_q *queue,
			      struct k_work_delayable *dwork,
			      k_timeout_t delay)
{
	__ASSERT_NO_MSG(dwork != NULL);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, schedule_for_queue, queue, dwork, delay);

	int ret = 0;
	struct k_work *item = &dwork->work;
	k_spinlock_key_t key = k_spin_lock(&lock);

	/* Only an item that is idle, or merely running, is scheduled. */
	if ((work_busy_get_locked(item) & ~K_WORK_RUNNING) == 0U) {
		ret = schedule_for_queue_locked(&queue, dwork, delay);
	}

	k_spin_unlock(&lock, key);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, schedule_for_queue, queue, dwork, delay, ret);

	return ret;
}
973
/* Schedule delayable work on the system work queue (k_sys_work_q).
 *
 * @retval see k_work_schedule_for_queue()
 */
int k_work_schedule(struct k_work_delayable *dwork,
		    k_timeout_t delay)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, schedule, dwork, delay);

	int rc = k_work_schedule_for_queue(&k_sys_work_q, dwork, delay);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, schedule, dwork, delay, rc);

	return rc;
}
985
/* Reschedule delayable work on a queue.
 *
 * Any active scheduling is removed first, then the work is scheduled
 * with the new parameters, all under the work lock.
 *
 * @param queue the proposed queue
 * @param dwork the delayable work to reschedule
 * @param delay the delay before submission
 *
 * @return the value of schedule_for_queue_locked()
 */
int k_work_reschedule_for_queue(struct k_work_q *queue,
				struct k_work_delayable *dwork,
				k_timeout_t delay)
{
	__ASSERT_NO_MSG(dwork != NULL);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, reschedule_for_queue, queue, dwork, delay);

	int rc;
	k_spinlock_key_t key = k_spin_lock(&lock);

	/* Drop whatever was scheduled before; the result is irrelevant. */
	(void)unschedule_locked(dwork);

	rc = schedule_for_queue_locked(&queue, dwork, delay);

	k_spin_unlock(&lock, key);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, reschedule_for_queue, queue, dwork, delay, rc);

	return rc;
}
1009
/* Reschedule delayable work on the system work queue (k_sys_work_q).
 *
 * @retval see k_work_reschedule_for_queue()
 */
int k_work_reschedule(struct k_work_delayable *dwork,
		      k_timeout_t delay)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, reschedule, dwork, delay);

	int rc = k_work_reschedule_for_queue(&k_sys_work_q, dwork, delay);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, reschedule, dwork, delay, rc);

	return rc;
}
1021
k_work_cancel_delayable(struct k_work_delayable * dwork)1022 int k_work_cancel_delayable(struct k_work_delayable *dwork)
1023 {
1024 __ASSERT_NO_MSG(dwork != NULL);
1025
1026 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel_delayable, dwork);
1027
1028 k_spinlock_key_t key = k_spin_lock(&lock);
1029 int ret = cancel_delayable_async_locked(dwork);
1030
1031 k_spin_unlock(&lock, key);
1032
1033 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel_delayable, dwork, ret);
1034
1035 return ret;
1036 }
1037
k_work_cancel_delayable_sync(struct k_work_delayable * dwork,struct k_work_sync * sync)1038 bool k_work_cancel_delayable_sync(struct k_work_delayable *dwork,
1039 struct k_work_sync *sync)
1040 {
1041 __ASSERT_NO_MSG(dwork != NULL);
1042 __ASSERT_NO_MSG(sync != NULL);
1043 __ASSERT_NO_MSG(!k_is_in_isr());
1044 #ifdef CONFIG_KERNEL_COHERENCE
1045 __ASSERT_NO_MSG(arch_mem_coherent(sync));
1046 #endif
1047
1048 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, cancel_delayable_sync, dwork, sync);
1049
1050 struct z_work_canceller *canceller = &sync->canceller;
1051 k_spinlock_key_t key = k_spin_lock(&lock);
1052 bool pending = (work_delayable_busy_get_locked(dwork) != 0U);
1053 bool need_wait = false;
1054
1055 if (pending) {
1056 (void)cancel_delayable_async_locked(dwork);
1057 need_wait = cancel_sync_locked(&dwork->work, canceller);
1058 }
1059
1060 k_spin_unlock(&lock, key);
1061
1062 if (need_wait) {
1063 k_sem_take(&canceller->sem, K_FOREVER);
1064 }
1065
1066 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, cancel_delayable_sync, dwork, sync, pending);
1067 return pending;
1068 }
1069
k_work_flush_delayable(struct k_work_delayable * dwork,struct k_work_sync * sync)1070 bool k_work_flush_delayable(struct k_work_delayable *dwork,
1071 struct k_work_sync *sync)
1072 {
1073 __ASSERT_NO_MSG(dwork != NULL);
1074 __ASSERT_NO_MSG(sync != NULL);
1075 __ASSERT_NO_MSG(!k_is_in_isr());
1076 #ifdef CONFIG_KERNEL_COHERENCE
1077 __ASSERT_NO_MSG(arch_mem_coherent(sync));
1078 #endif
1079
1080 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_work, flush_delayable, dwork, sync);
1081
1082 struct k_work *work = &dwork->work;
1083 struct z_work_flusher *flusher = &sync->flusher;
1084 k_spinlock_key_t key = k_spin_lock(&lock);
1085
1086 /* If it's idle release the lock and return immediately. */
1087 if (work_busy_get_locked(work) == 0U) {
1088 k_spin_unlock(&lock, key);
1089
1090 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, flush_delayable, dwork, sync, false);
1091
1092 return false;
1093 }
1094
1095 /* If unscheduling did something then submit it. Ignore a
1096 * failed submission (e.g. when cancelling).
1097 */
1098 if (unschedule_locked(dwork)) {
1099 struct k_work_q *queue = dwork->queue;
1100
1101 (void)submit_to_queue_locked(work, &queue);
1102 }
1103
1104 /* Wait for it to finish */
1105 bool need_flush = work_flush_locked(work, flusher);
1106
1107 k_spin_unlock(&lock, key);
1108
1109 /* If necessary wait until the flusher item completes */
1110 if (need_flush) {
1111 k_sem_take(&flusher->sem, K_FOREVER);
1112 }
1113
1114 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_work, flush_delayable, dwork, sync, need_flush);
1115
1116 return need_flush;
1117 }
1118
1119 #endif /* CONFIG_SYS_CLOCK_EXISTS */
1120