/*
 * Copyright (c) 2016 Wind River Systems, Inc.
 *
 * SPDX-License-Identifier: Apache-2.0
 */

/**
 * @file
 *
 * @brief Pipes
 */

#include <zephyr/kernel.h>
#include <zephyr/kernel_structs.h>

#include <zephyr/toolchain.h>
#include <ksched.h>
#include <wait_q.h>
#include <zephyr/init.h>
#include <zephyr/internal/syscall_handler.h>
#include <kernel_internal.h>
#include <zephyr/sys/check.h>

struct waitq_walk_data {
	sys_dlist_t *list;
	size_t bytes_requested;
	size_t bytes_available;
};

static int pipe_get_internal(k_spinlock_key_t key, struct k_pipe *pipe,
			     void *data, size_t bytes_to_read,
			     size_t *bytes_read, size_t min_xfer,
			     k_timeout_t timeout);
#ifdef CONFIG_OBJ_CORE_PIPE
static struct k_obj_type obj_type_pipe;
#endif /* CONFIG_OBJ_CORE_PIPE */

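/**
 * @brief Initialize a pipe object
 *
 * The pipe is initially empty. A @a size of zero creates a bufferless
 * pipe in which all data is transferred directly between waiting
 * readers and writers.
 */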
void z_impl_k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size)
{
	pipe->buffer = buffer;
	pipe->size = size;
	pipe->bytes_used = 0U;
	pipe->read_index = 0U;
	pipe->write_index = 0U;
	pipe->lock = (struct k_spinlock){};
	z_waitq_init(&pipe->wait_q.writers);
	z_waitq_init(&pipe->wait_q.readers);
	SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe, buffer, size);

	pipe->flags = 0;

#if defined(CONFIG_POLL)
	sys_dlist_init(&pipe->poll_events);
#endif /* CONFIG_POLL */
	k_object_init(pipe);

#ifdef CONFIG_OBJ_CORE_PIPE
	k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
#endif /* CONFIG_OBJ_CORE_PIPE */
}

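/**
 * @brief Initialize a pipe with a kernel-allocated buffer
 *
 * If @a size is non-zero, the buffer is allocated via z_thread_malloc()
 * and K_PIPE_FLAG_ALLOC is set so that k_pipe_cleanup() knows to free
 * it. A zero @a size creates a bufferless pipe.
 *
 * @return 0 on success, -ENOMEM if the buffer could not be allocated
 */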
int z_impl_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
{
	void *buffer;
	int ret;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, alloc_init, pipe);

	if (size != 0U) {
		buffer = z_thread_malloc(size);
		if (buffer != NULL) {
			k_pipe_init(pipe, buffer, size);
			pipe->flags = K_PIPE_FLAG_ALLOC;
			ret = 0;
		} else {
			ret = -ENOMEM;
		}
	} else {
		k_pipe_init(pipe, NULL, 0U);
		ret = 0;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, alloc_init, pipe, ret);

	return ret;
}

#ifdef CONFIG_USERSPACE
static inline void z_vrfy_k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size)
{
	K_OOPS(K_SYSCALL_OBJ_NEVER_INIT(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(buffer, size));

	z_impl_k_pipe_init(pipe, buffer, size);
}
#include <zephyr/syscalls/k_pipe_init_mrsh.c>

static inline int z_vrfy_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
{
	K_OOPS(K_SYSCALL_OBJ_NEVER_INIT(pipe, K_OBJ_PIPE));

	return z_impl_k_pipe_alloc_init(pipe, size);
}
#include <zephyr/syscalls/k_pipe_alloc_init_mrsh.c>
#endif /* CONFIG_USERSPACE */

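/**
 * @brief Signal any k_poll() events registered on the pipe
 *
 * @return true if a polling thread was made ready and a reschedule is needed
 */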
static inline bool handle_poll_events(struct k_pipe *pipe)
{
#ifdef CONFIG_POLL
	return z_handle_obj_poll_events(&pipe->poll_events, K_POLL_STATE_PIPE_DATA_AVAILABLE);
#else
	ARG_UNUSED(pipe);
	return false;
#endif /* CONFIG_POLL */
}

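/**
 * @brief Empty the pipe of all data
 *
 * Both the data in the pipe's buffer and the data held by any waiting
 * writers are discarded; the writers' requests are thereby satisfied.
 */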
void z_impl_k_pipe_flush(struct k_pipe *pipe)
{
	size_t bytes_read;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, flush, pipe);

	k_spinlock_key_t key = k_spin_lock(&pipe->lock);

	(void) pipe_get_internal(key, pipe, NULL, (size_t) -1, &bytes_read, 0U,
				 K_NO_WAIT);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, flush, pipe);
}

#ifdef CONFIG_USERSPACE
void z_vrfy_k_pipe_flush(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	z_impl_k_pipe_flush(pipe);
}
#include <zephyr/syscalls/k_pipe_flush_mrsh.c>
#endif /* CONFIG_USERSPACE */

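/**
 * @brief Empty the pipe's buffer
 *
 * Unlike k_pipe_flush(), at most @a pipe->size bytes are discarded, and
 * data held by waiting writers may subsequently refill the buffer.
 */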
void z_impl_k_pipe_buffer_flush(struct k_pipe *pipe)
{
	size_t bytes_read;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, buffer_flush, pipe);

	k_spinlock_key_t key = k_spin_lock(&pipe->lock);

	if (pipe->buffer != NULL) {
		(void) pipe_get_internal(key, pipe, NULL, pipe->size,
					 &bytes_read, 0U, K_NO_WAIT);
	} else {
		k_spin_unlock(&pipe->lock, key);
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, buffer_flush, pipe);
}

#ifdef CONFIG_USERSPACE
void z_vrfy_k_pipe_buffer_flush(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	z_impl_k_pipe_buffer_flush(pipe);
}
#endif /* CONFIG_USERSPACE */

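/**
 * @brief Release a pipe's kernel-allocated buffer, if any
 *
 * @return 0 on success, -EAGAIN if threads are still waiting on the pipe
 */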
int k_pipe_cleanup(struct k_pipe *pipe)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, cleanup, pipe);

	k_spinlock_key_t key = k_spin_lock(&pipe->lock);

	CHECKIF((z_waitq_head(&pipe->wait_q.readers) != NULL) ||
		(z_waitq_head(&pipe->wait_q.writers) != NULL)) {
		k_spin_unlock(&pipe->lock, key);

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, -EAGAIN);

		return -EAGAIN;
	}

	if ((pipe->flags & K_PIPE_FLAG_ALLOC) != 0U) {
		k_free(pipe->buffer);
		pipe->buffer = NULL;

		/*
		 * Freeing the buffer changes the pipe into a bufferless
		 * pipe. Reset the pipe's counters to prevent malfunction.
		 */

		pipe->size = 0U;
		pipe->bytes_used = 0U;
		pipe->read_index = 0U;
		pipe->write_index = 0U;
		pipe->flags &= ~K_PIPE_FLAG_ALLOC;
	}

	k_spin_unlock(&pipe->lock, key);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, 0U);

	return 0;
}

/**
 * @brief Copy bytes from @a src to @a dest
 *
 * @return Number of bytes copied
 */
static size_t pipe_xfer(unsigned char *dest, size_t dest_size,
			const unsigned char *src, size_t src_size)
{
	size_t num_bytes = MIN(dest_size, src_size);

	if (dest == NULL) {
		/* Data is being flushed. Pretend the data was copied. */
		return num_bytes;
	}

	(void) memcpy(dest, src, num_bytes);

	return num_bytes;
}

/**
 * @brief Callback routine used to populate the wait list
 *
 * @return 1 to stop further walking; 0 to continue walking
 */
static int pipe_walk_op(struct k_thread *thread, void *data)
{
	struct waitq_walk_data *walk_data = data;
	struct _pipe_desc *desc = (struct _pipe_desc *)thread->base.swap_data;

	sys_dlist_append(walk_data->list, &desc->node);

	walk_data->bytes_available += desc->bytes_to_xfer;

	if (walk_data->bytes_available >= walk_data->bytes_requested) {
		return 1;
	}

	return 0;
}

/**
 * @brief Populate pipe descriptors for copying to/from waiters' buffers
 *
 * This routine cycles through the waiters on the wait queue and creates
 * a list of threads that will have data copied directly to or read
 * directly from their buffers. This list helps avoid double copying
 * later.
 *
 * @return Number of bytes available for direct copying
 */
static size_t pipe_waiter_list_populate(sys_dlist_t *list,
					_wait_q_t *wait_q,
					size_t bytes_to_xfer)
{
	struct waitq_walk_data walk_data;

	walk_data.list = list;
	walk_data.bytes_requested = bytes_to_xfer;
	walk_data.bytes_available = 0;

	(void) z_sched_waitq_walk(wait_q, pipe_walk_op, &walk_data);

	return walk_data.bytes_available;
}

/**
 * @brief Populate pipe descriptors for copying to/from the pipe buffer
 *
 * This routine is only called if the pipe buffer is not empty (when
 * reading) or not full (when writing).
 *
 * @return Number of bytes that can be copied via the pipe buffer
 */
static size_t pipe_buffer_list_populate(sys_dlist_t *list,
					struct _pipe_desc *desc,
					unsigned char *buffer,
					size_t size,
					size_t start,
					size_t end)
{
	sys_dlist_append(list, &desc[0].node);

	desc[0].thread = NULL;
	desc[0].buffer = &buffer[start];

	if (start < end) {
		desc[0].bytes_to_xfer = end - start;
		return end - start;
	}

	desc[0].bytes_to_xfer = size - start;

	desc[1].thread = NULL;
	desc[1].buffer = &buffer[0];
	desc[1].bytes_to_xfer = end;

	sys_dlist_append(list, &desc[1].node);

	return size - start + end;
}

/**
 * @brief Determine the correct return code
 *
 * Bytes Xferred   No Wait    Wait
 *  >= Minimum        0         0
 *   < Minimum      -EIO*    -EAGAIN
 *
 * * The "-EIO No Wait" case was already checked after the list of pipe
 *   descriptors was created.
 *
 * @return See table above
 */
static int pipe_return_code(size_t min_xfer, size_t bytes_remaining,
			    size_t bytes_requested)
{
	if ((bytes_requested - bytes_remaining) >= min_xfer) {
		/*
		 * At least the minimum number of requested
		 * bytes have been transferred.
		 */
		return 0;
	}

	return -EAGAIN;
}

/**
 * @brief Copy data from source(s) to destination(s)
 */

static size_t pipe_write(struct k_pipe *pipe, sys_dlist_t *src_list,
			 sys_dlist_t *dest_list, bool *reschedule)
{
	struct _pipe_desc *src;
	struct _pipe_desc *dest;
	size_t bytes_copied;
	size_t num_bytes_written = 0U;

	src = (struct _pipe_desc *)sys_dlist_get(src_list);
	dest = (struct _pipe_desc *)sys_dlist_get(dest_list);

	while ((src != NULL) && (dest != NULL)) {
		bytes_copied = pipe_xfer(dest->buffer, dest->bytes_to_xfer,
					 src->buffer, src->bytes_to_xfer);

		num_bytes_written += bytes_copied;

		dest->buffer += bytes_copied;
		dest->bytes_to_xfer -= bytes_copied;

		src->buffer += bytes_copied;
		src->bytes_to_xfer -= bytes_copied;

		if (dest->thread == NULL) {

			/* Writing to the pipe buffer. Update details. */

			pipe->bytes_used += bytes_copied;
			pipe->write_index += bytes_copied;
			if (pipe->write_index >= pipe->size) {
				pipe->write_index -= pipe->size;
			}
		} else if (dest->bytes_to_xfer == 0U) {

			/* The thread's read request has been satisfied. */

			z_unpend_thread(dest->thread);
			z_ready_thread(dest->thread);

			*reschedule = true;
		}

		if (src->bytes_to_xfer == 0U) {
			src = (struct _pipe_desc *)sys_dlist_get(src_list);
		}

		if (dest->bytes_to_xfer == 0U) {
			dest = (struct _pipe_desc *)sys_dlist_get(dest_list);
		}
	}

	return num_bytes_written;
}

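/**
 * @brief Write data to a pipe
 *
 * Data is copied first to any waiting readers and then into the pipe
 * buffer, if one exists. If fewer than @a min_xfer bytes can be written
 * immediately, the caller blocks as a waiting writer for up to
 * @a timeout.
 *
 * @return 0 on success, -EINVAL on bad parameters, -EIO or -EAGAIN if
 *         fewer than @a min_xfer bytes were written
 */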
int z_impl_k_pipe_put(struct k_pipe *pipe, const void *data,
		      size_t bytes_to_write, size_t *bytes_written,
		      size_t min_xfer, k_timeout_t timeout)
{
	struct _pipe_desc pipe_desc[2];
	struct _pipe_desc isr_desc;
	struct _pipe_desc *src_desc;
	sys_dlist_t dest_list;
	sys_dlist_t src_list;
	size_t bytes_can_write;
	bool reschedule_needed = false;

	__ASSERT(((arch_is_in_isr() == false) ||
		  K_TIMEOUT_EQ(timeout, K_NO_WAIT)), "");

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, put, pipe, timeout);

	CHECKIF((min_xfer > bytes_to_write) || (bytes_written == NULL)) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout,
					       -EINVAL);

		return -EINVAL;
	}

	sys_dlist_init(&src_list);
	sys_dlist_init(&dest_list);

	k_spinlock_key_t key = k_spin_lock(&pipe->lock);

	/*
	 * First, write to any waiting readers.
	 * Second, write to the pipe buffer, if it exists.
	 */

	bytes_can_write = pipe_waiter_list_populate(&dest_list,
						    &pipe->wait_q.readers,
						    bytes_to_write);

	if (pipe->bytes_used != pipe->size) {
		bytes_can_write += pipe_buffer_list_populate(&dest_list,
							     pipe_desc,
							     pipe->buffer,
							     pipe->size,
							     pipe->write_index,
							     pipe->read_index);
	}

	if ((bytes_can_write < min_xfer) &&
	    (K_TIMEOUT_EQ(timeout, K_NO_WAIT))) {

		/* The request cannot be fulfilled. */

		k_spin_unlock(&pipe->lock, key);
		*bytes_written = 0U;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe,
					       timeout, -EIO);

		return -EIO;
	}

	/*
	 * Do not use the pipe descriptor stored within k_thread if
	 * invoked from within an ISR as that is not safe to do.
	 */

	src_desc = k_is_in_isr() ? &isr_desc : &_current->pipe_desc;

	src_desc->buffer = (unsigned char *)data;
	src_desc->bytes_to_xfer = bytes_to_write;
	src_desc->thread = _current;
	sys_dlist_append(&src_list, &src_desc->node);

	*bytes_written = pipe_write(pipe, &src_list,
				    &dest_list, &reschedule_needed);

	/*
	 * Only handle poll events if the pipe has had some bytes written
	 * and there are bytes remaining after any pending readers have
	 * read from it.
	 */

	if ((pipe->bytes_used != 0U) && (*bytes_written != 0U)) {
		reschedule_needed = handle_poll_events(pipe) || reschedule_needed;
	}

	/*
	 * The immediate success conditions below are backwards
	 * compatible with an earlier pipe implementation.
	 */

	if ((*bytes_written == bytes_to_write) ||
	    (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) ||
	    ((*bytes_written >= min_xfer) && (min_xfer > 0U))) {

		/* The minimum amount of data has been copied. */

		if (reschedule_needed) {
			z_reschedule(&pipe->lock, key);
		} else {
			k_spin_unlock(&pipe->lock, key);
		}

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, 0);

		return 0;
	}

	/* The minimum amount of data has not been copied. Block. */

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, put, pipe, timeout);

	_current->base.swap_data = src_desc;

	z_sched_wait(&pipe->lock, key, &pipe->wait_q.writers, timeout, NULL);

	/*
	 * On SMP systems, threads in the processing list may time out
	 * before the data has finished copying. The following spin
	 * lock/unlock pair prevents those threads from executing further
	 * until the data copying is complete.
	 */

	key = k_spin_lock(&pipe->lock);
	k_spin_unlock(&pipe->lock, key);

	*bytes_written = bytes_to_write - src_desc->bytes_to_xfer;

	int ret = pipe_return_code(min_xfer, src_desc->bytes_to_xfer,
				   bytes_to_write);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, ret);

	return ret;
}

#ifdef CONFIG_USERSPACE
int z_vrfy_k_pipe_put(struct k_pipe *pipe, const void *data,
		      size_t bytes_to_write, size_t *bytes_written,
		      size_t min_xfer, k_timeout_t timeout)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(bytes_written, sizeof(*bytes_written)));
	K_OOPS(K_SYSCALL_MEMORY_READ(data, bytes_to_write));

	return z_impl_k_pipe_put(pipe, data,
				 bytes_to_write, bytes_written, min_xfer,
				 timeout);
}
#include <zephyr/syscalls/k_pipe_put_mrsh.c>
#endif /* CONFIG_USERSPACE */

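/**
 * @brief Common handler for reading from and flushing a pipe
 *
 * Must be called with the pipe's spinlock held (@a key from
 * k_spin_lock()); the lock is released, possibly via a reschedule,
 * before this routine returns. A NULL @a data pointer discards the
 * data instead of copying it.
 */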
static int pipe_get_internal(k_spinlock_key_t key, struct k_pipe *pipe,
			     void *data, size_t bytes_to_read,
			     size_t *bytes_read, size_t min_xfer,
			     k_timeout_t timeout)
{
	sys_dlist_t src_list;
	struct _pipe_desc pipe_desc[2];
	struct _pipe_desc isr_desc;
	struct _pipe_desc *dest_desc;
	struct _pipe_desc *src_desc;
	size_t num_bytes_read = 0U;
	size_t bytes_copied;
	size_t bytes_can_read = 0U;
	bool reschedule_needed = false;

	/*
	 * Data copying takes place in the following order.
	 * 1. Copy data from the pipe buffer to the receive buffer.
	 * 2. Copy data from the waiting writer(s) to the receive buffer.
	 * 3. Refill the pipe buffer from the waiting writer(s).
	 */

	sys_dlist_init(&src_list);

	if (pipe->bytes_used != 0) {
		bytes_can_read = pipe_buffer_list_populate(&src_list,
							   pipe_desc,
							   pipe->buffer,
							   pipe->size,
							   pipe->read_index,
							   pipe->write_index);
	}

	bytes_can_read += pipe_waiter_list_populate(&src_list,
						    &pipe->wait_q.writers,
						    bytes_to_read);

	if ((bytes_can_read < min_xfer) &&
	    (K_TIMEOUT_EQ(timeout, K_NO_WAIT))) {

		/* The request cannot be fulfilled. */

		k_spin_unlock(&pipe->lock, key);
		*bytes_read = 0;

		return -EIO;
	}

	/*
	 * Do not use the pipe descriptor stored within k_thread if
	 * invoked from within an ISR as that is not safe to do.
	 */

	dest_desc = k_is_in_isr() ? &isr_desc : &_current->pipe_desc;

	dest_desc->buffer = data;
	dest_desc->bytes_to_xfer = bytes_to_read;
	dest_desc->thread = _current;

	src_desc = (struct _pipe_desc *)sys_dlist_get(&src_list);
	while (src_desc != NULL) {
		bytes_copied = pipe_xfer(dest_desc->buffer,
					 dest_desc->bytes_to_xfer,
					 src_desc->buffer,
					 src_desc->bytes_to_xfer);

		num_bytes_read += bytes_copied;

		src_desc->buffer += bytes_copied;
		src_desc->bytes_to_xfer -= bytes_copied;

		if (dest_desc->buffer != NULL) {
			dest_desc->buffer += bytes_copied;
		}
		dest_desc->bytes_to_xfer -= bytes_copied;

		if (src_desc->thread == NULL) {

			/* Reading from the pipe buffer. Update details. */

			pipe->bytes_used -= bytes_copied;
			pipe->read_index += bytes_copied;
			if (pipe->read_index >= pipe->size) {
				pipe->read_index -= pipe->size;
			}
		} else if (src_desc->bytes_to_xfer == 0U) {

			/* The thread's write request has been satisfied. */

			z_unpend_thread(src_desc->thread);
			z_ready_thread(src_desc->thread);

			reschedule_needed = true;
		}
		src_desc = (struct _pipe_desc *)sys_dlist_get(&src_list);
	}

	if (pipe->bytes_used != pipe->size) {
		sys_dlist_t pipe_list;

		/*
		 * The pipe is not full. If there are any waiting writers,
		 * refill the pipe.
		 */

		sys_dlist_init(&src_list);
		sys_dlist_init(&pipe_list);

		(void) pipe_waiter_list_populate(&src_list,
						 &pipe->wait_q.writers,
						 pipe->size - pipe->bytes_used);

		(void) pipe_buffer_list_populate(&pipe_list, pipe_desc,
						 pipe->buffer, pipe->size,
						 pipe->write_index,
						 pipe->read_index);

		(void) pipe_write(pipe, &src_list,
				  &pipe_list, &reschedule_needed);
	}

	/*
	 * The immediate success conditions below are backwards
	 * compatible with an earlier pipe implementation.
	 */

	if ((num_bytes_read == bytes_to_read) ||
	    (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) ||
	    ((num_bytes_read >= min_xfer) && (min_xfer > 0U))) {

		/* The minimum amount of data has been copied. */

		*bytes_read = num_bytes_read;
		if (reschedule_needed) {
			z_reschedule(&pipe->lock, key);
		} else {
			k_spin_unlock(&pipe->lock, key);
		}

		return 0;
	}

	/* The minimum amount of data has not been copied. Block. */

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, get, pipe, timeout);

	_current->base.swap_data = dest_desc;

	z_sched_wait(&pipe->lock, key, &pipe->wait_q.readers, timeout, NULL);

	/*
	 * On SMP systems, threads in the processing list may time out
	 * before the data has finished copying. The following spin
	 * lock/unlock pair prevents those threads from executing further
	 * until the data copying is complete.
	 */

	key = k_spin_lock(&pipe->lock);
	k_spin_unlock(&pipe->lock, key);

	*bytes_read = bytes_to_read - dest_desc->bytes_to_xfer;

	int ret = pipe_return_code(min_xfer, dest_desc->bytes_to_xfer,
				   bytes_to_read);

	return ret;
}

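/**
 * @brief Read data from a pipe
 *
 * @return 0 on success, -EINVAL on bad parameters, -EIO or -EAGAIN if
 *         fewer than @a min_xfer bytes were read
 */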
int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
		      size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
	__ASSERT(((arch_is_in_isr() == false) ||
		  K_TIMEOUT_EQ(timeout, K_NO_WAIT)), "");

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, get, pipe, timeout);

	CHECKIF((min_xfer > bytes_to_read) || (bytes_read == NULL)) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe,
					       timeout, -EINVAL);

		return -EINVAL;
	}

	k_spinlock_key_t key = k_spin_lock(&pipe->lock);

	int ret = pipe_get_internal(key, pipe, data, bytes_to_read, bytes_read,
				    min_xfer, timeout);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, ret);

	return ret;
}

#ifdef CONFIG_USERSPACE
int z_vrfy_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
		      size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(bytes_read, sizeof(*bytes_read)));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, bytes_to_read));

	return z_impl_k_pipe_get(pipe, data,
				 bytes_to_read, bytes_read, min_xfer,
				 timeout);
}
#include <zephyr/syscalls/k_pipe_get_mrsh.c>
#endif /* CONFIG_USERSPACE */

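/**
 * @brief Get the number of bytes that may be read from the pipe buffer
 */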
size_t z_impl_k_pipe_read_avail(struct k_pipe *pipe)
{
	size_t res;
	k_spinlock_key_t key;

	/* Buffer and size are fixed. No need to spin. */
	if ((pipe->buffer == NULL) || (pipe->size == 0U)) {
		res = 0;
		goto out;
	}

	key = k_spin_lock(&pipe->lock);

	if (pipe->read_index == pipe->write_index) {
		res = pipe->bytes_used;
	} else if (pipe->read_index < pipe->write_index) {
		res = pipe->write_index - pipe->read_index;
	} else {
		res = pipe->size - (pipe->read_index - pipe->write_index);
	}

	k_spin_unlock(&pipe->lock, key);

out:
	return res;
}

#ifdef CONFIG_USERSPACE
size_t z_vrfy_k_pipe_read_avail(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	return z_impl_k_pipe_read_avail(pipe);
}
#include <zephyr/syscalls/k_pipe_read_avail_mrsh.c>
#endif /* CONFIG_USERSPACE */

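/**
 * @brief Get the number of bytes of free space in the pipe buffer
 */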
size_t z_impl_k_pipe_write_avail(struct k_pipe *pipe)
{
	size_t res;
	k_spinlock_key_t key;

	/* Buffer and size are fixed. No need to spin. */
	if ((pipe->buffer == NULL) || (pipe->size == 0U)) {
		res = 0;
		goto out;
	}

	key = k_spin_lock(&pipe->lock);

	if (pipe->write_index == pipe->read_index) {
		res = pipe->size - pipe->bytes_used;
	} else if (pipe->write_index < pipe->read_index) {
		res = pipe->read_index - pipe->write_index;
	} else {
		res = pipe->size - (pipe->write_index - pipe->read_index);
	}

	k_spin_unlock(&pipe->lock, key);

out:
	return res;
}

#ifdef CONFIG_USERSPACE
size_t z_vrfy_k_pipe_write_avail(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	return z_impl_k_pipe_write_avail(pipe);
}
#include <zephyr/syscalls/k_pipe_write_avail_mrsh.c>
#endif /* CONFIG_USERSPACE */

#ifdef CONFIG_OBJ_CORE_PIPE
static int init_pipe_obj_core_list(void)
{
	/* Initialize pipe object type */

	z_obj_type_init(&obj_type_pipe, K_OBJ_TYPE_PIPE_ID,
			offsetof(struct k_pipe, obj_core));

	/* Initialize and link statically defined pipes */

	STRUCT_SECTION_FOREACH(k_pipe, pipe) {
		k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
	}

	return 0;
}

SYS_INIT(init_pipe_obj_core_list, PRE_KERNEL_1,
	 CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
#endif /* CONFIG_OBJ_CORE_PIPE */