1 /* 2 * Copyright (c) 2023 Intel Corporation 3 * 4 * SPDX-License-Identifier: Apache-2.0 5 */ 6 7 #ifndef ZEPHYR_SYS_SPSC_LOCKFREE_H_ 8 #define ZEPHYR_SYS_SPSC_LOCKFREE_H_ 9 10 #include <stdint.h> 11 #include <stdbool.h> 12 #include <zephyr/toolchain/common.h> 13 #include <zephyr/sys/atomic.h> 14 #include <zephyr/sys/util_macro.h> 15 16 /** 17 * @brief Single Producer Single Consumer (SPSC) Lockfree Queue API 18 * @defgroup spsc_lockfree SPSC API 19 * @ingroup datastructure_apis 20 * @{ 21 */ 22 23 /** 24 * @file spsc_lockfree.h 25 * 26 * @brief A lock-free and type safe power of 2 fixed sized single producer 27 * single consumer (SPSC) queue using a ringbuffer and atomics to ensure 28 * coherency. 29 * 30 * This SPSC queue implementation works on an array which wraps using a power of 31 * two size and uses a bit mask to perform a modulus. Atomics are used to allow 32 * single-producer single-consumer safe semantics without locks. Elements are 33 * expected to be of a fixed size. The API is type safe as the underlying buffer 34 * is typed and all usage is done through macros. 35 * 36 * An SPSC queue may be declared on a stack or statically and work as intended so 37 * long as its lifetime outlives any usage. Static declarations should be the 38 * preferred method as stack . It is meant to be a shared object between two 39 * execution contexts (ISR and a thread for example) 40 * 41 * An SPSC queue is safe to produce or consume in an ISR with O(1) push/pull. 42 * 43 * @warning SPSC is *not* safe to produce or consume in multiple execution 44 * contexts. 45 * 46 * Safe usage would be, where A and B are unique execution contexts: 47 * 1. ISR A producing and a Thread B consuming. 48 * 2. Thread A producing and ISR B consuming. 49 * 3. Thread A producing and Thread B consuming. 50 * 4. ISR A producing and ISR B consuming. 51 */ 52 53 /** 54 * @private 55 * @brief Common SPSC attributes 56 * 57 * @warning Not to be manipulated without the macros! 58 */ 59 struct spsc { 60 /* private value only the producer thread should mutate */ 61 unsigned long acquire; 62 63 /* private value only the consumer thread should mutate */ 64 unsigned long consume; 65 66 /* producer mutable, consumer readable */ 67 atomic_t in; 68 69 /* consumer mutable, producer readable */ 70 atomic_t out; 71 72 /* mask used to automatically wrap values */ 73 const unsigned long mask; 74 }; 75 76 /** 77 * @brief Statically initialize an spsc 78 * 79 * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8) 80 * @param buf Buffer pointer 81 */ 82 #define SPSC_INITIALIZER(sz, buf) \ 83 { \ 84 ._spsc = \ 85 { \ 86 .acquire = 0, \ 87 .consume = 0, \ 88 .in = ATOMIC_INIT(0), \ 89 .out = ATOMIC_INIT(0), \ 90 .mask = sz - 1, \ 91 }, \ 92 .buffer = buf, \ 93 } 94 95 /** 96 * @brief Declare an anonymous struct type for an spsc 97 * 98 * @param name Name of the spsc symbol to be provided 99 * @param type Type stored in the spsc 100 */ 101 #define SPSC_DECLARE(name, type) \ 102 static struct spsc_##name { \ 103 struct spsc _spsc; \ 104 type * const buffer; \ 105 } 106 107 /** 108 * @brief Define an spsc with a fixed size 109 * 110 * @param name Name of the spsc symbol to be provided 111 * @param type Type stored in the spsc 112 * @param sz Size of the spsc, must be power of 2 (ex: 2, 4, 8) 113 */ 114 #define SPSC_DEFINE(name, type, sz) \ 115 BUILD_ASSERT(IS_POWER_OF_TWO(sz)); \ 116 static type __spsc_buf_##name[sz]; \ 117 SPSC_DECLARE(name, type) name = SPSC_INITIALIZER(sz, __spsc_buf_##name); 118 119 /** 120 * @brief Size of the SPSC queue 121 * 122 * @param spsc SPSC reference 123 */ 124 #define spsc_size(spsc) ((spsc)->_spsc.mask + 1) 125 126 /** 127 * @private 128 * @brief A number modulo the spsc size, assumes power of 2 129 * 130 * @param spsc SPSC reference 131 * @param i Value to modulo to the size of the spsc 132 */ 133 #define z_spsc_mask(spsc, i) ((i) & (spsc)->_spsc.mask) 134 135 /** 136 * @private 137 * @brief Load the current "in" index from the spsc as an unsigned long 138 */ 139 #define z_spsc_in(spsc) (unsigned long)atomic_get(&(spsc)->_spsc.in) 140 141 /** 142 * @private 143 * @brief Load the current "out" index from the spsc as an unsigned long 144 */ 145 #define z_spsc_out(spsc) (unsigned long)atomic_get(&(spsc)->_spsc.out) 146 147 /** 148 * @brief Initialize/reset a spsc such that its empty 149 * 150 * Note that this is not safe to do while being used in a producer/consumer 151 * situation with multiple calling contexts (isrs/threads). 152 * 153 * @param spsc SPSC to initialize/reset 154 */ 155 #define spsc_reset(spsc) \ 156 ({ \ 157 (spsc)->_spsc.consume = 0; \ 158 (spsc)->_spsc.acquire = 0; \ 159 atomic_set(&(spsc)->_spsc.in, 0); \ 160 atomic_set(&(spsc)->_spsc.out, 0); \ 161 }) 162 163 /** 164 * @brief Acquire an element to produce from the SPSC 165 * 166 * @param spsc SPSC to acquire an element from for producing 167 * 168 * @return A pointer to the acquired element or null if the spsc is full 169 */ 170 #define spsc_acquire(spsc) \ 171 ({ \ 172 unsigned long idx = z_spsc_in(spsc) + (spsc)->_spsc.acquire; \ 173 bool spsc_acq = (idx - z_spsc_out(spsc)) < spsc_size(spsc); \ 174 if (spsc_acq) { \ 175 (spsc)->_spsc.acquire += 1; \ 176 } \ 177 spsc_acq ? &((spsc)->buffer[z_spsc_mask(spsc, idx)]) : NULL; \ 178 }) 179 180 /** 181 * @brief Produce one previously acquired element to the SPSC 182 * 183 * This makes one element available to the consumer immediately 184 * 185 * @param spsc SPSC to produce the previously acquired element or do nothing 186 */ 187 #define spsc_produce(spsc) \ 188 ({ \ 189 if ((spsc)->_spsc.acquire > 0) { \ 190 (spsc)->_spsc.acquire -= 1; \ 191 atomic_add(&(spsc)->_spsc.in, 1); \ 192 } \ 193 }) 194 195 /** 196 * @brief Produce all previously acquired elements to the SPSC 197 * 198 * This makes all previous acquired elements available to the consumer 199 * immediately 200 * 201 * @param spsc SPSC to produce all previously acquired elements or do nothing 202 */ 203 #define spsc_produce_all(spsc) \ 204 ({ \ 205 if ((spsc)->_spsc.acquire > 0) { \ 206 unsigned long acquired = (spsc)->_spsc.acquire; \ 207 (spsc)->_spsc.acquire = 0; \ 208 atomic_add(&(spsc)->_spsc.in, acquired); \ 209 } \ 210 }) 211 212 /** 213 * @brief Drop all previously acquired elements 214 * 215 * This makes all previous acquired elements available to be acquired again 216 * 217 * @param spsc SPSC to drop all previously acquired elements or do nothing 218 */ 219 #define spsc_drop_all(spsc) \ 220 do { \ 221 (spsc)->_spsc.acquire = 0; \ 222 } while (false) 223 224 /** 225 * @brief Consume an element from the spsc 226 * 227 * @param spsc Spsc to consume from 228 * 229 * @return Pointer to element or null if no consumable elements left 230 */ 231 #define spsc_consume(spsc) \ 232 ({ \ 233 unsigned long idx = z_spsc_out(spsc) + (spsc)->_spsc.consume; \ 234 bool has_consumable = (idx != z_spsc_in(spsc)); \ 235 if (has_consumable) { \ 236 (spsc)->_spsc.consume += 1; \ 237 } \ 238 has_consumable ? &((spsc)->buffer[z_spsc_mask(spsc, idx)]) : NULL; \ 239 }) 240 241 /** 242 * @brief Release a consumed element 243 * 244 * @param spsc SPSC to release consumed element or do nothing 245 */ 246 #define spsc_release(spsc) \ 247 ({ \ 248 if ((spsc)->_spsc.consume > 0) { \ 249 (spsc)->_spsc.consume -= 1; \ 250 atomic_add(&(spsc)->_spsc.out, 1); \ 251 } \ 252 }) 253 254 /** 255 * @brief Release all consumed elements 256 * 257 * @param spsc SPSC to release consumed elements or do nothing 258 */ 259 #define spsc_release_all(spsc) \ 260 ({ \ 261 if ((spsc)->_spsc.consume > 0) { \ 262 unsigned long consumed = (spsc)->_spsc.consume; \ 263 (spsc)->_spsc.consume = 0; \ 264 atomic_add(&(spsc)->_spsc.out, consumed); \ 265 } \ 266 }) 267 268 /** 269 * @brief Count of acquirable in spsc 270 * 271 * @param spsc SPSC to get item count for 272 */ 273 #define spsc_acquirable(spsc) \ 274 ({ (((spsc)->_spsc.in + (spsc)->_spsc.acquire) - (spsc)->_spsc.out) - spsc_size(spsc); }) 275 276 /** 277 * @brief Count of consumables in spsc 278 * 279 * @param spsc SPSC to get item count for 280 */ 281 #define spsc_consumable(spsc) ({ (spsc)->_spsc.in - (spsc)->_spsc.out - (spsc)->_spsc.consume; }) 282 283 /** 284 * @brief Peek at the first available item in queue 285 * 286 * @param spsc Spsc to peek into 287 * 288 * @return Pointer to element or null if no consumable elements left 289 */ 290 #define spsc_peek(spsc) \ 291 ({ \ 292 unsigned long idx = z_spsc_out(spsc) + (spsc)->_spsc.consume; \ 293 bool has_consumable = (idx != z_spsc_in(spsc)); \ 294 has_consumable ? &((spsc)->buffer[z_spsc_mask(spsc, idx)]) : NULL; \ 295 }) 296 297 /** 298 * @brief Peek at the next item in the queue from a given one 299 * 300 * 301 * @param spsc SPSC to peek at 302 * @param item Pointer to an item in the queue 303 * 304 * @return Pointer to element or null if none left 305 */ 306 #define spsc_next(spsc, item) \ 307 ({ \ 308 unsigned long idx = ((item) - (spsc)->buffer); \ 309 bool has_next = \ 310 z_spsc_mask(spsc, (idx + 1)) != (z_spsc_mask(spsc, z_spsc_in(spsc))); \ 311 has_next ? &((spsc)->buffer[z_spsc_mask((spsc), idx + 1)]) : NULL; \ 312 }) 313 314 /** 315 * @brief Get the previous item in the queue from a given one 316 * 317 * @param spsc SPSC to peek at 318 * @param item Pointer to an item in the queue 319 * 320 * @return Pointer to element or null if none left 321 */ 322 #define spsc_prev(spsc, item) \ 323 ({ \ 324 unsigned long idx = ((item) - &(spsc)->buffer[0]) / sizeof((spsc)->buffer[0]); \ 325 bool has_prev = idx != z_spsc_mask(spsc, z_spsc_out(spsc)); \ 326 has_prev ? &((spsc)->buffer[z_spsc_mask(spsc, idx - 1)]) : NULL; \ 327 }) 328 329 /** 330 * @} 331 */ 332 333 #endif /* ZEPHYR_SYS_SPSC_LOCKFREE_H_ */ 334