1 /*
2  * Copyright (c) 2024 Måns Ansgariusson <mansgariusson@gmail.com>
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 #include <stdint.h>
7 #include <zephyr/kernel.h>
8 #include <zephyr/ztest.h>
9 #include <zephyr/random/random.h>
10 #include <zephyr/logging/log.h>
11 
12 LOG_MODULE_REGISTER(k_pipe_concurrency, LOG_LEVEL_DBG);
13 ZTEST_SUITE(k_pipe_concurrency, NULL, NULL, NULL, NULL, NULL);
14 
15 static const int partial_wait_time = 2000;
16 #define DUMMY_DATA_SIZE 16
17 static struct k_thread thread;
18 static K_THREAD_STACK_DEFINE(stack, 1024 + CONFIG_TEST_EXTRA_STACK_SIZE);
19 static struct k_pipe pipe;
20 
/**
 * @brief Thread entry point that closes a pipe.
 *
 * @param arg1 Pointer to the struct k_pipe handed to k_pipe_close().
 * @param arg2 Unused.
 * @param arg3 Unused.
 */
static void thread_close(void *arg1, void *arg2, void *arg3)
{
	struct k_pipe *target = arg1;

	(void)arg2;
	(void)arg3;

	k_pipe_close(target);
}
25 
/**
 * @brief Thread entry point that resets a pipe.
 *
 * @param arg1 Pointer to the struct k_pipe handed to k_pipe_reset().
 * @param arg2 Unused.
 * @param arg3 Unused.
 */
static void thread_reset(void *arg1, void *arg2, void *arg3)
{
	struct k_pipe *target = arg1;

	(void)arg2;
	(void)arg3;

	k_pipe_reset(target);
}
30 
thread_write(void * arg1,void * arg2,void * arg3)31 static void thread_write(void *arg1, void *arg2, void *arg3)
32 {
33 	uint8_t garbage[DUMMY_DATA_SIZE] = {};
34 
35 	zassert_true(k_pipe_write((struct k_pipe *)arg1, garbage, sizeof(garbage),
36 		K_MSEC(partial_wait_time)) == sizeof(garbage), "Failed to write to pipe");
37 }
38 
thread_read(void * arg1,void * arg2,void * arg3)39 static void thread_read(void *arg1, void *arg2, void *arg3)
40 {
41 	uint8_t garbage[DUMMY_DATA_SIZE];
42 
43 	zassert_true(k_pipe_read((struct k_pipe *)arg1, garbage, sizeof(garbage),
44 		K_MSEC(partial_wait_time)) == sizeof(garbage), "Failed to read from pipe");
45 }
46 
ZTEST(k_pipe_concurrency,test_close_on_read)47 ZTEST(k_pipe_concurrency, test_close_on_read)
48 {
49 	k_tid_t tid;
50 	uint8_t buffer[DUMMY_DATA_SIZE];
51 	uint8_t res;
52 
53 	k_pipe_init(&pipe, buffer, sizeof(buffer));
54 	tid = k_thread_create(&thread, stack, K_THREAD_STACK_SIZEOF(stack),
55 		thread_close, &pipe, NULL, NULL, K_PRIO_COOP(0), 0, K_MSEC(100));
56 	zassert_true(tid, "k_thread_create failed");
57 	zassert_true(k_pipe_read(&pipe, &res, sizeof(res), K_MSEC(1000)) == -EPIPE,
58 		"Read on closed pipe should return -EPIPE");
59 	k_thread_join(tid, K_FOREVER);
60 	zassert_true((pipe.flags & PIPE_FLAG_OPEN) == 0,
61 		"Pipe should continue to be closed after all waiters have been released");
62 }
63 
ZTEST(k_pipe_concurrency,test_close_on_write)64 ZTEST(k_pipe_concurrency, test_close_on_write)
65 {
66 	k_tid_t tid;
67 	uint8_t buffer[DUMMY_DATA_SIZE];
68 	uint8_t garbage[DUMMY_DATA_SIZE];
69 
70 	k_pipe_init(&pipe, buffer, sizeof(buffer));
71 	zassert_true(sizeof(garbage) == k_pipe_write(&pipe, garbage, sizeof(garbage), K_MSEC(1000)),
72 		"Failed to write to pipe");
73 
74 	tid = k_thread_create(&thread, stack, K_THREAD_STACK_SIZEOF(stack),
75 		thread_close, &pipe, NULL, NULL, K_PRIO_COOP(0), 0, K_MSEC(100));
76 	zassert_true(tid, "k_thread_create failed");
77 	zassert_true(k_pipe_write(&pipe, garbage, sizeof(garbage), K_MSEC(1000)) == -EPIPE,
78 		"write should return -EPIPE, when pipe is closed");
79 	k_thread_join(tid, K_FOREVER);
80 	zassert_true((pipe.flags & PIPE_FLAG_OPEN) == 0,
81 		"pipe should continue to be closed after all waiters have been released");
82 }
83 
ZTEST(k_pipe_concurrency,test_reset_on_read)84 ZTEST(k_pipe_concurrency, test_reset_on_read)
85 {
86 	k_tid_t tid;
87 	uint8_t buffer[DUMMY_DATA_SIZE];
88 	uint8_t res;
89 
90 	k_pipe_init(&pipe, buffer, sizeof(buffer));
91 
92 	tid = k_thread_create(&thread, stack, K_THREAD_STACK_SIZEOF(stack),
93 		thread_reset, &pipe, NULL, NULL, K_PRIO_COOP(0), 0, K_MSEC(100));
94 	zassert_true(tid, "k_thread_create failed");
95 	zassert_true(k_pipe_read(&pipe, &res, sizeof(res), K_MSEC(1000)) == -ECANCELED,
96 		"reset on read should return -ECANCELED");
97 	k_thread_join(tid, K_FOREVER);
98 	zassert_true((pipe.flags & PIPE_FLAG_RESET) == 0,
99 		"pipe should not have reset flag after all waiters are done");
100 	zassert_true((pipe.flags & PIPE_FLAG_OPEN) != 0,
101 		"pipe should continue to be open after pipe is reseted");
102 }
103 
ZTEST(k_pipe_concurrency,test_reset_on_write)104 ZTEST(k_pipe_concurrency, test_reset_on_write)
105 {
106 	k_tid_t tid;
107 	uint8_t buffer[DUMMY_DATA_SIZE];
108 	uint8_t garbage[DUMMY_DATA_SIZE];
109 
110 	k_pipe_init(&pipe, buffer, sizeof(buffer));
111 	zassert_true(sizeof(garbage) == k_pipe_write(&pipe, garbage, sizeof(garbage), K_MSEC(1000)),
112 		"Failed to write to pipe");
113 
114 	tid = k_thread_create(&thread, stack, K_THREAD_STACK_SIZEOF(stack),
115 		thread_reset, &pipe, NULL, NULL, K_PRIO_COOP(0), 0, K_MSEC(100));
116 	zassert_true(tid, "k_thread_create failed");
117 	zassert_true(k_pipe_write(&pipe, garbage, sizeof(garbage), K_MSEC(1000)) == -ECANCELED,
118 		"reset on write should return -ECANCELED");
119 	k_thread_join(tid, K_FOREVER);
120 	zassert_true((pipe.flags & PIPE_FLAG_RESET) == 0,
121 		"pipe should not have reset flag after all waiters are done");
122 	zassert_true((pipe.flags & PIPE_FLAG_OPEN) != 0,
123 		"pipe should continue to be open after pipe is reseted");
124 }
125 
ZTEST(k_pipe_concurrency,test_partial_read)126 ZTEST(k_pipe_concurrency, test_partial_read)
127 {
128 	k_tid_t tid;
129 	uint8_t buffer[DUMMY_DATA_SIZE];
130 	uint8_t garbage[DUMMY_DATA_SIZE];
131 	size_t write_size = sizeof(garbage)/2;
132 
133 	k_pipe_init(&pipe, buffer, sizeof(buffer));
134 	tid = k_thread_create(&thread, stack, K_THREAD_STACK_SIZEOF(stack),
135 		thread_read, &pipe, NULL, NULL, K_PRIO_COOP(0), 0, K_NO_WAIT);
136 
137 	zassert_true(k_pipe_write(&pipe, garbage, write_size, K_NO_WAIT) == write_size,
138 		"write to pipe failed");
139 	k_msleep(partial_wait_time/4);
140 	zassert_true(k_pipe_write(&pipe, garbage, write_size, K_NO_WAIT) == write_size,
141 		"k_k_pipe_write should return number of bytes written");
142 	k_thread_join(tid, K_FOREVER);
143 }
144 
ZTEST(k_pipe_concurrency,test_partial_write)145 ZTEST(k_pipe_concurrency, test_partial_write)
146 {
147 	k_tid_t tid;
148 	uint8_t buffer[DUMMY_DATA_SIZE];
149 	uint8_t garbage[DUMMY_DATA_SIZE];
150 	size_t read_size = sizeof(garbage)/2;
151 
152 	k_pipe_init(&pipe, buffer, sizeof(buffer));
153 
154 	zassert_true(k_pipe_write(&pipe, garbage, sizeof(garbage), K_NO_WAIT) == sizeof(garbage),
155 		"Failed to write to pipe");
156 	tid = k_thread_create(&thread, stack, K_THREAD_STACK_SIZEOF(stack),
157 		thread_write, &pipe, NULL, NULL, K_PRIO_COOP(0), 0, K_NO_WAIT);
158 
159 	zassert_true(k_pipe_read(&pipe, garbage, read_size, K_NO_WAIT) == read_size,
160 		"Failed to read from pipe");
161 	k_msleep(partial_wait_time/2);
162 	zassert_true(k_pipe_read(&pipe, garbage, read_size, K_NO_WAIT) == read_size,
163 		"failed t read from pipe");
164 	k_thread_join(tid, K_FOREVER);
165 }
166 
167 static volatile bool zero_thread_read;
168 static volatile bool zero_thread_write;
zero_thread_read_write(void * arg1,void * arg2,void * arg3)169 static void zero_thread_read_write(void *arg1, void *arg2, void *arg3)
170 {
171 	uint8_t tmp[DUMMY_DATA_SIZE];
172 	struct k_pipe *input = (struct k_pipe *)arg1;
173 	struct k_pipe *output = (struct k_pipe *)arg2;
174 
175 	memset(tmp, 0xBB, sizeof(tmp));
176 
177 	zero_thread_read = true;
178 	zassert_true(k_pipe_read(input, tmp, sizeof(tmp), K_FOREVER) == sizeof(tmp),
179 	      "Failed to read from pipe");
180 	zero_thread_write = true;
181 	zassert_true(k_pipe_write(output, tmp, sizeof(tmp), K_FOREVER) == sizeof(tmp),
182 	      "Failed to write to pipe");
183 }
184 
ZTEST(k_pipe_concurrency,test_zero_size_pipe_read_write)185 ZTEST(k_pipe_concurrency, test_zero_size_pipe_read_write)
186 {
187 	k_tid_t tid;
188 	struct k_pipe input_pipe;
189 	struct k_pipe output_pipe;
190 	uint8_t input[DUMMY_DATA_SIZE];
191 	uint8_t output[DUMMY_DATA_SIZE];
192 
193 #ifdef CONFIG_KERNEL_COHERENCE
194 	/* Zero size pipes are not supported due to requiring cache
195 	 * management on data buffers as the buffers can reside in
196 	 * incoherent memory. So skip this test.
197 	 */
198 	ztest_test_skip();
199 #endif
200 
201 	memset(input, 0xAA, sizeof(input));
202 	memset(output, 0xCC, sizeof(output));
203 	k_pipe_init(&input_pipe, NULL, 0);
204 	k_pipe_init(&output_pipe, NULL, 0);
205 
206 	tid = k_thread_create(&thread, stack, K_THREAD_STACK_SIZEOF(stack),
207 		zero_thread_read_write, &input_pipe, &output_pipe, NULL, K_PRIO_COOP(0), 0,
208 		K_NO_WAIT);
209 
210 	zassert_true(sizeof(input) == k_pipe_write(&input_pipe, input, sizeof(input), K_FOREVER),
211 	      "Failed to write to pipe");
212 	zassert_true(sizeof(output) == k_pipe_read(&output_pipe, output, sizeof(output), K_FOREVER),
213 	      "Failed to read from pipe");
214 	zassert_true(memcmp(input, output, sizeof(input)) == 0,
215 		"Unexpected data received from pipe");
216 
217 	zassert_true(zero_thread_read && zero_thread_write,
218 		"Thread did not execute expected read/write operations");
219 
220 	k_thread_join(tid, K_FOREVER);
221 }
222