/*
 * Copyright (c) 2024 Måns Ansgariusson <mansgariusson@gmail.com>
 *
 * SPDX-License-Identifier: Apache-2.0
 */
#include <zephyr/init.h>
#include <zephyr/kernel.h>
#include <zephyr/internal/syscall_handler.h>
#include <ksched.h>
#include <kthread.h>
#include <wait_q.h>

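/*
 * Minimal usage sketch of the API implemented below (illustrative only;
 * buffer names, sizes, and tx_data/tx_len stand in for caller-provided
 * values and are not part of the API):
 *
 *	uint8_t storage[64], rx[16];
 *	struct k_pipe pipe;
 *
 *	k_pipe_init(&pipe, storage, sizeof(storage));
 *
 *	// Writer: returns bytes written, or a negative errno on
 *	// close/reset, or -EAGAIN if nothing fit before the timeout.
 *	int ret = k_pipe_write(&pipe, tx_data, tx_len, K_MSEC(100));
 *
 *	// Reader: blocks until sizeof(rx) bytes arrive, the timeout
 *	// expires, or the pipe is closed or reset.
 *	ret = k_pipe_read(&pipe, rx, sizeof(rx), K_FOREVER);
 */
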
#ifdef CONFIG_OBJ_CORE_PIPE
static struct k_obj_type obj_type_pipe;
#endif /* CONFIG_OBJ_CORE_PIPE */

static inline bool pipe_closed(struct k_pipe *pipe)
{
	return (pipe->flags & PIPE_FLAG_OPEN) == 0;
}

static inline bool pipe_resetting(struct k_pipe *pipe)
{
	return (pipe->flags & PIPE_FLAG_RESET) != 0;
}

static inline bool pipe_full(struct k_pipe *pipe)
{
	return ring_buf_space_get(&pipe->buf) == 0;
}

static inline bool pipe_empty(struct k_pipe *pipe)
{
	return ring_buf_is_empty(&pipe->buf);
}

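/*
 * Pend the current thread on @a waitq until it is woken, @a time_limit
 * expires, or the pipe is reset. The pipe lock is released while pending
 * and re-acquired before returning, with @a key updated accordingly.
 * Returns 0 on a normal wake-up, -EAGAIN on timeout (or if no wait time
 * remains), or -ECANCELED if the pipe was reset; the last waiter to
 * observe a reset clears PIPE_FLAG_RESET.
 */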
static int wait_for(_wait_q_t *waitq, struct k_pipe *pipe, k_spinlock_key_t *key,
		    k_timepoint_t time_limit, bool *need_resched)
{
	k_timeout_t timeout = sys_timepoint_timeout(time_limit);
	int rc;

	if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		return -EAGAIN;
	}

	pipe->waiting++;
	*need_resched = false;
	if (waitq == &pipe->space) {
		SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, write, pipe, timeout);
	} else {
		SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, read, pipe, timeout);
	}
	rc = z_pend_curr(&pipe->lock, *key, waitq, timeout);
	*key = k_spin_lock(&pipe->lock);
	pipe->waiting--;
	if (unlikely(pipe_resetting(pipe))) {
		if (pipe->waiting == 0) {
			pipe->flags &= ~PIPE_FLAG_RESET;
		}
		rc = -ECANCELED;
	}

	return rc;
}

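/*
 * Initialize @a pipe with @a buffer of @a buffer_size bytes as its
 * backing ring buffer, mark it open, and set up its wait queues and
 * kernel-object bookkeeping.
 */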
void z_impl_k_pipe_init(struct k_pipe *pipe, uint8_t *buffer, size_t buffer_size)
{
	ring_buf_init(&pipe->buf, buffer_size, buffer);
	pipe->flags = PIPE_FLAG_OPEN;
	pipe->waiting = 0;

	pipe->lock = (struct k_spinlock){};
	z_waitq_init(&pipe->data);
	z_waitq_init(&pipe->space);
	k_object_init(pipe);

#ifdef CONFIG_POLL
	sys_dlist_init(&pipe->poll_events);
#endif /* CONFIG_POLL */
#ifdef CONFIG_OBJ_CORE_PIPE
	k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
#endif /* CONFIG_OBJ_CORE_PIPE */
	SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe, buffer, buffer_size);
}

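/*
 * Destination buffer descriptor for a pending reader. It lives on the
 * reader's stack and is published to writers through the thread's
 * swap_data pointer so a writer can copy data directly into it.
 */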
struct pipe_buf_spec {
	uint8_t * const data;
	const size_t len;
	size_t used;
};

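/*
 * Copy up to @a len bytes from @a data straight into the buffers of
 * threads pending on the pipe's data wait queue, bypassing the ring
 * buffer. Readers that received everything they asked for are woken,
 * and *need_resched is set when at least one was. Returns the number
 * of bytes consumed from @a data.
 */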
static size_t copy_to_pending_readers(struct k_pipe *pipe, bool *need_resched,
				      const uint8_t *data, size_t len)
{
	struct k_thread *reader = NULL;
	struct pipe_buf_spec *reader_buf;
	size_t copy_size, written = 0;

	/*
	 * Attempt a direct data copy to waiting readers if any.
	 * The copy has to be done under the scheduler lock to ensure all the
	 * needed data is copied to the target thread whose buffer spec lives
	 * on that thread's stack, and then the thread unpended only if it
	 * received all the data it wanted, without racing with a potential
	 * thread timeout/cancellation event.
	 */
	do {
		LOCK_SCHED_SPINLOCK {
			reader = _priq_wait_best(&pipe->data.waitq);
			if (reader == NULL) {
				K_SPINLOCK_BREAK;
			}

			reader_buf = reader->base.swap_data;
			copy_size = MIN(len - written,
					reader_buf->len - reader_buf->used);
			memcpy(&reader_buf->data[reader_buf->used],
			       &data[written], copy_size);
			written += copy_size;
			reader_buf->used += copy_size;

			if (reader_buf->used < reader_buf->len) {
				/* This reader wants more: don't unpend. */
				reader = NULL;
			} else {
				/*
				 * This reader has received all the data
				 * it was waiting for: wake it up with
				 * the scheduler lock still held.
				 */
				unpend_thread_no_timeout(reader);
				z_abort_thread_timeout(reader);
			}
		}
		if (reader != NULL) {
			/* rest of thread wake-up outside the scheduler lock */
			z_thread_return_value_set_with_data(reader, 0, NULL);
			z_ready_thread(reader);
			*need_resched = true;
		}
	} while (reader != NULL && written < len);

	return written;
}

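/*
 * Write up to @a len bytes from @a data into @a pipe, blocking for up to
 * @a timeout until buffer space becomes available. Returns the number of
 * bytes written (possibly fewer than @a len on timeout), or -EAGAIN if
 * nothing could be written in time, -EPIPE if the pipe is closed, or
 * -ECANCELED if the pipe is reset.
 */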
int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_timeout_t timeout)
{
	int rc;
	size_t written = 0;
	k_timepoint_t end = sys_timepoint_calc(timeout);
	k_spinlock_key_t key = k_spin_lock(&pipe->lock);
	bool need_resched = false;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, write, pipe, data, len, timeout);

	if (unlikely(pipe_resetting(pipe))) {
		rc = -ECANCELED;
		goto exit;
	}

	for (;;) {
		if (unlikely(pipe_closed(pipe))) {
			rc = -EPIPE;
			break;
		}

		if (pipe_empty(pipe)) {
			if (IS_ENABLED(CONFIG_KERNEL_COHERENCE)) {
				/*
				 * Systems that enabled this option don't have
				 * their stacks in coherent memory. Given our
				 * pipe_buf_spec is stored on the stack, and
				 * readers may also have their destination
				 * buffer on their stack too, it is not worth
				 * supporting direct-to-readers copy with them.
				 * Simply wake up all pending readers instead.
				 */
				need_resched = z_sched_wake_all(&pipe->data, 0, NULL);
			} else if (pipe->waiting != 0) {
				written += copy_to_pending_readers(pipe, &need_resched,
								   &data[written],
								   len - written);
				if (written >= len) {
					rc = written;
					break;
				}
			}
#ifdef CONFIG_POLL
			z_handle_obj_poll_events(&pipe->poll_events,
						 K_POLL_STATE_PIPE_DATA_AVAILABLE);
#endif /* CONFIG_POLL */
		}

		written += ring_buf_put(&pipe->buf, &data[written], len - written);
		if (likely(written == len)) {
			rc = written;
			break;
		}

		rc = wait_for(&pipe->space, pipe, &key, end, &need_resched);
		if (rc != 0) {
			if (rc == -EAGAIN) {
				rc = written ? written : -EAGAIN;
			}
			break;
		}
	}
exit:
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, write, pipe, rc);
	if (need_resched) {
		z_reschedule(&pipe->lock, key);
	} else {
		k_spin_unlock(&pipe->lock, key);
	}
	return rc;
}

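/*
 * Read up to @a len bytes from @a pipe into @a data, blocking for up to
 * @a timeout until the requested amount is available. Returns the number
 * of bytes read, or -EAGAIN if nothing was read in time, -EPIPE if the
 * pipe is closed and no buffered data was read, or -ECANCELED if the
 * pipe is reset.
 */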
int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout_t timeout)
{
	struct pipe_buf_spec buf = { data, len, 0 };
	int rc;
	k_timepoint_t end = sys_timepoint_calc(timeout);
	k_spinlock_key_t key = k_spin_lock(&pipe->lock);
	bool need_resched = false;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, read, pipe, data, len, timeout);

	if (unlikely(pipe_resetting(pipe))) {
		rc = -ECANCELED;
		goto exit;
	}

	for (;;) {
		if (pipe_full(pipe)) {
			/* One or more pending writers may exist. */
			need_resched = z_sched_wake_all(&pipe->space, 0, NULL);
		}

		buf.used += ring_buf_get(&pipe->buf, &data[buf.used], len - buf.used);
		if (likely(buf.used == len)) {
			rc = buf.used;
			break;
		}

		if (unlikely(pipe_closed(pipe))) {
			rc = buf.used ? buf.used : -EPIPE;
			break;
		}

		/* provide our "direct copy" info to potential writers */
		_current->base.swap_data = &buf;

		rc = wait_for(&pipe->data, pipe, &key, end, &need_resched);
		if (rc != 0) {
			if (rc == -EAGAIN) {
				rc = buf.used ? buf.used : -EAGAIN;
			}
			break;
		}
	}
exit:
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, read, pipe, rc);
	if (need_resched) {
		z_reschedule(&pipe->lock, key);
	} else {
		k_spin_unlock(&pipe->lock, key);
	}
	return rc;
}

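/*
 * Discard all buffered data and wake every pending reader and writer.
 * Waiters observe PIPE_FLAG_RESET and fail with -ECANCELED; the last
 * waiter out clears the flag, making the pipe usable again.
 */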
void z_impl_k_pipe_reset(struct k_pipe *pipe)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, reset, pipe);
	K_SPINLOCK(&pipe->lock) {
		ring_buf_reset(&pipe->buf);
		if (likely(pipe->waiting != 0)) {
			pipe->flags |= PIPE_FLAG_RESET;
			z_sched_wake_all(&pipe->data, 0, NULL);
			z_sched_wake_all(&pipe->space, 0, NULL);
		}
	}
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, reset, pipe);
}

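/*
 * Mark the pipe closed and wake all waiters. Subsequent and pending
 * writes fail with -EPIPE; reads drain whatever data is still buffered
 * before returning -EPIPE.
 */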
void z_impl_k_pipe_close(struct k_pipe *pipe)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, close, pipe);
	K_SPINLOCK(&pipe->lock) {
		pipe->flags = 0;
		z_sched_wake_all(&pipe->data, 0, NULL);
		z_sched_wake_all(&pipe->space, 0, NULL);
	}
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, close, pipe);
}

#ifdef CONFIG_USERSPACE
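/*
 * Syscall verification handlers: validate the caller's permission on the
 * pipe kernel object and its access to the user-supplied buffer before
 * dispatching to the z_impl_*() implementations above.
 */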
void z_vrfy_k_pipe_init(struct k_pipe *pipe, uint8_t *buffer, size_t buffer_size)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(buffer, buffer_size));

	z_impl_k_pipe_init(pipe, buffer, buffer_size);
}
#include <zephyr/syscalls/k_pipe_init_mrsh.c>

int z_vrfy_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len, k_timeout_t timeout)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, len));

	return z_impl_k_pipe_read(pipe, data, len, timeout);
}
#include <zephyr/syscalls/k_pipe_read_mrsh.c>

int z_vrfy_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len, k_timeout_t timeout)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_READ(data, len));

	return z_impl_k_pipe_write(pipe, data, len, timeout);
}
#include <zephyr/syscalls/k_pipe_write_mrsh.c>

void z_vrfy_k_pipe_reset(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	z_impl_k_pipe_reset(pipe);
}
#include <zephyr/syscalls/k_pipe_reset_mrsh.c>

void z_vrfy_k_pipe_close(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	z_impl_k_pipe_close(pipe);
}
#include <zephyr/syscalls/k_pipe_close_mrsh.c>
#endif /* CONFIG_USERSPACE */

#ifdef CONFIG_OBJ_CORE_PIPE
static int init_pipe_obj_core_list(void)
{
	/* Initialize pipe object type */
	z_obj_type_init(&obj_type_pipe, K_OBJ_TYPE_PIPE_ID,
			offsetof(struct k_pipe, obj_core));

	/* Initialize and link statically defined pipes */
	STRUCT_SECTION_FOREACH(k_pipe, pipe) {
		k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
	}

	return 0;
}

SYS_INIT(init_pipe_obj_core_list, PRE_KERNEL_1,
	 CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
#endif /* CONFIG_OBJ_CORE_PIPE */