1 /*
2  * Copyright (c) 2010-2011 Dmitry Vyukov
3  * Copyright (c) 2022 Intel Corporation
4  *
5  * SPDX-License-Identifier: Apache-2.0
6  */
7 
8 #ifndef ZEPHYR_RTIO_MPSC_H_
9 #define ZEPHYR_RTIO_MPSC_H_
10 
11 #include <stdint.h>
12 #include <stdbool.h>
13 #include <zephyr/toolchain/common.h>
14 #include <zephyr/sys/atomic.h>
15 #include <zephyr/kernel.h>
16 
17 #ifdef __cplusplus
18 extern "C" {
19 #endif
20 
21 /**
22  * @brief RTIO Multiple Producer Single Consumer (MPSC) Queue API
23  * @defgroup rtio_mpsc RTIO MPSC API
24  * @ingroup rtio
25  * @{
26  */
27 
28 /*
29  * On single core systems atomics are unnecessary
30  * and cause a lot of unnecessary cache invalidation
31  *
32  * Using volatile to at least ensure memory is read/written
33  * by the compiler generated op codes is enough.
34  *
35  * On SMP atomics *must* be used to ensure the pointers
36  * are updated in the correct order and the values are
37  * updated core caches correctly.
38  */
39 #if defined(CONFIG_SMP)
40 
41 typedef atomic_ptr_t mpsc_ptr_t;
42 
43 #define mpsc_ptr_get(ptr)	   atomic_ptr_get(&(ptr))
44 #define mpsc_ptr_set(ptr, val)	   atomic_ptr_set(&(ptr), val)
45 #define mpsc_ptr_set_get(ptr, val) atomic_ptr_set(&(ptr), val)
46 
47 #else
48 
49 typedef struct rtio_mpsc_node *mpsc_ptr_t;
50 
51 #define mpsc_ptr_get(ptr)      ptr
52 #define mpsc_ptr_set(ptr, val) ptr = val
53 #define mpsc_ptr_set_get(ptr, val)                                                                 \
54 	({                                                                                         \
55 		mpsc_ptr_t tmp = ptr;                                                              \
56 		ptr = val;                                                                         \
57 		tmp;                                                                               \
58 	})
59 
60 #endif
61 
62 /**
63  * @file rtio_mpsc.h
64  *
65  * @brief A wait-free intrusive multi producer single consumer (MPSC) queue using
66  * a singly linked list. Ordering is First-In-First-Out.
67  *
68  * Based on the well known and widely used wait-free MPSC queue described by
69  * Dmitry Vyukov with some slight changes to account for needs of an
70  * RTOS on a variety of archs. Both consumer and producer are wait free. No CAS
71  * loop or lock is needed.
72  *
73  * An MPSC queue is safe to produce or consume in an ISR with O(1) push/pop.
74  *
75  * @warning MPSC is *not* safe to consume in multiple execution contexts.
76  */
77 
78 /**
79  * @brief Queue member
80  */
81 struct rtio_mpsc_node {
82 	mpsc_ptr_t next;
83 };
84 
85 /**
86  * @brief MPSC Queue
87  */
88 struct rtio_mpsc {
89 	mpsc_ptr_t head;
90 	struct rtio_mpsc_node *tail;
91 	struct rtio_mpsc_node stub;
92 };
93 
94 /**
95  * @brief Static initializer for a mpsc queue
96  *
97  * Since the queue is
98  *
99  * @param symbol name of the queue
100  */
101 #define RTIO_MPSC_INIT(symbol)                                                                     \
102 	{                                                                                          \
103 		.head = (struct rtio_mpsc_node *)&symbol.stub,                                     \
104 		.tail = (struct rtio_mpsc_node *)&symbol.stub,                                     \
105 		.stub = {                                                                          \
106 			.next = NULL,                                                              \
107 		},                                                                                 \
108 	}
109 
110 /**
111  * @brief Initialize queue
112  *
113  * @param q Queue to initialize or reset
114  */
rtio_mpsc_init(struct rtio_mpsc * q)115 static inline void rtio_mpsc_init(struct rtio_mpsc *q)
116 {
117 	mpsc_ptr_set(q->head, &q->stub);
118 	q->tail = &q->stub;
119 	mpsc_ptr_set(q->stub.next, NULL);
120 }
121 
122 /**
123  * @brief Push a node
124  *
125  * @param q Queue to push the node to
126  * @param n Node to push into the queue
127  */
rtio_mpsc_push(struct rtio_mpsc * q,struct rtio_mpsc_node * n)128 static ALWAYS_INLINE void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_node *n)
129 {
130 	struct rtio_mpsc_node *prev;
131 	int key;
132 
133 	mpsc_ptr_set(n->next, NULL);
134 
135 	key = arch_irq_lock();
136 	prev = (struct rtio_mpsc_node *)mpsc_ptr_set_get(q->head, n);
137 	mpsc_ptr_set(prev->next, n);
138 	arch_irq_unlock(key);
139 }
140 
141 /**
142  * @brief Pop a node off of the list
143  *
144  * @retval NULL When no node is available
145  * @retval node When node is available
146  */
rtio_mpsc_pop(struct rtio_mpsc * q)147 static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
148 {
149 	struct rtio_mpsc_node *head;
150 	struct rtio_mpsc_node *tail = q->tail;
151 	struct rtio_mpsc_node *next = (struct rtio_mpsc_node *)mpsc_ptr_get(tail->next);
152 
153 	/* Skip over the stub/sentinel */
154 	if (tail == &q->stub) {
155 		if (next == NULL) {
156 			return NULL;
157 		}
158 
159 		q->tail = next;
160 		tail = next;
161 		next = (struct rtio_mpsc_node *)mpsc_ptr_get(next->next);
162 	}
163 
164 	/* If next is non-NULL then a valid node is found, return it */
165 	if (next != NULL) {
166 		q->tail = next;
167 		return tail;
168 	}
169 
170 	head = (struct rtio_mpsc_node *)mpsc_ptr_get(q->head);
171 
172 	/* If next is NULL, and the tail != HEAD then the queue has pending
173 	 * updates that can't yet be accessed.
174 	 */
175 	if (tail != head) {
176 		return NULL;
177 	}
178 
179 	rtio_mpsc_push(q, &q->stub);
180 
181 	next = (struct rtio_mpsc_node *)mpsc_ptr_get(tail->next);
182 
183 	if (next != NULL) {
184 		q->tail = next;
185 		return tail;
186 	}
187 
188 	return NULL;
189 }
190 
191 /**
192  * @}
193  */
194 
195 #ifdef __cplusplus
196 }
197 #endif
198 
199 #endif /* ZEPHYR_RTIO_MPSC_H_ */
200