1 /*
2 * Copyright (c) 2021 Nordic Semiconductor
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6 #include <sys/mpsc_pbuf.h>
7
/* Compile-time switch for the debug printouts in this file (0 = disabled). */
#define MPSC_PBUF_DEBUG 0

/* Debug trace helper.
 *
 * When MPSC_PBUF_DEBUG is non-zero, forwards the variadic arguments to
 * printk() and, if @p buffer is non-NULL, also prints the buffer indexes via
 * mpsc_state_print(). When MPSC_PBUF_DEBUG is 0 the condition is a constant
 * false, so nothing is printed.
 */
#define MPSC_PBUF_DBG(buffer, ...) do { \
	if (MPSC_PBUF_DEBUG) { \
		printk(__VA_ARGS__); \
		if (buffer) { \
			mpsc_state_print(buffer); \
		} \
	} \
} while (0)
18
mpsc_state_print(struct mpsc_pbuf_buffer * buffer)19 static inline void mpsc_state_print(struct mpsc_pbuf_buffer *buffer)
20 {
21 if (MPSC_PBUF_DEBUG) {
22 printk("wr:%d/%d, rd:%d/%d\n",
23 buffer->wr_idx, buffer->tmp_wr_idx,
24 buffer->rd_idx, buffer->tmp_rd_idx);
25 }
26 }
27
mpsc_pbuf_init(struct mpsc_pbuf_buffer * buffer,const struct mpsc_pbuf_buffer_config * cfg)28 void mpsc_pbuf_init(struct mpsc_pbuf_buffer *buffer,
29 const struct mpsc_pbuf_buffer_config *cfg)
30 {
31 int err;
32
33 memset(buffer, 0, offsetof(struct mpsc_pbuf_buffer, buf));
34 buffer->get_wlen = cfg->get_wlen;
35 buffer->notify_drop = cfg->notify_drop;
36 buffer->buf = cfg->buf;
37 buffer->size = cfg->size;
38 buffer->flags = cfg->flags;
39
40 if (is_power_of_two(buffer->size)) {
41 buffer->flags |= MPSC_PBUF_SIZE_POW2;
42 }
43
44 err = k_sem_init(&buffer->sem, 0, 1);
45 __ASSERT_NO_MSG(err == 0);
46 }
47
free_space(struct mpsc_pbuf_buffer * buffer,uint32_t * res)48 static inline bool free_space(struct mpsc_pbuf_buffer *buffer, uint32_t *res)
49 {
50 if (buffer->rd_idx > buffer->tmp_wr_idx) {
51 *res = buffer->rd_idx - buffer->tmp_wr_idx - 1;
52
53 return false;
54 } else if (!buffer->rd_idx) {
55 *res = buffer->size - buffer->tmp_wr_idx - 1;
56 return false;
57 }
58
59 *res = buffer->size - buffer->tmp_wr_idx;
60
61 return true;
62 }
63
available(struct mpsc_pbuf_buffer * buffer,uint32_t * res)64 static inline bool available(struct mpsc_pbuf_buffer *buffer, uint32_t *res)
65 {
66 if (buffer->tmp_rd_idx <= buffer->wr_idx) {
67 *res = (buffer->wr_idx - buffer->tmp_rd_idx);
68
69 return false;
70 }
71
72 *res = buffer->size - buffer->tmp_rd_idx;
73
74 return true;
75 }
76
is_valid(union mpsc_pbuf_generic * item)77 static inline bool is_valid(union mpsc_pbuf_generic *item)
78 {
79 return item->hdr.valid;
80 }
81
is_invalid(union mpsc_pbuf_generic * item)82 static inline bool is_invalid(union mpsc_pbuf_generic *item)
83 {
84 return !item->hdr.valid && !item->hdr.busy;
85 }
86
idx_inc(struct mpsc_pbuf_buffer * buffer,uint32_t idx,uint32_t val)87 static inline uint32_t idx_inc(struct mpsc_pbuf_buffer *buffer,
88 uint32_t idx, uint32_t val)
89 {
90 uint32_t i = idx + val;
91
92 if (buffer->flags & MPSC_PBUF_SIZE_POW2) {
93 return i & (buffer->size - 1);
94 }
95
96 return (i >= buffer->size) ? i - buffer->size : i;
97 }
98
idx_dec(struct mpsc_pbuf_buffer * buffer,uint32_t idx,uint32_t val)99 static inline uint32_t idx_dec(struct mpsc_pbuf_buffer *buffer,
100 uint32_t idx, uint32_t val)
101 {
102 uint32_t i = idx - val;
103
104 if (buffer->flags & MPSC_PBUF_SIZE_POW2) {
105 return idx & (buffer->size - 1);
106 }
107
108 return (i >= buffer->size) ? i + buffer->size : i;
109 }
110
get_skip(union mpsc_pbuf_generic * item)111 static inline uint32_t get_skip(union mpsc_pbuf_generic *item)
112 {
113 if (item->hdr.busy && !item->hdr.valid) {
114 return item->skip.len;
115 }
116
117 return 0;
118 }
119
add_skip_item(struct mpsc_pbuf_buffer * buffer,uint32_t wlen)120 static void add_skip_item(struct mpsc_pbuf_buffer *buffer, uint32_t wlen)
121 {
122 union mpsc_pbuf_generic skip = {
123 .skip = { .valid = 0, .busy = 1, .len = wlen }
124 };
125
126 buffer->buf[buffer->tmp_wr_idx] = skip.raw;
127 buffer->tmp_wr_idx = idx_inc(buffer, buffer->tmp_wr_idx, wlen);
128 buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen);
129 }
130
/* Attempts to drop the packet located at the read index. If dropping of user
 * packets is allowed then any type of packet is dropped. Otherwise only skip
 * packets (internal padding) are dropped.
 *
 * If a user packet was dropped, @p user_packet is set to true. The function
 * returns a pointer to the dropped packet, or NULL if nothing was dropped. The
 * returned pointer may refer to a user packet (@p user_packet set to true) or
 * to an internal skip packet.
 *
 * Every caller in this file invokes the function while holding buffer->lock.
 *
 * @param buffer      Buffer instance.
 * @param free_wlen   Free word count reported by free_space() to the caller.
 * @param allow_drop  True when user packets may be dropped.
 * @param user_packet Set to true when a user packet was dropped, false
 *                    otherwise.
 */
static union mpsc_pbuf_generic *drop_item_locked(struct mpsc_pbuf_buffer *buffer,
						 uint32_t free_wlen,
						 bool allow_drop,
						 bool *user_packet)
{
	union mpsc_pbuf_generic *item;
	uint32_t rd_wlen;
	uint32_t skip_wlen;

	*user_packet = false;
	item = (union mpsc_pbuf_generic *)&buffer->buf[buffer->rd_idx];
	skip_wlen = get_skip(item);

	/* Length of the item at rd_idx: the skip length for padding, otherwise
	 * the length reported by the user callback.
	 */
	rd_wlen = skip_wlen ? skip_wlen : buffer->get_wlen(item);
	if (skip_wlen) {
		/* Padding can always be dropped, regardless of the mode. */
		allow_drop = true;
	} else if (allow_drop) {
		if (item->hdr.busy) {
			/* item is currently processed and cannot be overwritten. */
			/* Pad from the current write position by free_wlen + 1
			 * words, then move both write indexes past the busy
			 * item as well.
			 * NOTE(review): this presumably places the write
			 * position right after the busy item - confirm.
			 */
			add_skip_item(buffer, free_wlen + 1);
			buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, rd_wlen);
			buffer->tmp_wr_idx = idx_inc(buffer, buffer->tmp_wr_idx, rd_wlen);

			/* Get the next item following the busy one. */
			uint32_t next_rd_idx = idx_inc(buffer, buffer->rd_idx, rd_wlen);

			item = (union mpsc_pbuf_generic *)&buffer->buf[next_rd_idx];
			skip_wlen = get_skip(item);
			/* rd_wlen now covers the busy item plus the item after
			 * it, so the read index below jumps over both.
			 */
			if (skip_wlen) {
				rd_wlen += skip_wlen;
			} else {
				rd_wlen += buffer->get_wlen(item);
				*user_packet = true;
			}
		} else {
			*user_packet = true;
		}
	} else {
		/* User packet at rd_idx and dropping is not allowed. */
		item = NULL;
	}

	if (allow_drop) {
		/* Release the dropped area; tmp_rd_idx is reset to rd_idx. */
		buffer->rd_idx = idx_inc(buffer, buffer->rd_idx, rd_wlen);
		buffer->tmp_rd_idx = buffer->rd_idx;
	}

	return item;
}
186
mpsc_pbuf_put_word(struct mpsc_pbuf_buffer * buffer,const union mpsc_pbuf_generic item)187 void mpsc_pbuf_put_word(struct mpsc_pbuf_buffer *buffer,
188 const union mpsc_pbuf_generic item)
189 {
190 bool cont;
191 uint32_t free_wlen;
192 k_spinlock_key_t key;
193 union mpsc_pbuf_generic *dropped_item = NULL;
194 bool valid_drop;
195
196 do {
197 cont = false;
198 key = k_spin_lock(&buffer->lock);
199 (void)free_space(buffer, &free_wlen);
200 if (free_wlen) {
201 buffer->buf[buffer->tmp_wr_idx] = item.raw;
202 buffer->tmp_wr_idx = idx_inc(buffer,
203 buffer->tmp_wr_idx, 1);
204 buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, 1);
205 } else {
206 bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE;
207
208 dropped_item = drop_item_locked(buffer, free_wlen,
209 user_drop, &valid_drop);
210 cont = dropped_item != NULL;
211 }
212
213 k_spin_unlock(&buffer->lock, key);
214
215 if (cont && valid_drop) {
216 /* Notify about item being dropped. */
217 buffer->notify_drop(buffer, dropped_item);
218 }
219 } while (cont);
220
221 }
222
/* Reserve space for a packet of @p wlen words.
 *
 * @param buffer  Buffer instance.
 * @param wlen    Requested packet length in 32-bit words.
 * @param timeout How long to wait for space. Waiting is only attempted when
 *                the timeout is not K_NO_WAIT and the call is not made from
 *                an ISR.
 *
 * @return Pointer to the reserved item with the valid and busy bits cleared,
 *         or NULL when @p wlen exceeds size - 1, when waiting on the
 *         semaphore fails, or when nothing could be dropped to make room.
 *
 * Only tmp_wr_idx is advanced here; wr_idx is advanced by mpsc_pbuf_commit().
 */
union mpsc_pbuf_generic *mpsc_pbuf_alloc(struct mpsc_pbuf_buffer *buffer,
					 size_t wlen, k_timeout_t timeout)
{
	union mpsc_pbuf_generic *item = NULL;
	union mpsc_pbuf_generic *dropped_item = NULL;
	bool cont;
	uint32_t free_wlen;
	bool valid_drop;

	MPSC_PBUF_DBG(buffer, "alloc %d words, ", (int)wlen);

	if (wlen > (buffer->size - 1)) {
		MPSC_PBUF_DBG(buffer, "Failed to alloc, ");
		return NULL;
	}

	do {
		k_spinlock_key_t key;
		bool wrap;

		cont = false;
		key = k_spin_lock(&buffer->lock);
		wrap = free_space(buffer, &free_wlen);

		if (free_wlen >= wlen) {
			item =
			    (union mpsc_pbuf_generic *)&buffer->buf[buffer->tmp_wr_idx];
			item->hdr.valid = 0;
			item->hdr.busy = 0;
			buffer->tmp_wr_idx = idx_inc(buffer,
						     buffer->tmp_wr_idx, wlen);
		} else if (wrap) {
			/* Not enough room before the end of the storage: pad
			 * the tail and retry from the start.
			 */
			add_skip_item(buffer, free_wlen);
			cont = true;
		} else if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT) &&
			   !k_is_in_isr()) {
			int err;

			/* The lock is released while waiting on the semaphore
			 * and re-taken afterwards so that the common unlock
			 * below stays balanced.
			 */
			k_spin_unlock(&buffer->lock, key);
			err = k_sem_take(&buffer->sem, timeout);
			key = k_spin_lock(&buffer->lock);
			if (err == 0) {
				cont = true;
			}
		} else {
			bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE;

			dropped_item = drop_item_locked(buffer, free_wlen,
							user_drop, &valid_drop);
			cont = dropped_item != NULL;
		}

		k_spin_unlock(&buffer->lock, key);

		if (cont && dropped_item && valid_drop) {
			/* Notify about item being dropped. */
			buffer->notify_drop(buffer, dropped_item);
			dropped_item = NULL;
		}
	} while (cont);

	MPSC_PBUF_DBG(buffer, "allocated %p ", item);

	if (IS_ENABLED(CONFIG_MPSC_CLEAR_ALLOCATED) && item) {
		/* During test fill with 0's to simplify message comparison.
		 * wlen counts 32-bit buffer words, so size the clear with
		 * uint32_t rather than int.
		 */
		memset(item, 0, sizeof(uint32_t) * wlen);
	}

	return item;
}
293
mpsc_pbuf_commit(struct mpsc_pbuf_buffer * buffer,union mpsc_pbuf_generic * item)294 void mpsc_pbuf_commit(struct mpsc_pbuf_buffer *buffer,
295 union mpsc_pbuf_generic *item)
296 {
297 uint32_t wlen = buffer->get_wlen(item);
298
299 k_spinlock_key_t key = k_spin_lock(&buffer->lock);
300
301 item->hdr.valid = 1;
302 buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen);
303 k_spin_unlock(&buffer->lock, key);
304 MPSC_PBUF_DBG(buffer, "committed %p ", item);
305 }
306
mpsc_pbuf_put_word_ext(struct mpsc_pbuf_buffer * buffer,const union mpsc_pbuf_generic item,const void * data)307 void mpsc_pbuf_put_word_ext(struct mpsc_pbuf_buffer *buffer,
308 const union mpsc_pbuf_generic item,
309 const void *data)
310 {
311 static const size_t l =
312 (sizeof(item) + sizeof(data)) / sizeof(uint32_t);
313 union mpsc_pbuf_generic *dropped_item = NULL;
314 bool cont;
315 bool valid_drop;
316
317 do {
318 k_spinlock_key_t key;
319 uint32_t free_wlen;
320 bool wrap;
321
322 cont = false;
323 key = k_spin_lock(&buffer->lock);
324 wrap = free_space(buffer, &free_wlen);
325
326 if (free_wlen >= l) {
327 buffer->buf[buffer->tmp_wr_idx] = item.raw;
328 void **p =
329 (void **)&buffer->buf[buffer->tmp_wr_idx + 1];
330
331 *p = (void *)data;
332 buffer->tmp_wr_idx =
333 idx_inc(buffer, buffer->tmp_wr_idx, l);
334 buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, l);
335 } else if (wrap) {
336 add_skip_item(buffer, free_wlen);
337 cont = true;
338 } else {
339 bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE;
340
341 dropped_item = drop_item_locked(buffer, free_wlen,
342 user_drop, &valid_drop);
343 cont = dropped_item != NULL;
344 }
345
346 k_spin_unlock(&buffer->lock, key);
347
348 if (cont && dropped_item && valid_drop) {
349 /* Notify about item being dropped. */
350 buffer->notify_drop(buffer, dropped_item);
351 dropped_item = NULL;
352 }
353 } while (cont);
354 }
355
mpsc_pbuf_put_data(struct mpsc_pbuf_buffer * buffer,const uint32_t * data,size_t wlen)356 void mpsc_pbuf_put_data(struct mpsc_pbuf_buffer *buffer, const uint32_t *data,
357 size_t wlen)
358 {
359 bool cont;
360 union mpsc_pbuf_generic *dropped_item = NULL;
361 bool valid_drop;
362
363 do {
364 uint32_t free_wlen;
365 k_spinlock_key_t key;
366 bool wrap;
367
368 cont = false;
369 key = k_spin_lock(&buffer->lock);
370 wrap = free_space(buffer, &free_wlen);
371
372 if (free_wlen >= wlen) {
373 memcpy(&buffer->buf[buffer->tmp_wr_idx], data,
374 wlen * sizeof(uint32_t));
375 buffer->tmp_wr_idx =
376 idx_inc(buffer, buffer->tmp_wr_idx, wlen);
377 buffer->wr_idx = idx_inc(buffer, buffer->wr_idx, wlen);
378 } else if (wrap) {
379 add_skip_item(buffer, free_wlen);
380 cont = true;
381 } else {
382 bool user_drop = buffer->flags & MPSC_PBUF_MODE_OVERWRITE;
383
384 dropped_item = drop_item_locked(buffer, free_wlen,
385 user_drop, &valid_drop);
386 cont = dropped_item != NULL;
387 }
388
389 k_spin_unlock(&buffer->lock, key);
390
391 if (cont && dropped_item && valid_drop) {
392 /* Notify about item being dropped. */
393 buffer->notify_drop(buffer, dropped_item);
394 dropped_item = NULL;
395 }
396 } while (cont);
397 }
398
mpsc_pbuf_claim(struct mpsc_pbuf_buffer * buffer)399 const union mpsc_pbuf_generic *mpsc_pbuf_claim(struct mpsc_pbuf_buffer *buffer)
400 {
401 union mpsc_pbuf_generic *item;
402 bool cont;
403
404 do {
405 uint32_t a;
406 k_spinlock_key_t key;
407 bool wrap;
408
409 cont = false;
410 key = k_spin_lock(&buffer->lock);
411 wrap = available(buffer, &a);
412 item = (union mpsc_pbuf_generic *)
413 &buffer->buf[buffer->tmp_rd_idx];
414
415 if (!a || is_invalid(item)) {
416 item = NULL;
417 } else {
418 uint32_t skip = get_skip(item);
419
420 if (skip || !is_valid(item)) {
421 uint32_t inc =
422 skip ? skip : buffer->get_wlen(item);
423
424 buffer->tmp_rd_idx =
425 idx_inc(buffer, buffer->tmp_rd_idx, inc);
426 buffer->rd_idx =
427 idx_inc(buffer, buffer->rd_idx, inc);
428 cont = true;
429 } else {
430 item->hdr.busy = 1;
431 buffer->tmp_rd_idx =
432 idx_inc(buffer, buffer->tmp_rd_idx,
433 buffer->get_wlen(item));
434 }
435 }
436
437 if (!cont) {
438 MPSC_PBUF_DBG(buffer, "claimed: %p ", item);
439 }
440 k_spin_unlock(&buffer->lock, key);
441 } while (cont);
442
443 return item;
444 }
445
mpsc_pbuf_free(struct mpsc_pbuf_buffer * buffer,const union mpsc_pbuf_generic * item)446 void mpsc_pbuf_free(struct mpsc_pbuf_buffer *buffer,
447 const union mpsc_pbuf_generic *item)
448 {
449 uint32_t wlen = buffer->get_wlen(item);
450 k_spinlock_key_t key = k_spin_lock(&buffer->lock);
451 union mpsc_pbuf_generic *witem = (union mpsc_pbuf_generic *)item;
452
453 witem->hdr.valid = 0;
454 if (!(buffer->flags & MPSC_PBUF_MODE_OVERWRITE) ||
455 ((uint32_t *)item == &buffer->buf[buffer->rd_idx])) {
456 witem->hdr.busy = 0;
457 buffer->rd_idx = idx_inc(buffer, buffer->rd_idx, wlen);
458 } else {
459 witem->skip.len = wlen;
460 }
461 MPSC_PBUF_DBG(buffer, "freed: %p ", item);
462
463 k_spin_unlock(&buffer->lock, key);
464 k_sem_give(&buffer->sem);
465 }
466
mpsc_pbuf_is_pending(struct mpsc_pbuf_buffer * buffer)467 bool mpsc_pbuf_is_pending(struct mpsc_pbuf_buffer *buffer)
468 {
469 uint32_t a;
470
471 (void)available(buffer, &a);
472
473 return a ? true : false;
474 }
475