/*
 * Copyright (c) 2010-2011 Dmitry Vyukov
 * Copyright (c) 2022 Intel Corporation
 *
 * SPDX-License-Identifier: Apache-2.0
 */

#ifndef ZEPHYR_RTIO_MPSC_H_
#define ZEPHYR_RTIO_MPSC_H_

#include <stdint.h>
#include <stdbool.h>
#include <zephyr/toolchain/common.h>
#include <zephyr/sys/atomic.h>
#include <zephyr/kernel.h>

#ifdef __cplusplus
extern "C" {
#endif

/**
 * @brief RTIO Multiple Producer Single Consumer (MPSC) Queue API
 * @defgroup rtio_mpsc RTIO MPSC API
 * @ingroup rtio
 * @{
 */

/*
 * On single core systems atomics are unnecessary
 * and cause a lot of unnecessary cache invalidation.
 *
 * Using volatile at least ensures memory is read/written
 * by the compiler generated op codes, which is enough here.
 *
 * On SMP atomics *must* be used to ensure the pointers
 * are updated in the correct order and the updated values
 * are visible to the other cores' caches.
 */
#if defined(CONFIG_SMP)

typedef atomic_ptr_t mpsc_ptr_t;

#define mpsc_ptr_get(ptr)	   atomic_ptr_get(&(ptr))
#define mpsc_ptr_set(ptr, val)	   atomic_ptr_set(&(ptr), val)
#define mpsc_ptr_set_get(ptr, val) atomic_ptr_set(&(ptr), val)

#else

typedef struct rtio_mpsc_node *mpsc_ptr_t;

#define mpsc_ptr_get(ptr)	   ptr
#define mpsc_ptr_set(ptr, val)	   ptr = val
#define mpsc_ptr_set_get(ptr, val)    \
	({                            \
		mpsc_ptr_t tmp = ptr; \
		ptr = val;            \
		tmp;                  \
	})

#endif
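
/*
 * Illustrative sketch (not part of this header): mpsc_ptr_set_get() behaves
 * like an exchange, storing the new value and returning the previous one. On
 * SMP builds it maps to atomic_ptr_set(), which returns the old pointer; on
 * single core builds the statement expression above does the same,
 * non-atomically, relying on the caller (see rtio_mpsc_push()) holding an
 * IRQ lock.
 *
 *	struct rtio_mpsc_node a, b;
 *	mpsc_ptr_t p;
 *
 *	mpsc_ptr_set(p, &a);
 *	struct rtio_mpsc_node *prev =
 *		(struct rtio_mpsc_node *)mpsc_ptr_set_get(p, &b);
 *	__ASSERT_NO_MSG(prev == &a);                                      // old value returned
 *	__ASSERT_NO_MSG((struct rtio_mpsc_node *)mpsc_ptr_get(p) == &b); // new value stored
 */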

/**
 * @file rtio_mpsc.h
 *
 * @brief A wait-free intrusive multi-producer single-consumer (MPSC) queue
 * using a singly linked list. Ordering is First-In-First-Out.
 *
 * Based on the well known and widely used wait-free MPSC queue described by
 * Dmitry Vyukov, with some slight changes to account for the needs of an
 * RTOS on a variety of architectures. Both producer and consumer are
 * wait-free: no CAS loop or lock is needed.
 *
 * An MPSC queue is safe to produce to or consume from in an ISR, with O(1)
 * push and pop.
 *
 * @warning An MPSC queue is *not* safe to consume from multiple execution
 * contexts concurrently.
 */
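
/*
 * Usage sketch (illustrative; the my_work type, work_q symbol and function
 * names are hypothetical, not part of this API). The queue is intrusive: the
 * user embeds a struct rtio_mpsc_node in their own type and recovers the
 * containing object with CONTAINER_OF() after popping.
 *
 *	struct my_work {
 *		struct rtio_mpsc_node node;
 *		uint32_t payload;
 *	};
 *
 *	static struct rtio_mpsc work_q = RTIO_MPSC_INIT(work_q);
 *
 *	void producer(struct my_work *w)
 *	{
 *		rtio_mpsc_push(&work_q, &w->node);
 *	}
 *
 *	struct my_work *consumer(void)
 *	{
 *		struct rtio_mpsc_node *n = rtio_mpsc_pop(&work_q);
 *
 *		return (n == NULL) ? NULL : CONTAINER_OF(n, struct my_work, node);
 *	}
 */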

/**
 * @brief Queue member
 */
struct rtio_mpsc_node {
	mpsc_ptr_t next;
};

/**
 * @brief MPSC Queue
 */
struct rtio_mpsc {
	mpsc_ptr_t head;
	struct rtio_mpsc_node *tail;
	struct rtio_mpsc_node stub;
};
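
/*
 * Note (informational): producers append at "head" and the single consumer
 * removes from "tail". The embedded "stub" node is the sentinel that keeps
 * both ends valid without locking; an empty queue has head == tail == &stub
 * with stub.next == NULL.
 */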

/**
 * @brief Static initializer for a mpsc queue
 *
 * Since the queue is intrusive and uses its internal stub node as the
 * sentinel, the initializer needs the queue's symbol name so that head and
 * tail can point at that stub.
 *
 * @param symbol name of the queue
 */
#define RTIO_MPSC_INIT(symbol)                                 \
	{                                                      \
		.head = (struct rtio_mpsc_node *)&symbol.stub, \
		.tail = (struct rtio_mpsc_node *)&symbol.stub, \
		.stub = {                                      \
			.next = NULL,                          \
		},                                             \
	}

/**
 * @brief Initialize queue
 *
 * @param q Queue to initialize or reset
 */
static inline void rtio_mpsc_init(struct rtio_mpsc *q)
{
	mpsc_ptr_set(q->head, &q->stub);
	q->tail = &q->stub;
	mpsc_ptr_set(q->stub.next, NULL);
}
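
/*
 * Runtime initialization sketch (illustrative, hypothetical names): when the
 * queue is embedded in another object and cannot use RTIO_MPSC_INIT(), it can
 * be initialized (or reset, once known to be idle) in place before use.
 *
 *	struct my_driver_data {
 *		struct rtio_mpsc pending;
 *	};
 *
 *	static int my_driver_init(struct my_driver_data *data)
 *	{
 *		rtio_mpsc_init(&data->pending);
 *
 *		return 0;
 *	}
 */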

/**
 * @brief Push a node
 *
 * @param q Queue to push the node to
 * @param n Node to push into the queue
 */
static ALWAYS_INLINE void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_node *n)
{
	struct rtio_mpsc_node *prev;
	unsigned int key;

	mpsc_ptr_set(n->next, NULL);

	key = arch_irq_lock();
	prev = (struct rtio_mpsc_node *)mpsc_ptr_set_get(q->head, n);
	mpsc_ptr_set(prev->next, n);
	arch_irq_unlock(key);
}
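
/*
 * Note (informational): a node is owned by the queue from the moment it is
 * pushed until it is returned by rtio_mpsc_pop(). The caller must not free,
 * reuse, or push the same node again while it is still queued, and the
 * node's backing memory must remain valid for that whole time.
 */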

/**
 * @brief Pop a node off of the list
 *
 * @param q Queue to pop the node from
 *
 * @retval NULL When no node is available
 * @retval node When node is available
 */
static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
{
	struct rtio_mpsc_node *head;
	struct rtio_mpsc_node *tail = q->tail;
	struct rtio_mpsc_node *next = (struct rtio_mpsc_node *)mpsc_ptr_get(tail->next);

	/* Skip over the stub/sentinel */
	if (tail == &q->stub) {
		if (next == NULL) {
			return NULL;
		}

		q->tail = next;
		tail = next;
		next = (struct rtio_mpsc_node *)mpsc_ptr_get(next->next);
	}

	/* If next is non-NULL then a valid node is found, return it */
	if (next != NULL) {
		q->tail = next;
		return tail;
	}

	head = (struct rtio_mpsc_node *)mpsc_ptr_get(q->head);

	/* If next is NULL and tail != head, then the queue has a pending
	 * update that can't yet be accessed.
	 */
	if (tail != head) {
		return NULL;
	}

	rtio_mpsc_push(q, &q->stub);

	next = (struct rtio_mpsc_node *)mpsc_ptr_get(tail->next);

	if (next != NULL) {
		q->tail = next;
		return tail;
	}

	return NULL;
}
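
/*
 * Consumer sketch (illustrative, reusing the hypothetical my_work/work_q
 * names from the usage sketch above): the single consumer drains the queue
 * until pop returns NULL. On SMP, pop may transiently return NULL while a
 * producer on another core is between its exchange and link steps, so a
 * consumer that needs to block should pair the queue with a semaphore or
 * other wake-up signal rather than relying on pop alone.
 *
 *	void drain(void)
 *	{
 *		struct rtio_mpsc_node *n;
 *
 *		while ((n = rtio_mpsc_pop(&work_q)) != NULL) {
 *			struct my_work *w = CONTAINER_OF(n, struct my_work, node);
 *
 *			process_work(w); // hypothetical handler
 *		}
 *	}
 */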

/**
 * @}
 */

#ifdef __cplusplus
}
#endif

#endif /* ZEPHYR_RTIO_MPSC_H_ */