1 /*
2 * Copyright (c) 2010-2011 Dmitry Vyukov
3 * Copyright (c) 2023 Intel Corporation
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 */
7
8 #ifndef ZEPHYR_SYS_MPSC_LOCKFREE_H_
9 #define ZEPHYR_SYS_MPSC_LOCKFREE_H_
10
11 #include <stdint.h>
12 #include <stdbool.h>
13 #include <zephyr/sys/atomic.h>
14 #include <zephyr/kernel.h>
15
16 #ifdef __cplusplus
17 extern "C" {
18 #endif
19
20 /**
21 * @brief Multiple Producer Single Consumer (MPSC) Lockfree Queue API
22 * @defgroup mpsc_lockfree MPSC Lockfree Queue API
23 * @ingroup datastructure_apis
24 * @{
25 */
26
27 /**
28 * @file mpsc_lockfree.h
29 *
30 * @brief A wait-free intrusive multi producer single consumer (MPSC) queue using
31 * a singly linked list. Ordering is First-In-First-Out.
32 *
33 * Based on the well known and widely used wait-free MPSC queue described by
34 * Dmitry Vyukov with some slight changes to account for needs of an
35 * RTOS on a variety of archs. Both consumer and producer are wait free. No CAS
36 * loop or lock is needed.
37 *
38 * An MPSC queue is safe to produce or consume in an ISR with O(1) push/pop.
39 *
40 * @warning MPSC is *not* safe to consume in multiple execution contexts.
41 */
42
/*
 * On single core systems atomics are unnecessary
 * and cause a lot of unnecessary cache invalidation.
 *
 * There, the link pointers are accessed with ordinary loads and stores
 * (note that the non-SMP pointer type below is a plain pointer, it is not
 * volatile-qualified), and mpsc_push() performs its head exchange and link
 * update with interrupts locked.
 *
 * On SMP atomics *must* be used to ensure the pointers
 * are updated in the correct order.
 */
#if defined(CONFIG_SMP)

/* SMP: link pointers are atomic pointers accessed through the atomic API. */
typedef atomic_ptr_t mpsc_ptr_t;

/* Read the pointer held in the lvalue ptr. */
#define mpsc_ptr_get(ptr)          atomic_ptr_get(&(ptr))
/* Store val into the lvalue ptr. */
#define mpsc_ptr_set(ptr, val)     atomic_ptr_set(&(ptr), (val))
/*
 * Store val into the lvalue ptr and yield the pointer it replaced; this
 * relies on atomic_ptr_set() returning the previous value.
 */
#define mpsc_ptr_set_get(ptr, val) atomic_ptr_set(&(ptr), (val))

#else /* defined(CONFIG_SMP) */

/* Non-SMP: link pointers are plain node pointers. */
typedef struct mpsc_node *mpsc_ptr_t;

/*
 * Arguments and expansions are parenthesized so the macros compose safely
 * inside larger expressions.
 */
#define mpsc_ptr_get(ptr)      (ptr)
#define mpsc_ptr_set(ptr, val) ((ptr) = (val))
/*
 * Store val into the lvalue ptr and yield the pointer it replaced.
 *
 * The temporary carries a macro-specific name so that a caller argument
 * called "tmp" is not captured by the statement expression.
 */
#define mpsc_ptr_set_get(ptr, val)                                             \
	({                                                                     \
		mpsc_ptr_t mpsc_prev_ = (ptr);                                 \
		(ptr) = (val);                                                 \
		mpsc_prev_;                                                    \
	})

#endif /* defined(CONFIG_SMP) */
75
/**
 * @brief Queue member
 *
 * Link element of the intrusive list: mpsc_push() takes a pointer to one of
 * these and mpsc_pop() returns such a pointer.
 */
struct mpsc_node {
	/* Link to the node pushed after this one; mpsc_push() resets it to NULL
	 * before inserting the node.
	 */
	mpsc_ptr_t next;
};
82
/**
 * @brief MPSC Queue
 *
 * The queue is empty when tail refers to the stub and the stub has no
 * successor (see mpsc_pop()).
 */
struct mpsc {
	/* Most recently pushed node; exchanged by mpsc_push(). */
	mpsc_ptr_t head;
	/* Node mpsc_pop() examines next; advanced by mpsc_pop(). */
	struct mpsc_node *tail;
	/* Sentinel node that head and tail refer to after initialization;
	 * mpsc_pop() pushes it again when it reaches the last queued node.
	 */
	struct mpsc_node stub;
};
91
/**
 * @brief Static initializer for a mpsc queue
 *
 * Since the queue is initialized with head and tail referring to its own
 * embedded stub node, the initializer needs the queue being initialized in
 * order to take the address of that stub.
 *
 * @param symbol name of the queue
 */
#define MPSC_INIT(symbol)                                                      \
	{                                                                      \
		.head = (struct mpsc_node *)&(symbol).stub,                    \
		.tail = (struct mpsc_node *)&(symbol).stub,                    \
		.stub = {                                                      \
			.next = NULL,                                          \
		},                                                             \
	}
107
108 /**
109 * @brief Initialize queue
110 *
111 * @param q Queue to initialize or reset
112 */
mpsc_init(struct mpsc * q)113 static inline void mpsc_init(struct mpsc *q)
114 {
115 mpsc_ptr_set(q->head, &q->stub);
116 q->tail = &q->stub;
117 mpsc_ptr_set(q->stub.next, NULL);
118 }
119
120 /**
121 * @brief Push a node
122 *
123 * @param q Queue to push the node to
124 * @param n Node to push into the queue
125 */
mpsc_push(struct mpsc * q,struct mpsc_node * n)126 static ALWAYS_INLINE void mpsc_push(struct mpsc *q, struct mpsc_node *n)
127 {
128 struct mpsc_node *prev;
129 int key;
130
131 mpsc_ptr_set(n->next, NULL);
132
133 key = arch_irq_lock();
134 prev = (struct mpsc_node *)mpsc_ptr_set_get(q->head, n);
135 mpsc_ptr_set(prev->next, n);
136 arch_irq_unlock(key);
137 }
138
139 /**
140 * @brief Pop a node off of the list
141 *
142 * @retval NULL When no node is available
143 * @retval node When node is available
144 */
mpsc_pop(struct mpsc * q)145 static inline struct mpsc_node *mpsc_pop(struct mpsc *q)
146 {
147 struct mpsc_node *head;
148 struct mpsc_node *tail = q->tail;
149 struct mpsc_node *next = (struct mpsc_node *)mpsc_ptr_get(tail->next);
150
151 /* Skip over the stub/sentinel */
152 if (tail == &q->stub) {
153 if (next == NULL) {
154 return NULL;
155 }
156
157 q->tail = next;
158 tail = next;
159 next = (struct mpsc_node *)mpsc_ptr_get(next->next);
160 }
161
162 /* If next is non-NULL then a valid node is found, return it */
163 if (next != NULL) {
164 q->tail = next;
165 return tail;
166 }
167
168 head = (struct mpsc_node *)mpsc_ptr_get(q->head);
169
170 /* If next is NULL, and the tail != HEAD then the queue has pending
171 * updates that can't yet be accessed.
172 */
173 if (tail != head) {
174 return NULL;
175 }
176
177 mpsc_push(q, &q->stub);
178
179 next = (struct mpsc_node *)mpsc_ptr_get(tail->next);
180
181 if (next != NULL) {
182 q->tail = next;
183 return tail;
184 }
185
186 return NULL;
187 }
188
189 /**
190 * @}
191 */
192
193 #ifdef __cplusplus
194 }
195 #endif
196
197 #endif /* ZEPHYR_SYS_MPSC_LOCKFREE_H_ */
198