1 /*
2 * Copyright (c) 2016 Wind River Systems, Inc.
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 /**
8 * @file
9 *
10 * @brief Pipes
11 */
12
13 #include <zephyr/kernel.h>
14 #include <zephyr/kernel_structs.h>
15
16 #include <zephyr/toolchain.h>
17 #include <ksched.h>
18 #include <wait_q.h>
19 #include <zephyr/init.h>
20 #include <zephyr/internal/syscall_handler.h>
21 #include <kernel_internal.h>
22 #include <zephyr/sys/check.h>
23
/*
 * Accumulator handed to pipe_walk_op() while a pipe wait queue is walked.
 */
struct waitq_walk_data {
	/* List that each visited waiter's pipe descriptor is appended to */
	sys_dlist_t *list;
	/* The walk stops once bytes_available reaches this amount */
	size_t bytes_requested;
	/* Running sum of bytes_to_xfer over the descriptors visited so far */
	size_t bytes_available;
};
29
/*
 * Forward declaration: the flush routines call pipe_get_internal() before
 * its definition appears further down in this file.
 */
static int pipe_get_internal(k_spinlock_key_t key, struct k_pipe *pipe,
			     void *data, size_t bytes_to_read,
			     size_t *bytes_read, size_t min_xfer,
			     k_timeout_t timeout);
#ifdef CONFIG_OBJ_CORE_PIPE
/*
 * Object-core type record that pipes are linked to; it is set up by
 * init_pipe_obj_core_list().
 */
static struct k_obj_type obj_type_pipe;
#endif /* CONFIG_OBJ_CORE_PIPE */
37
38
k_pipe_init(struct k_pipe * pipe,unsigned char * buffer,size_t size)39 void k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size)
40 {
41 pipe->buffer = buffer;
42 pipe->size = size;
43 pipe->bytes_used = 0U;
44 pipe->read_index = 0U;
45 pipe->write_index = 0U;
46 pipe->lock = (struct k_spinlock){};
47 z_waitq_init(&pipe->wait_q.writers);
48 z_waitq_init(&pipe->wait_q.readers);
49 SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe);
50
51 pipe->flags = 0;
52
53 #if defined(CONFIG_POLL)
54 sys_dlist_init(&pipe->poll_events);
55 #endif /* CONFIG_POLL */
56 k_object_init(pipe);
57
58 #ifdef CONFIG_OBJ_CORE_PIPE
59 k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
60 #endif /* CONFIG_OBJ_CORE_PIPE */
61 }
62
z_impl_k_pipe_alloc_init(struct k_pipe * pipe,size_t size)63 int z_impl_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
64 {
65 void *buffer;
66 int ret;
67
68 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, alloc_init, pipe);
69
70 if (size != 0U) {
71 buffer = z_thread_malloc(size);
72 if (buffer != NULL) {
73 k_pipe_init(pipe, buffer, size);
74 pipe->flags = K_PIPE_FLAG_ALLOC;
75 ret = 0;
76 } else {
77 ret = -ENOMEM;
78 }
79 } else {
80 k_pipe_init(pipe, NULL, 0U);
81 ret = 0;
82 }
83
84 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, alloc_init, pipe, ret);
85
86 return ret;
87 }
88
#ifdef CONFIG_USERSPACE
/*
 * Syscall verification handler for k_pipe_alloc_init(). The pipe must pass
 * the K_SYSCALL_OBJ_NEVER_INIT check (enforced through K_OOPS) before the
 * call is forwarded to the implementation.
 */
static inline int z_vrfy_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
{
	K_OOPS(K_SYSCALL_OBJ_NEVER_INIT(pipe, K_OBJ_PIPE));

	return z_impl_k_pipe_alloc_init(pipe, size);
}
#include <zephyr/syscalls/k_pipe_alloc_init_mrsh.c>
#endif /* CONFIG_USERSPACE */
98
/*
 * Signal K_POLL_STATE_PIPE_DATA_AVAILABLE on the pipe's poll events.
 * Compiles to a no-op when CONFIG_POLL is disabled.
 */
static inline void handle_poll_events(struct k_pipe *pipe)
{
#ifndef CONFIG_POLL
	ARG_UNUSED(pipe);
#else
	z_handle_obj_poll_events(&pipe->poll_events, K_POLL_STATE_PIPE_DATA_AVAILABLE);
#endif /* !CONFIG_POLL */
}
107
z_impl_k_pipe_flush(struct k_pipe * pipe)108 void z_impl_k_pipe_flush(struct k_pipe *pipe)
109 {
110 size_t bytes_read;
111
112 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, flush, pipe);
113
114 k_spinlock_key_t key = k_spin_lock(&pipe->lock);
115
116 (void) pipe_get_internal(key, pipe, NULL, (size_t) -1, &bytes_read, 0U,
117 K_NO_WAIT);
118
119 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, flush, pipe);
120 }
121
#ifdef CONFIG_USERSPACE
/*
 * Syscall verification handler for k_pipe_flush(). The pipe must pass the
 * K_SYSCALL_OBJ check (enforced through K_OOPS) before the call is
 * forwarded to the implementation.
 */
void z_vrfy_k_pipe_flush(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	z_impl_k_pipe_flush(pipe);
}
#include <zephyr/syscalls/k_pipe_flush_mrsh.c>
#endif /* CONFIG_USERSPACE */
131
z_impl_k_pipe_buffer_flush(struct k_pipe * pipe)132 void z_impl_k_pipe_buffer_flush(struct k_pipe *pipe)
133 {
134 size_t bytes_read;
135
136 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, buffer_flush, pipe);
137
138 k_spinlock_key_t key = k_spin_lock(&pipe->lock);
139
140 if (pipe->buffer != NULL) {
141 (void) pipe_get_internal(key, pipe, NULL, pipe->size,
142 &bytes_read, 0U, K_NO_WAIT);
143 } else {
144 k_spin_unlock(&pipe->lock, key);
145 }
146
147 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, buffer_flush, pipe);
148 }
149
#ifdef CONFIG_USERSPACE
/*
 * Syscall verification handler for k_pipe_buffer_flush(). The pipe must pass
 * the K_SYSCALL_OBJ check (enforced through K_OOPS) before the call is
 * forwarded to the implementation.
 */
void z_vrfy_k_pipe_buffer_flush(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	z_impl_k_pipe_buffer_flush(pipe);
}
/*
 * Pull in the generated marshalling code, as every other verification
 * handler in this file does; without it this handler is never wired up.
 */
#include <zephyr/syscalls/k_pipe_buffer_flush_mrsh.c>
#endif /* CONFIG_USERSPACE */
158
k_pipe_cleanup(struct k_pipe * pipe)159 int k_pipe_cleanup(struct k_pipe *pipe)
160 {
161 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, cleanup, pipe);
162
163 k_spinlock_key_t key = k_spin_lock(&pipe->lock);
164
165 CHECKIF((z_waitq_head(&pipe->wait_q.readers) != NULL) ||
166 (z_waitq_head(&pipe->wait_q.writers) != NULL)) {
167 k_spin_unlock(&pipe->lock, key);
168
169 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, -EAGAIN);
170
171 return -EAGAIN;
172 }
173
174 if ((pipe->flags & K_PIPE_FLAG_ALLOC) != 0U) {
175 k_free(pipe->buffer);
176 pipe->buffer = NULL;
177
178 /*
179 * Freeing the buffer changes the pipe into a bufferless
180 * pipe. Reset the pipe's counters to prevent malfunction.
181 */
182
183 pipe->size = 0U;
184 pipe->bytes_used = 0U;
185 pipe->read_index = 0U;
186 pipe->write_index = 0U;
187 pipe->flags &= ~K_PIPE_FLAG_ALLOC;
188 }
189
190 k_spin_unlock(&pipe->lock, key);
191
192 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, 0U);
193
194 return 0;
195 }
196
/**
 * @brief Copy bytes from @a src to @a dest
 *
 * The amount transferred is the smaller of @a dest_size and @a src_size.
 * A NULL @a dest means the data is being flushed: nothing is copied, but the
 * count is still returned so that the caller consumes the source bytes.
 *
 * @param dest      Destination buffer, or NULL to discard the data
 * @param dest_size Space remaining in @a dest
 * @param src       Source buffer; only dereferenced when bytes are copied
 * @param src_size  Bytes remaining in @a src
 *
 * @return Number of bytes copied
 */
static size_t pipe_xfer(unsigned char *dest, size_t dest_size,
			const unsigned char *src, size_t src_size)
{
	size_t num_bytes = (dest_size < src_size) ? dest_size : src_size;

	/*
	 * Nothing to copy for a flush (NULL dest) or an empty transfer.
	 * The zero-length check matters: memcpy() has undefined behavior
	 * when given a NULL pointer even if the length is 0, and a
	 * zero-byte put may supply a NULL source buffer.
	 */
	if ((dest == NULL) || (num_bytes == 0U)) {
		return num_bytes;
	}

	(void) memcpy(dest, src, num_bytes);

	return num_bytes;
}
216
217 /**
218 * @brief Callback routine used to populate wait list
219 *
220 * @return 1 to stop further walking; 0 to continue walking
221 */
pipe_walk_op(struct k_thread * thread,void * data)222 static int pipe_walk_op(struct k_thread *thread, void *data)
223 {
224 struct waitq_walk_data *walk_data = data;
225 struct _pipe_desc *desc = (struct _pipe_desc *)thread->base.swap_data;
226
227 sys_dlist_append(walk_data->list, &desc->node);
228
229 walk_data->bytes_available += desc->bytes_to_xfer;
230
231 if (walk_data->bytes_available >= walk_data->bytes_requested) {
232 return 1;
233 }
234
235 return 0;
236 }
237
238 /**
239 * @brief Popluate pipe descriptors for copying to/from waiters' buffers
240 *
241 * This routine cycles through the waiters on the wait queue and creates
242 * a list of threads that will have data directly copied to / read from
243 * their buffers. This list helps us avoid double copying later.
244 *
245 * @return # of bytes available for direct copying
246 */
pipe_waiter_list_populate(sys_dlist_t * list,_wait_q_t * wait_q,size_t bytes_to_xfer)247 static size_t pipe_waiter_list_populate(sys_dlist_t *list,
248 _wait_q_t *wait_q,
249 size_t bytes_to_xfer)
250 {
251 struct waitq_walk_data walk_data;
252
253 walk_data.list = list;
254 walk_data.bytes_requested = bytes_to_xfer;
255 walk_data.bytes_available = 0;
256
257 (void) z_sched_waitq_walk(wait_q, pipe_walk_op, &walk_data);
258
259 return walk_data.bytes_available;
260 }
261
262 /**
263 * @brief Populate pipe descriptors for copying to/from pipe buffer
264 *
265 * This routine is only called if the pipe buffer is not empty (when reading),
266 * or if not full (when writing).
267 */
pipe_buffer_list_populate(sys_dlist_t * list,struct _pipe_desc * desc,unsigned char * buffer,size_t size,size_t start,size_t end)268 static size_t pipe_buffer_list_populate(sys_dlist_t *list,
269 struct _pipe_desc *desc,
270 unsigned char *buffer,
271 size_t size,
272 size_t start,
273 size_t end)
274 {
275 sys_dlist_append(list, &desc[0].node);
276
277 desc[0].thread = NULL;
278 desc[0].buffer = &buffer[start];
279
280 if (start < end) {
281 desc[0].bytes_to_xfer = end - start;
282 return end - start;
283 }
284
285 desc[0].bytes_to_xfer = size - start;
286
287 desc[1].thread = NULL;
288 desc[1].buffer = &buffer[0];
289 desc[1].bytes_to_xfer = end;
290
291 sys_dlist_append(list, &desc[1].node);
292
293 return size - start + end;
294 }
295
/**
 * @brief Determine the correct return code
 *
 *   Bytes Xferred    No Wait    Wait
 *     >= Minimum        0         0
 *      < Minimum      -EIO*    -EAGAIN
 *
 * * The "-EIO No Wait" case is never produced here; it was already handled
 *   right after the list of pipe descriptors was created.
 *
 * @param min_xfer        Minimum number of bytes that had to be transferred
 * @param bytes_remaining Bytes of the request that were not transferred
 * @param bytes_requested Total number of bytes that were requested
 *
 * @return See table above
 */
static int pipe_return_code(size_t min_xfer, size_t bytes_remaining,
			    size_t bytes_requested)
{
	const size_t bytes_done = bytes_requested - bytes_remaining;

	/* Falling short of the minimum is the only failure reported here. */
	return (bytes_done < min_xfer) ? -EAGAIN : 0;
}
321
/**
 * @brief Copy data from source(s) to destination(s)
 *
 * Pops one descriptor at a time off @a src_list and @a dest_list and copies
 * between the current pair until either list is exhausted. A descriptor is
 * replaced by the next one from its list once its bytes_to_xfer reaches zero.
 *
 * @param pipe       Pipe whose bytes_used / write_index are advanced when
 *                   the destination is the pipe buffer (descriptor with a
 *                   NULL thread)
 * @param src_list   Descriptors to copy from
 * @param dest_list  Descriptors to copy to
 * @param reschedule Set to true when a destination thread is readied; it is
 *                   never set to false here
 *
 * @return Total number of bytes copied
 */

static size_t pipe_write(struct k_pipe *pipe, sys_dlist_t *src_list,
			 sys_dlist_t *dest_list, bool *reschedule)
{
	struct _pipe_desc *src;
	struct _pipe_desc *dest;
	size_t bytes_copied;
	size_t num_bytes_written = 0U;

	src = (struct _pipe_desc *)sys_dlist_get(src_list);
	dest = (struct _pipe_desc *)sys_dlist_get(dest_list);

	while ((src != NULL) && (dest != NULL)) {
		bytes_copied = pipe_xfer(dest->buffer, dest->bytes_to_xfer,
					 src->buffer, src->bytes_to_xfer);

		num_bytes_written += bytes_copied;

		/* Advance both descriptors past the bytes just copied. */
		dest->buffer += bytes_copied;
		dest->bytes_to_xfer -= bytes_copied;

		src->buffer += bytes_copied;
		src->bytes_to_xfer -= bytes_copied;

		if (dest->thread == NULL) {

			/* Writing to the pipe buffer. Update details. */

			pipe->bytes_used += bytes_copied;
			pipe->write_index += bytes_copied;
			/* Wrap the write index back into the buffer. */
			if (pipe->write_index >= pipe->size) {
				pipe->write_index -= pipe->size;
			}
		} else if (dest->bytes_to_xfer == 0U) {

			/* The thread's read request has been satisfied. */

			z_unpend_thread(dest->thread);
			z_ready_thread(dest->thread);

			*reschedule = true;
		}

		/* Source drained: move on to the next source descriptor. */
		if (src->bytes_to_xfer == 0U) {
			src = (struct _pipe_desc *)sys_dlist_get(src_list);
		}

		/* Destination filled: move on to the next destination. */
		if (dest->bytes_to_xfer == 0U) {
			dest = (struct _pipe_desc *)sys_dlist_get(dest_list);
		}
	}

	return num_bytes_written;
}
379
/**
 * @brief Write data to a pipe
 *
 * Data is first copied directly to any waiting readers and then into the
 * pipe buffer. If the request is not satisfied by that, the caller pends on
 * the pipe's writer wait queue with @a timeout.
 *
 * @param pipe           Pipe to write to
 * @param data           Bytes to write
 * @param bytes_to_write Number of bytes in @a data
 * @param bytes_written  Receives the number of bytes actually written
 * @param min_xfer       Minimum number of bytes that must be written
 * @param timeout        How long to wait; must be K_NO_WAIT in an ISR
 *
 * @retval 0       on success
 * @retval -EINVAL if @a min_xfer exceeds @a bytes_to_write or
 *                 @a bytes_written is NULL
 * @retval -EIO    if K_NO_WAIT was given and fewer than @a min_xfer bytes
 *                 could be accepted
 * @retval -EAGAIN if the caller waited and fewer than @a min_xfer bytes
 *                 were written
 */
int z_impl_k_pipe_put(struct k_pipe *pipe, const void *data,
		      size_t bytes_to_write, size_t *bytes_written,
		      size_t min_xfer, k_timeout_t timeout)
{
	struct _pipe_desc pipe_desc[2];
	struct _pipe_desc isr_desc;
	struct _pipe_desc *src_desc;
	sys_dlist_t dest_list;
	sys_dlist_t src_list;
	size_t bytes_can_write;
	bool reschedule_needed = false;

	/* Waiting is only permitted outside of ISR context. */
	__ASSERT(((arch_is_in_isr() == false) ||
		  K_TIMEOUT_EQ(timeout, K_NO_WAIT)), "");

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, put, pipe, timeout);

	CHECKIF((min_xfer > bytes_to_write) || (bytes_written == NULL)) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout,
					       -EINVAL);

		return -EINVAL;
	}

	sys_dlist_init(&src_list);
	sys_dlist_init(&dest_list);

	k_spinlock_key_t key = k_spin_lock(&pipe->lock);

	/*
	 * First, write to any waiting readers, if any exist.
	 * Second, write to the pipe buffer, if it exists.
	 */

	bytes_can_write = pipe_waiter_list_populate(&dest_list,
						    &pipe->wait_q.readers,
						    bytes_to_write);

	/* The pipe buffer is only a destination while it is not full. */
	if (pipe->bytes_used != pipe->size) {
		bytes_can_write += pipe_buffer_list_populate(&dest_list,
							     pipe_desc,
							     pipe->buffer,
							     pipe->size,
							     pipe->write_index,
							     pipe->read_index);
	}

	if ((bytes_can_write < min_xfer) &&
	    (K_TIMEOUT_EQ(timeout, K_NO_WAIT))) {

		/* The request can not be fulfilled. */

		k_spin_unlock(&pipe->lock, key);
		*bytes_written = 0U;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe,
					       timeout, -EIO);

		return -EIO;
	}

	/*
	 * Do not use the pipe descriptor stored within k_thread if
	 * invoked from within an ISR as that is not safe to do.
	 */

	src_desc = k_is_in_isr() ? &isr_desc : &arch_current_thread()->pipe_desc;

	/* The caller's data is the single source descriptor. */
	src_desc->buffer = (unsigned char *)data;
	src_desc->bytes_to_xfer = bytes_to_write;
	src_desc->thread = arch_current_thread();
	sys_dlist_append(&src_list, &src_desc->node);

	*bytes_written = pipe_write(pipe, &src_list,
				    &dest_list, &reschedule_needed);

	/*
	 * Only handle poll events if the pipe has had some bytes written and
	 * there are bytes remaining after any pending readers have read from it
	 */

	if ((pipe->bytes_used != 0U) && (*bytes_written != 0U)) {
		handle_poll_events(pipe);
	}

	/*
	 * The immediate success conditions below are backwards
	 * compatible with an earlier pipe implementation.
	 */

	if ((*bytes_written == bytes_to_write) ||
	    (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) ||
	    ((*bytes_written >= min_xfer) && (min_xfer > 0U))) {

		/* The minimum amount of data has been copied */

		/* Either call below hands back the lock taken above. */
		if (reschedule_needed) {
			z_reschedule(&pipe->lock, key);
		} else {
			k_spin_unlock(&pipe->lock, key);
		}

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, 0);

		return 0;
	}

	/* The minimum amount of data has not been copied. Block. */

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, put, pipe, timeout);

	/*
	 * Publish the descriptor through swap_data; that is where
	 * pipe_walk_op() looks up a waiting thread's descriptor.
	 */
	arch_current_thread()->base.swap_data = src_desc;

	z_sched_wait(&pipe->lock, key, &pipe->wait_q.writers, timeout, NULL);

	/*
	 * On SMP systems, threads in the processing list may timeout before
	 * the data has finished copying. The following spin lock/unlock pair
	 * prevents those threads from executing further until the data copying
	 * is complete.
	 */

	key = k_spin_lock(&pipe->lock);
	k_spin_unlock(&pipe->lock, key);

	/* Whatever is left in the descriptor was not written. */
	*bytes_written = bytes_to_write - src_desc->bytes_to_xfer;

	int ret = pipe_return_code(min_xfer, src_desc->bytes_to_xfer,
				   bytes_to_write);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, ret);

	return ret;
}
514
#ifdef CONFIG_USERSPACE
/*
 * Syscall verification handler for k_pipe_put(). Through K_OOPS it checks
 * the pipe object, write access to *bytes_written and read access to the
 * bytes_to_write bytes at data, then forwards to the implementation.
 */
int z_vrfy_k_pipe_put(struct k_pipe *pipe, const void *data,
		      size_t bytes_to_write, size_t *bytes_written,
		      size_t min_xfer, k_timeout_t timeout)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(bytes_written, sizeof(*bytes_written)));
	K_OOPS(K_SYSCALL_MEMORY_READ(data, bytes_to_write));

	return z_impl_k_pipe_put(pipe, data,
				 bytes_to_write, bytes_written, min_xfer,
				 timeout);
}
#include <zephyr/syscalls/k_pipe_put_mrsh.c>
#endif /* CONFIG_USERSPACE */
530
/**
 * @brief Read data from a pipe; shared by k_pipe_get() and the flush routines
 *
 * Must be entered with pipe->lock held; @a key is the key returned when the
 * lock was taken. Every path out of this routine gives the lock up again,
 * either by unlocking it directly or by passing it to z_reschedule() or
 * z_sched_wait().
 *
 * @param key           Key obtained when pipe->lock was taken
 * @param pipe          Pipe to read from
 * @param data          Destination buffer, or NULL to discard the data
 * @param bytes_to_read Number of bytes requested
 * @param bytes_read    Receives the number of bytes actually read
 * @param min_xfer      Minimum number of bytes that must be read
 * @param timeout       How long to wait for the request to be satisfied
 *
 * @retval 0       on success
 * @retval -EIO    if K_NO_WAIT was given and fewer than @a min_xfer bytes
 *                 were available
 * @retval -EAGAIN if the caller waited and fewer than @a min_xfer bytes
 *                 were read
 */
static int pipe_get_internal(k_spinlock_key_t key, struct k_pipe *pipe,
			     void *data, size_t bytes_to_read,
			     size_t *bytes_read, size_t min_xfer,
			     k_timeout_t timeout)
{
	sys_dlist_t src_list;
	struct _pipe_desc pipe_desc[2];
	struct _pipe_desc isr_desc;
	struct _pipe_desc *dest_desc;
	struct _pipe_desc *src_desc;
	size_t num_bytes_read = 0U;
	size_t bytes_copied;
	size_t bytes_can_read = 0U;
	bool reschedule_needed = false;

	/*
	 * Data copying takes place in the following order.
	 * 1. Copy data from the pipe buffer to the receive buffer.
	 * 2. Copy data from the waiting writer(s) to the receive buffer.
	 * 3. Refill the pipe buffer from the waiting writer(s).
	 */

	sys_dlist_init(&src_list);

	/* The pipe buffer is only a source while it holds data. */
	if (pipe->bytes_used != 0) {
		bytes_can_read = pipe_buffer_list_populate(&src_list,
							   pipe_desc,
							   pipe->buffer,
							   pipe->size,
							   pipe->read_index,
							   pipe->write_index);
	}

	bytes_can_read += pipe_waiter_list_populate(&src_list,
						    &pipe->wait_q.writers,
						    bytes_to_read);

	if ((bytes_can_read < min_xfer) &&
	    (K_TIMEOUT_EQ(timeout, K_NO_WAIT))) {

		/* The request can not be fulfilled. */

		k_spin_unlock(&pipe->lock, key);
		*bytes_read = 0;

		return -EIO;
	}

	/*
	 * Do not use the pipe descriptor stored within k_thread if
	 * invoked from within an ISR as that is not safe to do.
	 */

	dest_desc = k_is_in_isr() ? &isr_desc : &arch_current_thread()->pipe_desc;

	dest_desc->buffer = data;
	dest_desc->bytes_to_xfer = bytes_to_read;
	dest_desc->thread = arch_current_thread();

	/*
	 * Every source descriptor is popped, even after the destination has
	 * been filled; pipe_xfer() then simply copies zero bytes.
	 */
	src_desc = (struct _pipe_desc *)sys_dlist_get(&src_list);
	while (src_desc != NULL) {
		bytes_copied = pipe_xfer(dest_desc->buffer,
					 dest_desc->bytes_to_xfer,
					 src_desc->buffer,
					 src_desc->bytes_to_xfer);

		num_bytes_read += bytes_copied;

		src_desc->buffer += bytes_copied;
		src_desc->bytes_to_xfer -= bytes_copied;

		/* A NULL destination (flush) must stay NULL. */
		if (dest_desc->buffer != NULL) {
			dest_desc->buffer += bytes_copied;
		}
		dest_desc->bytes_to_xfer -= bytes_copied;

		if (src_desc->thread == NULL) {

			/* Reading from the pipe buffer. Update details. */

			pipe->bytes_used -= bytes_copied;
			pipe->read_index += bytes_copied;
			/* Wrap the read index back into the buffer. */
			if (pipe->read_index >= pipe->size) {
				pipe->read_index -= pipe->size;
			}
		} else if (src_desc->bytes_to_xfer == 0U) {

			/* The thread's write request has been satisfied. */

			z_unpend_thread(src_desc->thread);
			z_ready_thread(src_desc->thread);

			reschedule_needed = true;
		}
		src_desc = (struct _pipe_desc *)sys_dlist_get(&src_list);
	}

	if (pipe->bytes_used != pipe->size) {
		sys_dlist_t pipe_list;

		/*
		 * The pipe is not full. If there are any waiting writers,
		 * refill the pipe.
		 */

		sys_dlist_init(&src_list);
		sys_dlist_init(&pipe_list);

		(void) pipe_waiter_list_populate(&src_list,
						 &pipe->wait_q.writers,
						 pipe->size - pipe->bytes_used);

		(void) pipe_buffer_list_populate(&pipe_list, pipe_desc,
						 pipe->buffer, pipe->size,
						 pipe->write_index,
						 pipe->read_index);

		(void) pipe_write(pipe, &src_list,
				  &pipe_list, &reschedule_needed);
	}

	/*
	 * The immediate success conditions below are backwards
	 * compatible with an earlier pipe implementation.
	 */

	if ((num_bytes_read == bytes_to_read) ||
	    (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) ||
	    ((num_bytes_read >= min_xfer) && (min_xfer > 0U))) {

		/* The minimum amount of data has been copied */

		*bytes_read = num_bytes_read;
		/* Either call below hands back the lock passed in by the caller. */
		if (reschedule_needed) {
			z_reschedule(&pipe->lock, key);
		} else {
			k_spin_unlock(&pipe->lock, key);
		}

		return 0;
	}

	/* The minimum amount of data has not been copied. Block. */

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, get, pipe, timeout);

	/*
	 * Publish the descriptor through swap_data; that is where
	 * pipe_walk_op() looks up a waiting thread's descriptor.
	 */
	arch_current_thread()->base.swap_data = dest_desc;

	z_sched_wait(&pipe->lock, key, &pipe->wait_q.readers, timeout, NULL);

	/*
	 * On SMP systems, threads in the processing list may timeout before
	 * the data has finished copying. The following spin lock/unlock pair
	 * prevents those threads from executing further until the data copying
	 * is complete.
	 */

	key = k_spin_lock(&pipe->lock);
	k_spin_unlock(&pipe->lock, key);

	/* Whatever is left in the descriptor was not read. */
	*bytes_read = bytes_to_read - dest_desc->bytes_to_xfer;

	int ret = pipe_return_code(min_xfer, dest_desc->bytes_to_xfer,
				   bytes_to_read);

	return ret;
}
698
/**
 * @brief Read data from a pipe
 *
 * Validates the arguments, takes the pipe lock and lets pipe_get_internal()
 * carry out the transfer; the lock is handed over together with its key.
 *
 * @param pipe          Pipe to read from
 * @param data          Buffer that receives the data
 * @param bytes_to_read Number of bytes requested
 * @param bytes_read    Receives the number of bytes actually read
 * @param min_xfer      Minimum number of bytes that must be read
 * @param timeout       How long to wait; must be K_NO_WAIT in an ISR
 *
 * @retval -EINVAL if @a min_xfer exceeds @a bytes_to_read or
 *                 @a bytes_read is NULL
 * @return otherwise the result of pipe_get_internal()
 */
int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
		      size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
	k_spinlock_key_t key;
	int ret;

	/* Waiting is only permitted outside of ISR context. */
	__ASSERT(((arch_is_in_isr() == false) ||
		  K_TIMEOUT_EQ(timeout, K_NO_WAIT)), "");

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, get, pipe, timeout);

	CHECKIF((min_xfer > bytes_to_read) || (bytes_read == NULL)) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe,
					       timeout, -EINVAL);

		return -EINVAL;
	}

	key = k_spin_lock(&pipe->lock);
	ret = pipe_get_internal(key, pipe, data, bytes_to_read, bytes_read,
				min_xfer, timeout);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, ret);

	return ret;
}
723
#ifdef CONFIG_USERSPACE
/*
 * Syscall verification handler for k_pipe_get(). Through K_OOPS it checks
 * the pipe object, write access to *bytes_read and write access to the
 * bytes_to_read bytes at data, then forwards to the implementation.
 */
int z_vrfy_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
		      size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(bytes_read, sizeof(*bytes_read)));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, bytes_to_read));

	return z_impl_k_pipe_get(pipe, data,
				 bytes_to_read, bytes_read, min_xfer,
				 timeout);
}
#include <zephyr/syscalls/k_pipe_get_mrsh.c>
#endif /* CONFIG_USERSPACE */
738
z_impl_k_pipe_read_avail(struct k_pipe * pipe)739 size_t z_impl_k_pipe_read_avail(struct k_pipe *pipe)
740 {
741 size_t res;
742 k_spinlock_key_t key;
743
744 /* Buffer and size are fixed. No need to spin. */
745 if ((pipe->buffer == NULL) || (pipe->size == 0U)) {
746 res = 0;
747 goto out;
748 }
749
750 key = k_spin_lock(&pipe->lock);
751
752 if (pipe->read_index == pipe->write_index) {
753 res = pipe->bytes_used;
754 } else if (pipe->read_index < pipe->write_index) {
755 res = pipe->write_index - pipe->read_index;
756 } else {
757 res = pipe->size - (pipe->read_index - pipe->write_index);
758 }
759
760 k_spin_unlock(&pipe->lock, key);
761
762 out:
763 return res;
764 }
765
#ifdef CONFIG_USERSPACE
/*
 * Syscall verification handler for k_pipe_read_avail(). The pipe must pass
 * the K_SYSCALL_OBJ check (enforced through K_OOPS) before the call is
 * forwarded to the implementation.
 */
size_t z_vrfy_k_pipe_read_avail(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	return z_impl_k_pipe_read_avail(pipe);
}
#include <zephyr/syscalls/k_pipe_read_avail_mrsh.c>
#endif /* CONFIG_USERSPACE */
775
z_impl_k_pipe_write_avail(struct k_pipe * pipe)776 size_t z_impl_k_pipe_write_avail(struct k_pipe *pipe)
777 {
778 size_t res;
779 k_spinlock_key_t key;
780
781 /* Buffer and size are fixed. No need to spin. */
782 if ((pipe->buffer == NULL) || (pipe->size == 0U)) {
783 res = 0;
784 goto out;
785 }
786
787 key = k_spin_lock(&pipe->lock);
788
789 if (pipe->write_index == pipe->read_index) {
790 res = pipe->size - pipe->bytes_used;
791 } else if (pipe->write_index < pipe->read_index) {
792 res = pipe->read_index - pipe->write_index;
793 } else {
794 res = pipe->size - (pipe->write_index - pipe->read_index);
795 }
796
797 k_spin_unlock(&pipe->lock, key);
798
799 out:
800 return res;
801 }
802
#ifdef CONFIG_USERSPACE
/*
 * Syscall verification handler for k_pipe_write_avail(). The pipe must pass
 * the K_SYSCALL_OBJ check (enforced through K_OOPS) before the call is
 * forwarded to the implementation.
 */
size_t z_vrfy_k_pipe_write_avail(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	return z_impl_k_pipe_write_avail(pipe);
}
#include <zephyr/syscalls/k_pipe_write_avail_mrsh.c>
#endif /* CONFIG_USERSPACE */
812
#ifdef CONFIG_OBJ_CORE_PIPE
/*
 * Boot-time hook: sets up the pipe object type and links every pipe found
 * in the k_pipe struct section to it. Always returns 0.
 */
static int init_pipe_obj_core_list(void)
{
	/* Initialize pipe object type */

	z_obj_type_init(&obj_type_pipe, K_OBJ_TYPE_PIPE_ID,
			offsetof(struct k_pipe, obj_core));

	/* Initialize and link statically defined pipes */

	STRUCT_SECTION_FOREACH(k_pipe, pipe) {
		k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
	}

	return 0;
}

/* Registered at the PRE_KERNEL_1 level with the kernel-objects init priority. */
SYS_INIT(init_pipe_obj_core_list, PRE_KERNEL_1,
	 CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
#endif /* CONFIG_OBJ_CORE_PIPE */
833