1 /*
2  * Copyright (c) 2021 Nordic Semiconductor ASA
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 #include <zephyr/ztest.h>
7 #include <zephyr/ztress.h>
8 #include <zephyr/sys/mpsc_pbuf.h>
9 #include <zephyr/sys/ring_buffer.h>
10 #include <zephyr/random/random.h>
11 #include <zephyr/logging/log.h>
12 LOG_MODULE_REGISTER(test);
13 
/* Set DEBUG to 1 to route DBG() to printk(); with DEBUG set to 0 the DBG()
 * invocations expand to nothing.
 */
#define DEBUG 0
#define DBG(...) COND_CODE_1(DEBUG, (printk(__VA_ARGS__)), ())

/* Backing storage and instance of the packet buffer under test; both are
 * (re)initialized at the start of every stress_test() run.
 */
static uint32_t buf32[128];
static struct mpsc_pbuf_buffer mpsc_buffer;
/* NOTE(review): not referenced by any code visible in this file - confirm
 * whether it is still needed.
 */
volatile int test_microdelay_cnt;
/* Held by track_produce() and track_consume() while they access track_mask
 * and track_base_idx.
 */
static struct k_spinlock lock;

/* Record of the first detected failure, filled in by test_fail().
 * test_failed is the flag; the remaining fields hold the reporting line and
 * the data/ctx fields of the offending packet.
 */
static atomic_t test_failed;
static int test_failed_line;
static uint32_t test_failed_cnt;
static uint32_t test_failed_ctx;

/* Per-context tracking of outstanding packets. track_mask[ctx] is a bitmask
 * window in which a set bit marks a packet that was produced and not yet
 * consumed or dropped; bit 0 of word 0 corresponds to packet index
 * track_base_idx[ctx].
 */
static uint32_t track_mask[4][12];
static uint32_t track_base_idx[4];
29 
/* Counters and per-context state of a single stress_test() run. */
struct test_data {
	/* Index that the next packet produced by each context will carry. */
	uint32_t idx[4];

	/* Number of successful mpsc_pbuf_claim() calls in consume(). */
	atomic_t claim_cnt;
	/* Number of mpsc_pbuf_claim() calls in consume() that returned no packet. */
	atomic_t claim_miss_cnt;
	/* Number of packets successfully allocated in produce(). */
	atomic_t produce_cnt;
	/* Number of failed mpsc_pbuf_alloc() calls in produce(). */
	atomic_t alloc_fails;
	/* Number of packets reported through the drop() callback. */
	atomic_t dropped;
};

static struct test_data data;
42 
/* Bit widths of the fields packed into the first 32-bit word of a packet.
 * DATA_BITS takes whatever remains after the mpsc_pbuf header bits, the length
 * and the context fields.
 */
#define LEN_BITS 8
#define CTX_BITS 2
#define DATA_BITS (32 - MPSC_PBUF_HDR_BITS - LEN_BITS - CTX_BITS)

/* Number of packet indexes tracked by one word of track_mask. */
#define MASK_BITS 32

/* Packet layout used by the test: one bit-field word followed by payload. */
struct test_packet {
	/* Header bits supplied by mpsc_pbuf.h (presumably owned by the buffer
	 * implementation - confirm against the mpsc_pbuf documentation).
	 */
	MPSC_PBUF_HDR;
	/* Total packet length as passed to mpsc_pbuf_alloc() by produce(). */
	uint32_t len : LEN_BITS;
	/* Producer context (the ztress prio value) that created the packet. */
	uint32_t ctx : CTX_BITS;
	/* Packet index within its context, truncated to DATA_BITS bits. */
	uint32_t data : DATA_BITS;
	/* Payload: produce() writes len - 1 words, buf[i] = index + i. */
	uint32_t buf[];
};
56 
track_produce(uint32_t ctx,uint32_t idx)57 static void track_produce(uint32_t ctx, uint32_t idx)
58 {
59 	k_spinlock_key_t key = k_spin_lock(&lock);
60 	uint32_t ridx = idx - track_base_idx[ctx];
61 	uint32_t word = ridx / MASK_BITS;
62 	uint32_t bit = ridx & (MASK_BITS - 1);
63 
64 	DBG("p %d|%d\n", ctx, idx);
65 	track_mask[ctx][word] |= BIT(bit);
66 	k_spin_unlock(&lock, key);
67 }
68 
track_consume(uint32_t ctx,uint32_t idx)69 static bool track_consume(uint32_t ctx, uint32_t idx)
70 {
71 	k_spinlock_key_t key = k_spin_lock(&lock);
72 	uint32_t base_idx = track_base_idx[ctx];
73 	uint32_t ridx = idx - base_idx;
74 	uint32_t word = ridx / MASK_BITS;
75 	uint32_t bit = ridx & (MASK_BITS - 1);
76 	bool rv = true;
77 
78 	DBG("c %d|%d\n", ctx, idx);
79 	if (idx < base_idx || idx > (base_idx + 32 * ARRAY_SIZE(track_mask[0]))) {
80 		printk("bits %d\n", MASK_BITS);
81 		printk("Strange value %d|%d base:%d\n", ctx, idx, base_idx);
82 		rv = false;
83 		goto bail;
84 	}
85 
86 	if ((track_mask[ctx][word] & BIT(bit)) == 0) {
87 		/* Already consumed. */
88 		printk("already consumed\n");
89 		rv = false;
90 		goto bail;
91 	}
92 
93 	track_mask[ctx][word] &= ~BIT(bit);
94 
95 	if (word > (ARRAY_SIZE(track_mask[ctx]) / 2)) {
96 		/* Far in the past should all be consumed by now. */
97 		if (track_mask[ctx][0]) {
98 			printk("not all dropped\n");
99 			rv = false;
100 			goto bail;
101 		}
102 
103 		DBG("move %d\n", ctx);
104 		memmove(track_mask[ctx], &track_mask[ctx][1],
105 			sizeof(track_mask[ctx]) - sizeof(uint32_t));
106 		track_mask[ctx][ARRAY_SIZE(track_mask[ctx]) - 1] = 0;
107 		track_base_idx[ctx] += 32;
108 	}
109 
110 bail:
111 	k_spin_unlock(&lock, key);
112 	return rv;
113 }
114 
test_fail(int line,struct test_packet * packet)115 static void test_fail(int line, struct test_packet *packet)
116 {
117 	if (atomic_cas(&test_failed, 0, 1)) {
118 		test_failed_line = line;
119 		test_failed_cnt = packet->data;
120 		test_failed_ctx = packet->ctx;
121 		ztress_abort();
122 	}
123 }
124 
consume_check(struct test_packet * packet)125 static void consume_check(struct test_packet *packet)
126 {
127 	bool res = track_consume(packet->ctx, packet->data);
128 
129 	if (!res) {
130 		test_fail(__LINE__, packet);
131 		return;
132 	}
133 
134 	for (int i = 0; i < packet->len - 1; i++) {
135 		if (packet->buf[i] != packet->data + i) {
136 			test_fail(__LINE__, packet);
137 		}
138 	}
139 }
140 
drop(const struct mpsc_pbuf_buffer * buffer,const union mpsc_pbuf_generic * item)141 static void drop(const struct mpsc_pbuf_buffer *buffer, const union mpsc_pbuf_generic *item)
142 {
143 	struct test_packet *packet = (struct test_packet *)item;
144 
145 	atomic_inc(&data.dropped);
146 	consume_check(packet);
147 }
148 
consume(void * user_data,uint32_t cnt,bool last,int prio)149 static bool consume(void *user_data, uint32_t cnt, bool last, int prio)
150 {
151 	struct mpsc_pbuf_buffer *buffer = user_data;
152 	struct test_packet *packet = (struct test_packet *)mpsc_pbuf_claim(buffer);
153 
154 	if (packet) {
155 		atomic_inc(&data.claim_cnt);
156 		consume_check(packet);
157 		mpsc_pbuf_free(buffer, (union mpsc_pbuf_generic *)packet);
158 	} else {
159 		atomic_inc(&data.claim_miss_cnt);
160 	}
161 
162 
163 	return true;
164 }
165 
produce(void * user_data,uint32_t cnt,bool last,int prio)166 static bool produce(void *user_data, uint32_t cnt, bool last, int prio)
167 {
168 	struct mpsc_pbuf_buffer *buffer = user_data;
169 
170 	zassert_true(prio < 4, NULL);
171 
172 
173 	uint32_t wlen = sys_rand32_get() % (buffer->size / 4) + 1;
174 	struct test_packet *packet = (struct test_packet *)mpsc_pbuf_alloc(buffer, wlen, K_NO_WAIT);
175 
176 	if (!packet) {
177 		atomic_inc(&data.alloc_fails);
178 		return true;
179 	}
180 
181 	atomic_inc(&data.produce_cnt);
182 
183 	/* Note that producing may be interrupted and there will be discontinuity
184 	 * which must be handled when verifying correctness during consumption.
185 	 */
186 	uint32_t id = data.idx[prio];
187 
188 	track_produce(prio, id);
189 
190 	data.idx[prio]++;
191 	packet->ctx = prio;
192 	packet->data = id;
193 	packet->len = wlen;
194 	for (int i = 0; i < (wlen - 1); i++) {
195 		packet->buf[i] = id + i;
196 	}
197 
198 	mpsc_pbuf_commit(buffer, (union mpsc_pbuf_generic *)packet);
199 
200 	return true;
201 }
202 
get_wlen(const union mpsc_pbuf_generic * item)203 static uint32_t get_wlen(const union mpsc_pbuf_generic *item)
204 {
205 	struct test_packet *packet = (struct test_packet *)item;
206 
207 	return packet->len;
208 }
209 
/* Test is using up to 4 contexts to access a single mpsc_pbuf instance. Those
 * contexts are threads on different priorities (a timer interrupt context is
 * currently disabled) and preempt each other. One context is consuming and the
 * others are producing. It validates that each produced packet is consumed or
 * dropped.
 *
 * Test is randomized. Thread sleep time and timer timeout are random. Packet
 * size is also random. Dedicated work is used to fill a pool of random numbers
 * (generating random numbers is time consuming so it is decoupled from the main
 * test).
 *
 * Test attempts to stress mpsc_pbuf by having as many preemptions as possible.
 * In order to achieve that, CPU load is monitored periodically and if the load
 * is too low then the sleep/timeout time is reduced by reducing a factor that
 * is used to calculate sleep/timeout time (factor * random number). Test aims
 * to keep CPU load at ~80%. Some room is left for keeping the random number
 * pool filled.
 */
static void stress_test(bool overwrite,
			ztress_handler h1,
			ztress_handler h2,
			ztress_handler h3,
			ztress_handler h4)
{
	uint32_t max_preemptions = 4000;
	k_timeout_t init_timeout = Z_TIMEOUT_TICKS(20);
	struct mpsc_pbuf_buffer_config cfg = {
		.buf = buf32,
		.size = ARRAY_SIZE(buf32),
		.notify_drop = drop,
		.get_wlen = get_wlen,
		.flags = overwrite ? MPSC_PBUF_MODE_OVERWRITE : 0
	};

	/* The test is skipped when the system tick rate is below 10 kHz. */
	if (CONFIG_SYS_CLOCK_TICKS_PER_SEC < 10000) {
		ztest_test_skip();
	}

	/* Start every run from a clean state: failure flag, tracking windows,
	 * counters and the buffer instance itself.
	 */
	test_failed = 0;
	memset(&data, 0, sizeof(data));
	memset(track_mask, 0, sizeof(track_mask));
	memset(track_base_idx, 0, sizeof(track_base_idx));
	memset(&mpsc_buffer, 0, sizeof(mpsc_buffer));
	mpsc_pbuf_init(&mpsc_buffer, &cfg);

	ztress_set_timeout(K_MSEC(10000));

	/* All contexts are threads sharing the same buffer. The first one gets a
	 * preemption target of 0, the others max_preemptions. h4 is optional.
	 */
	if (h4 != NULL) {
		ZTRESS_EXECUTE(ZTRESS_THREAD(h1, &mpsc_buffer, 0, 0, init_timeout),
			       ZTRESS_THREAD(h2, &mpsc_buffer, 0, max_preemptions, init_timeout),
			       ZTRESS_THREAD(h3, &mpsc_buffer, 0, max_preemptions, init_timeout),
			       ZTRESS_THREAD(h4, &mpsc_buffer, 0, max_preemptions, init_timeout));
	} else {
		ZTRESS_EXECUTE(ZTRESS_THREAD(h1, &mpsc_buffer, 0, 0, init_timeout),
			       ZTRESS_THREAD(h2, &mpsc_buffer, 0, max_preemptions, init_timeout),
			       ZTRESS_THREAD(h3, &mpsc_buffer, 0, max_preemptions, init_timeout));
	}

	/* On failure dump the tracking window of every context, one per line. */
	if (test_failed) {
		for (unsigned int ctx = 0; ctx < ARRAY_SIZE(track_mask); ctx++) {
			const uint32_t *row = track_mask[ctx];

			printk("mask: ");
			for (unsigned int w = 0; w < ARRAY_SIZE(track_mask[0]); w++) {
				printk("%08x ", (uint32_t)row[w]);
			}
			printk("\n");
		}
	}

	zassert_false(test_failed, "Test failed with data:%d (line: %d)",
		      test_failed_cnt, test_failed_line);

	PRINT("Test report:\n");
	PRINT("\tClaims:%ld, claim misses:%ld\n", data.claim_cnt, data.claim_miss_cnt);
	PRINT("\tProduced:%ld, allocation failures:%ld\n", data.produce_cnt, data.alloc_fails);
	PRINT("\tDropped: %ld\n", data.dropped);
}
289 
ZTEST(mpsc_pbuf_concurrent,test_stress_preemptions_low_consumer)290 ZTEST(mpsc_pbuf_concurrent, test_stress_preemptions_low_consumer)
291 {
292 	stress_test(true, produce, produce, produce, consume);
293 	stress_test(false, produce, produce, produce, consume);
294 }
295 
296 /* Consumer has medium priority with one lower priority consumer and one higher. */
ZTEST(mpsc_pbuf_concurrent,test_stress_preemptions_mid_consumer)297 ZTEST(mpsc_pbuf_concurrent, test_stress_preemptions_mid_consumer)
298 {
299 	stress_test(true, produce, consume, produce, produce);
300 	stress_test(false, produce, consume, produce, produce);
301 }
302 
303 /* Consumer has the highest priority, it preempts both producer. */
ZTEST(mpsc_pbuf_concurrent,test_stress_preemptions_high_consumer)304 ZTEST(mpsc_pbuf_concurrent, test_stress_preemptions_high_consumer)
305 {
306 	stress_test(true, consume, produce, produce, produce);
307 	stress_test(false, consume, produce, produce, produce);
308 }
309 
/* Registers the mpsc_pbuf_concurrent suite; every optional argument after the
 * suite name is left NULL.
 */
ZTEST_SUITE(mpsc_pbuf_concurrent, NULL, NULL, NULL, NULL, NULL);
311