1 /*
2  * Copyright (c) 2010-2011 Dmitry Vyukov
3  * Copyright (c) 2023 Intel Corporation
4  *
5  * SPDX-License-Identifier: Apache-2.0
6  */
7 
8 #ifndef ZEPHYR_SYS_MPSC_LOCKFREE_H_
9 #define ZEPHYR_SYS_MPSC_LOCKFREE_H_
10 
11 #include <stdint.h>
12 #include <stdbool.h>
13 #include <zephyr/sys/atomic.h>
14 #include <zephyr/kernel.h>
15 
16 #ifdef __cplusplus
17 extern "C" {
18 #endif
19 
20 /**
21  * @brief Multiple Producer Single Consumer (MPSC) Lockfree Queue API
22  * @defgroup mpsc_lockfree MPSC Lockfree Queue API
23  * @ingroup datastructure_apis
24  * @{
25  */
26 
27 /**
28  * @file mpsc_lockfree.h
29  *
30  * @brief A wait-free intrusive multi producer single consumer (MPSC) queue using
31  * a singly linked list. Ordering is First-In-First-Out.
32  *
33  * Based on the well known and widely used wait-free MPSC queue described by
34  * Dmitry Vyukov with some slight changes to account for needs of an
35  * RTOS on a variety of archs. Both consumer and producer are wait free. No CAS
36  * loop or lock is needed.
37  *
38  * An MPSC queue is safe to produce or consume in an ISR with O(1) push/pop.
39  *
40  * @warning MPSC is *not* safe to consume in multiple execution contexts.
41  */
42 
/*
 * Pointer access primitives used by the queue.
 *
 * On SMP atomics *must* be used to ensure the pointers
 * are updated in the correct order.
 *
 * On single core systems atomics are unnecessary
 * and cause a lot of unnecessary cache invalidation,
 * so ordinary loads and stores of the pointer are used instead.
 *
 * NOTE(review): mpsc_ptr_t is not volatile-qualified in the single core
 * case; confirm plain accesses are sufficient for every user of the queue.
 *
 * All three macros take the pointer *lvalue* itself (not its address) as
 * their first argument.
 */
#if defined(CONFIG_SMP)

typedef atomic_ptr_t mpsc_ptr_t;

/* Atomically load the pointer held in ptr */
#define mpsc_ptr_get(ptr)          atomic_ptr_get(&(ptr))
/* Atomically store val into ptr */
#define mpsc_ptr_set(ptr, val)     atomic_ptr_set(&(ptr), (val))
/* Atomically store val into ptr, yielding the value that was replaced */
#define mpsc_ptr_set_get(ptr, val) atomic_ptr_set(&(ptr), (val))

#else /* defined(CONFIG_SMP) */

typedef struct mpsc_node *mpsc_ptr_t;

/* Load the pointer held in ptr */
#define mpsc_ptr_get(ptr)      (ptr)
/* Store val into ptr */
#define mpsc_ptr_set(ptr, val) ((ptr) = (val))
/*
 * Store val into ptr, yielding the value that was replaced.
 *
 * The temporary has a macro-specific name so that an argument that happens
 * to be called "tmp" at the call site is not shadowed inside the expansion.
 */
#define mpsc_ptr_set_get(ptr, val)                                                                 \
	({                                                                                         \
		mpsc_ptr_t mpsc_prev_ = (ptr);                                                     \
		(ptr) = (val);                                                                     \
		mpsc_prev_;                                                                        \
	})

#endif /* defined(CONFIG_SMP) */
75 
76 /**
77  * @brief Queue member
78  */
79 struct mpsc_node {
80 	mpsc_ptr_t next;
81 };
82 
83 /**
84  * @brief MPSC Queue
85  */
86 struct mpsc {
87 	mpsc_ptr_t head;
88 	struct mpsc_node *tail;
89 	struct mpsc_node stub;
90 };
91 
/**
 * @brief Static initializer for a mpsc queue
 *
 * Since the queue is intrusive and starts out with both head and tail
 * pointing at its own embedded stub node, the initializer has to refer to
 * the queue object that is being initialized.
 *
 * @param symbol name of the queue
 */
#define MPSC_INIT(symbol)                                                                          \
	{                                                                                          \
		.head = (struct mpsc_node *)&(symbol).stub,                                        \
		.tail = (struct mpsc_node *)&(symbol).stub,                                        \
		.stub = {                                                                          \
			.next = NULL,                                                              \
		},                                                                                 \
	}
107 
108 /**
109  * @brief Initialize queue
110  *
111  * @param q Queue to initialize or reset
112  */
mpsc_init(struct mpsc * q)113 static inline void mpsc_init(struct mpsc *q)
114 {
115 	mpsc_ptr_set(q->head, &q->stub);
116 	q->tail = &q->stub;
117 	mpsc_ptr_set(q->stub.next, NULL);
118 }
119 
120 /**
121  * @brief Push a node
122  *
123  * @param q Queue to push the node to
124  * @param n Node to push into the queue
125  */
mpsc_push(struct mpsc * q,struct mpsc_node * n)126 static ALWAYS_INLINE void mpsc_push(struct mpsc *q, struct mpsc_node *n)
127 {
128 	struct mpsc_node *prev;
129 	int key;
130 
131 	mpsc_ptr_set(n->next, NULL);
132 
133 	key = arch_irq_lock();
134 	prev = (struct mpsc_node *)mpsc_ptr_set_get(q->head, n);
135 	mpsc_ptr_set(prev->next, n);
136 	arch_irq_unlock(key);
137 }
138 
139 /**
140  * @brief Pop a node off of the list
141  *
142  * @retval NULL When no node is available
143  * @retval node When node is available
144  */
mpsc_pop(struct mpsc * q)145 static inline struct mpsc_node *mpsc_pop(struct mpsc *q)
146 {
147 	struct mpsc_node *head;
148 	struct mpsc_node *tail = q->tail;
149 	struct mpsc_node *next = (struct mpsc_node *)mpsc_ptr_get(tail->next);
150 
151 	/* Skip over the stub/sentinel */
152 	if (tail == &q->stub) {
153 		if (next == NULL) {
154 			return NULL;
155 		}
156 
157 		q->tail = next;
158 		tail = next;
159 		next = (struct mpsc_node *)mpsc_ptr_get(next->next);
160 	}
161 
162 	/* If next is non-NULL then a valid node is found, return it */
163 	if (next != NULL) {
164 		q->tail = next;
165 		return tail;
166 	}
167 
168 	head = (struct mpsc_node *)mpsc_ptr_get(q->head);
169 
170 	/* If next is NULL, and the tail != HEAD then the queue has pending
171 	 * updates that can't yet be accessed.
172 	 */
173 	if (tail != head) {
174 		return NULL;
175 	}
176 
177 	mpsc_push(q, &q->stub);
178 
179 	next = (struct mpsc_node *)mpsc_ptr_get(tail->next);
180 
181 	if (next != NULL) {
182 		q->tail = next;
183 		return tail;
184 	}
185 
186 	return NULL;
187 }
188 
189 /**
190  * @}
191  */
192 
193 #ifdef __cplusplus
194 }
195 #endif
196 
197 #endif /* ZEPHYR_SYS_MPSC_LOCKFREE_H_ */
198