1 /*
2  * Copyright (c) 2023 Intel Corporation
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #ifndef ZEPHYR_SYS_SPSC_LOCKFREE_H_
8 #define ZEPHYR_SYS_SPSC_LOCKFREE_H_
9 
10 #include <stdint.h>
11 #include <stdbool.h>
12 #include <zephyr/toolchain/common.h>
13 #include <zephyr/sys/atomic.h>
14 #include <zephyr/sys/util_macro.h>
15 
16 /**
17  * @brief Single Producer Single Consumer (SPSC) Lockfree Queue API
18  * @defgroup spsc_lockfree SPSC API
19  * @ingroup datastructure_apis
20  * @{
21  */
22 
23 /**
24  * @file spsc_lockfree.h
25  *
26  * @brief A lock-free and type safe power of 2 fixed sized single producer
27  * single consumer (SPSC) queue using a ringbuffer and atomics to ensure
28  * coherency.
29  *
30  * This SPSC queue implementation works on an array which wraps using a power of
31  * two size and uses a bit mask to perform a modulus. Atomics are used to allow
32  * single-producer single-consumer safe semantics without locks. Elements are
33  * expected to be of a fixed size. The API is type safe as the underlying buffer
34  * is typed and all usage is done through macros.
35  *
36  * An SPSC queue may be declared on a stack or statically and work as intended so
37  * long as its lifetime outlives any usage. Static declarations should be the
38  * preferred method as stack . It is meant to be a shared object between two
39  * execution contexts (ISR and a thread for example)
40  *
41  * An SPSC queue is safe to produce or consume in an ISR with O(1) push/pull.
42  *
43  * @warning SPSC is *not* safe to produce or consume in multiple execution
44  * contexts.
45  *
46  * Safe usage would be, where A and B are unique execution contexts:
47  * 1. ISR A producing and a Thread B consuming.
48  * 2. Thread A producing and ISR B consuming.
49  * 3. Thread A producing and Thread B consuming.
50  * 4. ISR A producing and ISR B consuming.
51  */
52 
53 /**
54  * @private
55  * @brief Common SPSC attributes
56  *
57  * @warning Not to be manipulated without the macros!
58  */
59 struct spsc {
60 	/* private value only the producer thread should mutate */
61 	unsigned long acquire;
62 
63 	/* private value only the consumer thread should mutate */
64 	unsigned long consume;
65 
66 	/* producer mutable, consumer readable */
67 	atomic_t in;
68 
69 	/* consumer mutable, producer readable */
70 	atomic_t out;
71 
72 	/* mask used to automatically wrap values */
73 	const unsigned long mask;
74 };
75 
76 /**
77  * @brief Statically initialize an spsc
78  *
79  * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
80  * @param buf Buffer pointer
81  */
82 #define SPSC_INITIALIZER(sz, buf)                                                                  \
83 	{                                                                                          \
84 		._spsc =                                                                           \
85 			{                                                                          \
86 				.acquire = 0,                                                      \
87 				.consume = 0,                                                      \
88 				.in = ATOMIC_INIT(0),                                              \
89 				.out = ATOMIC_INIT(0),                                             \
90 				.mask = sz - 1,                                                    \
91 			},                                                                         \
92 		.buffer = buf,                                                                     \
93 	}
94 
95 /**
96  * @brief Declare an anonymous struct type for an spsc
97  *
98  * @param name Name of the spsc symbol to be provided
99  * @param type Type stored in the spsc
100  */
101 #define SPSC_DECLARE(name, type)                                                                   \
102 	static struct spsc_##name {                                                                \
103 		struct spsc _spsc;                                                                 \
104 		type * const buffer;                                                               \
105 	}
106 
107 /**
108  * @brief Define an spsc with a fixed size
109  *
110  * @param name Name of the spsc symbol to be provided
111  * @param type Type stored in the spsc
112  * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8)
113  */
114 #define SPSC_DEFINE(name, type, sz)                                                                \
115 	BUILD_ASSERT(IS_POWER_OF_TWO(sz));                                                         \
116 	static type __spsc_buf_##name[sz];                                                         \
117 	SPSC_DECLARE(name, type) name = SPSC_INITIALIZER(sz, __spsc_buf_##name);
118 
119 /**
120  * @brief Size of the SPSC queue
121  *
122  * @param spsc SPSC reference
123  */
124 #define spsc_size(spsc) ((spsc)->_spsc.mask + 1)
125 
126 /**
127  * @private
128  * @brief A number modulo the spsc size, assumes power of 2
129  *
130  * @param spsc SPSC reference
131  * @param i Value to modulo to the size of the spsc
132  */
133 #define z_spsc_mask(spsc, i) ((i) & (spsc)->_spsc.mask)
134 
135 /**
136  * @private
137  * @brief Load the current "in" index from the spsc as an unsigned long
138  */
139 #define z_spsc_in(spsc) (unsigned long)atomic_get(&(spsc)->_spsc.in)
140 
141 /**
142  * @private
143  * @brief Load the current "out" index from the spsc as an unsigned long
144  */
145 #define z_spsc_out(spsc) (unsigned long)atomic_get(&(spsc)->_spsc.out)
146 
147 /**
148  * @brief Initialize/reset a spsc such that its empty
149  *
150  * Note that this is not safe to do while being used in a producer/consumer
151  * situation with multiple calling contexts (isrs/threads).
152  *
153  * @param spsc SPSC to initialize/reset
154  */
155 #define spsc_reset(spsc)                                                                           \
156 	({                                                                                         \
157 		(spsc)->_spsc.consume = 0;                                                         \
158 		(spsc)->_spsc.acquire = 0;                                                         \
159 		atomic_set(&(spsc)->_spsc.in, 0);                                                  \
160 		atomic_set(&(spsc)->_spsc.out, 0);                                                 \
161 	})
162 
163 /**
164  * @brief Acquire an element to produce from the SPSC
165  *
166  * @param spsc SPSC to acquire an element from for producing
167  *
168  * @return A pointer to the acquired element or null if the spsc is full
169  */
170 #define spsc_acquire(spsc)                                                                         \
171 	({                                                                                         \
172 		unsigned long idx = z_spsc_in(spsc) + (spsc)->_spsc.acquire;                       \
173 		bool spsc_acq = (idx - z_spsc_out(spsc)) < spsc_size(spsc);                        \
174 		if (spsc_acq) {                                                                    \
175 			(spsc)->_spsc.acquire += 1;                                                \
176 		}                                                                                  \
177 		spsc_acq ? &((spsc)->buffer[z_spsc_mask(spsc, idx)]) : NULL;                       \
178 	})
179 
180 /**
181  * @brief Produce one previously acquired element to the SPSC
182  *
183  * This makes one element available to the consumer immediately
184  *
185  * @param spsc SPSC to produce the previously acquired element or do nothing
186  */
187 #define spsc_produce(spsc)                                                                         \
188 	({                                                                                         \
189 		if ((spsc)->_spsc.acquire > 0) {                                                   \
190 			(spsc)->_spsc.acquire -= 1;                                                \
191 			atomic_add(&(spsc)->_spsc.in, 1);                                          \
192 		}                                                                                  \
193 	})
194 
195 /**
196  * @brief Produce all previously acquired elements to the SPSC
197  *
198  * This makes all previous acquired elements available to the consumer
199  * immediately
200  *
201  * @param spsc SPSC to produce all previously acquired elements or do nothing
202  */
203 #define spsc_produce_all(spsc)                                                                     \
204 	({                                                                                         \
205 		if ((spsc)->_spsc.acquire > 0) {                                                   \
206 			unsigned long acquired = (spsc)->_spsc.acquire;                            \
207 			(spsc)->_spsc.acquire = 0;                                                 \
208 			atomic_add(&(spsc)->_spsc.in, acquired);                                   \
209 		}                                                                                  \
210 	})
211 
212 /**
213  * @brief Drop all previously acquired elements
214  *
215  * This makes all previous acquired elements available to be acquired again
216  *
217  * @param spsc SPSC to drop all previously acquired elements or do nothing
218  */
219 #define spsc_drop_all(spsc)                                                                        \
220 	do {                                                                                       \
221 		(spsc)->_spsc.acquire = 0;                                                         \
222 	} while (false)
223 
224 /**
225  * @brief Consume an element from the spsc
226  *
227  * @param spsc Spsc to consume from
228  *
229  * @return Pointer to element or null if no consumable elements left
230  */
231 #define spsc_consume(spsc)                                                                         \
232 	({                                                                                         \
233 		unsigned long idx = z_spsc_out(spsc) + (spsc)->_spsc.consume;                      \
234 		bool has_consumable = (idx != z_spsc_in(spsc));                                    \
235 		if (has_consumable) {                                                              \
236 			(spsc)->_spsc.consume += 1;                                                \
237 		}                                                                                  \
238 		has_consumable ? &((spsc)->buffer[z_spsc_mask(spsc, idx)]) : NULL;                 \
239 	})
240 
241 /**
242  * @brief Release a consumed element
243  *
244  * @param spsc SPSC to release consumed element or do nothing
245  */
246 #define spsc_release(spsc)                                                                         \
247 	({                                                                                         \
248 		if ((spsc)->_spsc.consume > 0) {                                                   \
249 			(spsc)->_spsc.consume -= 1;                                                \
250 			atomic_add(&(spsc)->_spsc.out, 1);                                         \
251 		}                                                                                  \
252 	})
253 
254 /**
255  * @brief Release all consumed elements
256  *
257  * @param spsc SPSC to release consumed elements or do nothing
258  */
259 #define spsc_release_all(spsc)                                                                     \
260 	({                                                                                         \
261 		if ((spsc)->_spsc.consume > 0) {                                                   \
262 			unsigned long consumed = (spsc)->_spsc.consume;                            \
263 			(spsc)->_spsc.consume = 0;                                                 \
264 			atomic_add(&(spsc)->_spsc.out, consumed);                                  \
265 		}                                                                                  \
266 	})
267 
268 /**
269  * @brief Count of acquirable in spsc
270  *
271  * @param spsc SPSC to get item count for
272  */
273 #define spsc_acquirable(spsc)                                                                      \
274 	({ (((spsc)->_spsc.in + (spsc)->_spsc.acquire) - (spsc)->_spsc.out) - spsc_size(spsc); })
275 
276 /**
277  * @brief Count of consumables in spsc
278  *
279  * @param spsc SPSC to get item count for
280  */
281 #define spsc_consumable(spsc) ({ (spsc)->_spsc.in - (spsc)->_spsc.out - (spsc)->_spsc.consume; })
282 
283 /**
284  * @brief Peek at the first available item in queue
285  *
286  * @param spsc Spsc to peek into
287  *
288  * @return Pointer to element or null if no consumable elements left
289  */
290 #define spsc_peek(spsc)                                                                            \
291 	({                                                                                         \
292 		unsigned long idx = z_spsc_out(spsc) + (spsc)->_spsc.consume;                      \
293 		bool has_consumable = (idx != z_spsc_in(spsc));                                    \
294 		has_consumable ? &((spsc)->buffer[z_spsc_mask(spsc, idx)]) : NULL;                 \
295 	})
296 
297 /**
298  * @brief Peek at the next item in the queue from a given one
299  *
300  *
301  * @param spsc SPSC to peek at
302  * @param item Pointer to an item in the queue
303  *
304  * @return Pointer to element or null if none left
305  */
306 #define spsc_next(spsc, item)                                                                      \
307 	({                                                                                         \
308 		unsigned long idx = ((item) - (spsc)->buffer);                                     \
309 		bool has_next =                                                                    \
310 			z_spsc_mask(spsc, (idx + 1)) != (z_spsc_mask(spsc, z_spsc_in(spsc)));      \
311 		has_next ? &((spsc)->buffer[z_spsc_mask((spsc), idx + 1)]) : NULL;                 \
312 	})
313 
314 /**
315  * @brief Get the previous item in the queue from a given one
316  *
317  * @param spsc SPSC to peek at
318  * @param item Pointer to an item in the queue
319  *
320  * @return Pointer to element or null if none left
321  */
322 #define spsc_prev(spsc, item)                                                                      \
323 	({                                                                                         \
324 		unsigned long idx = ((item) - &(spsc)->buffer[0]) / sizeof((spsc)->buffer[0]);     \
325 		bool has_prev = idx != z_spsc_mask(spsc, z_spsc_out(spsc));                        \
326 		has_prev ? &((spsc)->buffer[z_spsc_mask(spsc, idx - 1)]) : NULL;                   \
327 	})
328 
329 /**
330  * @}
331  */
332 
333 #endif /* ZEPHYR_SYS_SPSC_LOCKFREE_H_ */
334