1 /*
2  * Copyright (c) 2016 Wind River Systems, Inc.
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 /**
8  * @file
9  *
10  * @brief Pipes
11  */
12 
13 #include <zephyr/kernel.h>
14 #include <zephyr/kernel_structs.h>
15 
16 #include <zephyr/toolchain.h>
17 #include <ksched.h>
18 #include <wait_q.h>
19 #include <zephyr/init.h>
20 #include <zephyr/internal/syscall_handler.h>
21 #include <kernel_internal.h>
22 #include <zephyr/sys/check.h>
23 
/*
 * Accumulator handed to pipe_walk_op() while a pipe wait queue is walked.
 */
struct waitq_walk_data {
	/* List that receives the pipe descriptor node of each visited waiter */
	sys_dlist_t *list;
	/* The walk stops once bytes_available reaches this amount */
	size_t       bytes_requested;
	/* Running sum of bytes_to_xfer over the waiters visited so far */
	size_t       bytes_available;
};
29 
/*
 * Forward declaration: the flush routines call pipe_get_internal() before
 * its definition appears in this file.
 */
static int pipe_get_internal(k_spinlock_key_t key, struct k_pipe *pipe,
			     void *data, size_t bytes_to_read,
			     size_t *bytes_read, size_t min_xfer,
			     k_timeout_t timeout);
#ifdef CONFIG_OBJ_CORE_PIPE
/* Object core type that every pipe is linked to (see k_pipe_init()) */
static struct k_obj_type obj_type_pipe;
#endif /* CONFIG_OBJ_CORE_PIPE */
37 
38 
k_pipe_init(struct k_pipe * pipe,unsigned char * buffer,size_t size)39 void k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size)
40 {
41 	pipe->buffer = buffer;
42 	pipe->size = size;
43 	pipe->bytes_used = 0U;
44 	pipe->read_index = 0U;
45 	pipe->write_index = 0U;
46 	pipe->lock = (struct k_spinlock){};
47 	z_waitq_init(&pipe->wait_q.writers);
48 	z_waitq_init(&pipe->wait_q.readers);
49 	SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe);
50 
51 	pipe->flags = 0;
52 
53 #if defined(CONFIG_POLL)
54 	sys_dlist_init(&pipe->poll_events);
55 #endif /* CONFIG_POLL */
56 	k_object_init(pipe);
57 
58 #ifdef CONFIG_OBJ_CORE_PIPE
59 	k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
60 #endif /* CONFIG_OBJ_CORE_PIPE */
61 }
62 
z_impl_k_pipe_alloc_init(struct k_pipe * pipe,size_t size)63 int z_impl_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
64 {
65 	void *buffer;
66 	int ret;
67 
68 	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, alloc_init, pipe);
69 
70 	if (size != 0U) {
71 		buffer = z_thread_malloc(size);
72 		if (buffer != NULL) {
73 			k_pipe_init(pipe, buffer, size);
74 			pipe->flags = K_PIPE_FLAG_ALLOC;
75 			ret = 0;
76 		} else {
77 			ret = -ENOMEM;
78 		}
79 	} else {
80 		k_pipe_init(pipe, NULL, 0U);
81 		ret = 0;
82 	}
83 
84 	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, alloc_init, pipe, ret);
85 
86 	return ret;
87 }
88 
89 #ifdef CONFIG_USERSPACE
z_vrfy_k_pipe_alloc_init(struct k_pipe * pipe,size_t size)90 static inline int z_vrfy_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
91 {
92 	K_OOPS(K_SYSCALL_OBJ_NEVER_INIT(pipe, K_OBJ_PIPE));
93 
94 	return z_impl_k_pipe_alloc_init(pipe, size);
95 }
96 #include <zephyr/syscalls/k_pipe_alloc_init_mrsh.c>
97 #endif /* CONFIG_USERSPACE */
98 
/*
 * Signal K_POLL_STATE_PIPE_DATA_AVAILABLE on the pipe's poll events.
 * Compiles to nothing when CONFIG_POLL is disabled.
 */
static inline void handle_poll_events(struct k_pipe *pipe)
{
#ifndef CONFIG_POLL
	ARG_UNUSED(pipe);
#else
	z_handle_obj_poll_events(&pipe->poll_events, K_POLL_STATE_PIPE_DATA_AVAILABLE);
#endif /* CONFIG_POLL */
}
107 
z_impl_k_pipe_flush(struct k_pipe * pipe)108 void z_impl_k_pipe_flush(struct k_pipe *pipe)
109 {
110 	size_t  bytes_read;
111 
112 	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, flush, pipe);
113 
114 	k_spinlock_key_t key = k_spin_lock(&pipe->lock);
115 
116 	(void) pipe_get_internal(key, pipe, NULL, (size_t) -1, &bytes_read, 0U,
117 				 K_NO_WAIT);
118 
119 	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, flush, pipe);
120 }
121 
122 #ifdef CONFIG_USERSPACE
/*
 * Syscall verification wrapper for k_pipe_flush(): the pipe must pass the
 * K_SYSCALL_OBJ check for K_OBJ_PIPE before the flush is carried out.
 */
void z_vrfy_k_pipe_flush(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	z_impl_k_pipe_flush(pipe);
}
129 #include <zephyr/syscalls/k_pipe_flush_mrsh.c>
130 #endif /* CONFIG_USERSPACE */
131 
z_impl_k_pipe_buffer_flush(struct k_pipe * pipe)132 void z_impl_k_pipe_buffer_flush(struct k_pipe *pipe)
133 {
134 	size_t  bytes_read;
135 
136 	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, buffer_flush, pipe);
137 
138 	k_spinlock_key_t key = k_spin_lock(&pipe->lock);
139 
140 	if (pipe->buffer != NULL) {
141 		(void) pipe_get_internal(key, pipe, NULL, pipe->size,
142 					 &bytes_read, 0U, K_NO_WAIT);
143 	} else {
144 		k_spin_unlock(&pipe->lock, key);
145 	}
146 
147 	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, buffer_flush, pipe);
148 }
149 
#ifdef CONFIG_USERSPACE
/*
 * Syscall verification wrapper for k_pipe_buffer_flush(): the pipe must pass
 * the K_SYSCALL_OBJ check for K_OBJ_PIPE before the flush is carried out.
 */
void z_vrfy_k_pipe_buffer_flush(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	z_impl_k_pipe_buffer_flush(pipe);
}
/*
 * Pull in the generated marshalling stub, as done for every other pipe
 * syscall in this file; without it the verification function above is
 * never reachable from the syscall dispatch path.
 */
#include <zephyr/syscalls/k_pipe_buffer_flush_mrsh.c>
#endif /* CONFIG_USERSPACE */
158 
k_pipe_cleanup(struct k_pipe * pipe)159 int k_pipe_cleanup(struct k_pipe *pipe)
160 {
161 	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, cleanup, pipe);
162 
163 	k_spinlock_key_t key = k_spin_lock(&pipe->lock);
164 
165 	CHECKIF((z_waitq_head(&pipe->wait_q.readers) != NULL) ||
166 			(z_waitq_head(&pipe->wait_q.writers) != NULL)) {
167 		k_spin_unlock(&pipe->lock, key);
168 
169 		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, -EAGAIN);
170 
171 		return -EAGAIN;
172 	}
173 
174 	if ((pipe->flags & K_PIPE_FLAG_ALLOC) != 0U) {
175 		k_free(pipe->buffer);
176 		pipe->buffer = NULL;
177 
178 		/*
179 		 * Freeing the buffer changes the pipe into a bufferless
180 		 * pipe. Reset the pipe's counters to prevent malfunction.
181 		 */
182 
183 		pipe->size = 0U;
184 		pipe->bytes_used = 0U;
185 		pipe->read_index = 0U;
186 		pipe->write_index = 0U;
187 		pipe->flags &= ~K_PIPE_FLAG_ALLOC;
188 	}
189 
190 	k_spin_unlock(&pipe->lock, key);
191 
192 	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, 0U);
193 
194 	return 0;
195 }
196 
/**
 * @brief Copy bytes from @a src to @a dest
 *
 * Transfers the smaller of @a dest_size and @a src_size bytes. A NULL
 * @a dest means the data is being flushed: nothing is copied, but the
 * count is reported as if it had been.
 *
 * @param dest      Destination buffer, or NULL to discard the data
 * @param dest_size Space available at @a dest, in bytes
 * @param src       Source buffer
 * @param src_size  Bytes available at @a src
 *
 * @return Number of bytes copied (or discarded)
 */
static size_t pipe_xfer(unsigned char *dest, size_t dest_size,
			 const unsigned char *src, size_t src_size)
{
	size_t num_bytes = (dest_size < src_size) ? dest_size : src_size;

	/*
	 * memcpy() requires valid pointers even for a zero length, and an
	 * exhausted descriptor or an empty request may carry a NULL or
	 * one-past-the-end pointer. Only call it when there is real work.
	 */
	if ((dest != NULL) && (num_bytes != 0U)) {
		(void) memcpy(dest, src, num_bytes);
	}

	return num_bytes;
}
216 
217 /**
218  * @brief Callback routine used to populate wait list
219  *
220  * @return 1 to stop further walking; 0 to continue walking
221  */
pipe_walk_op(struct k_thread * thread,void * data)222 static int pipe_walk_op(struct k_thread *thread, void *data)
223 {
224 	struct waitq_walk_data *walk_data = data;
225 	struct _pipe_desc *desc = (struct _pipe_desc *)thread->base.swap_data;
226 
227 	sys_dlist_append(walk_data->list, &desc->node);
228 
229 	walk_data->bytes_available += desc->bytes_to_xfer;
230 
231 	if (walk_data->bytes_available >= walk_data->bytes_requested) {
232 		return 1;
233 	}
234 
235 	return 0;
236 }
237 
238 /**
239  * @brief Popluate pipe descriptors for copying to/from waiters' buffers
240  *
241  * This routine cycles through the waiters on the wait queue and creates
242  * a list of threads that will have data directly copied to / read from
243  * their buffers. This list helps us avoid double copying later.
244  *
245  * @return # of bytes available for direct copying
246  */
pipe_waiter_list_populate(sys_dlist_t * list,_wait_q_t * wait_q,size_t bytes_to_xfer)247 static size_t pipe_waiter_list_populate(sys_dlist_t  *list,
248 					_wait_q_t    *wait_q,
249 					size_t        bytes_to_xfer)
250 {
251 	struct waitq_walk_data walk_data;
252 
253 	walk_data.list            = list;
254 	walk_data.bytes_requested = bytes_to_xfer;
255 	walk_data.bytes_available = 0;
256 
257 	(void) z_sched_waitq_walk(wait_q, pipe_walk_op, &walk_data);
258 
259 	return walk_data.bytes_available;
260 }
261 
262 /**
263  * @brief Populate pipe descriptors for copying to/from pipe buffer
264  *
265  * This routine is only called if the pipe buffer is not empty (when reading),
266  * or if not full (when writing).
267  */
pipe_buffer_list_populate(sys_dlist_t * list,struct _pipe_desc * desc,unsigned char * buffer,size_t size,size_t start,size_t end)268 static size_t pipe_buffer_list_populate(sys_dlist_t         *list,
269 					struct _pipe_desc   *desc,
270 					unsigned char       *buffer,
271 					size_t               size,
272 					size_t               start,
273 					size_t               end)
274 {
275 	sys_dlist_append(list, &desc[0].node);
276 
277 	desc[0].thread = NULL;
278 	desc[0].buffer = &buffer[start];
279 
280 	if (start < end) {
281 		desc[0].bytes_to_xfer = end - start;
282 		return end - start;
283 	}
284 
285 	desc[0].bytes_to_xfer = size - start;
286 
287 	desc[1].thread = NULL;
288 	desc[1].buffer = &buffer[0];
289 	desc[1].bytes_to_xfer = end;
290 
291 	sys_dlist_append(list, &desc[1].node);
292 
293 	return size - start + end;
294 }
295 
/**
 * @brief Determine the correct return code
 *
 * Bytes Xferred   No Wait   Wait
 *   >= Minimum       0       0
 *    < Minimum      -EIO*   -EAGAIN
 *
 * * The "-EIO No Wait" case was already checked after the list of pipe
 *   descriptors was created.
 *
 * @param min_xfer        Minimum number of bytes that had to be transferred
 * @param bytes_remaining Bytes of the request that were not transferred
 * @param bytes_requested Total number of bytes requested
 *
 * @return See table above
 */
static int pipe_return_code(size_t min_xfer, size_t bytes_remaining,
			     size_t bytes_requested)
{
	const size_t transferred = bytes_requested - bytes_remaining;

	/* Falling short of the minimum is the only failure reported here. */
	return (transferred < min_xfer) ? -EAGAIN : 0;
}
321 
322 /**
323  * @brief Copy data from source(s) to destination(s)
324  */
325 
pipe_write(struct k_pipe * pipe,sys_dlist_t * src_list,sys_dlist_t * dest_list,bool * reschedule)326 static size_t pipe_write(struct k_pipe *pipe, sys_dlist_t *src_list,
327 			 sys_dlist_t *dest_list, bool *reschedule)
328 {
329 	struct _pipe_desc *src;
330 	struct _pipe_desc *dest;
331 	size_t  bytes_copied;
332 	size_t  num_bytes_written = 0U;
333 
334 	src = (struct _pipe_desc *)sys_dlist_get(src_list);
335 	dest = (struct _pipe_desc *)sys_dlist_get(dest_list);
336 
337 	while ((src != NULL) && (dest != NULL)) {
338 		bytes_copied = pipe_xfer(dest->buffer, dest->bytes_to_xfer,
339 					 src->buffer, src->bytes_to_xfer);
340 
341 		num_bytes_written   += bytes_copied;
342 
343 		dest->buffer        += bytes_copied;
344 		dest->bytes_to_xfer -= bytes_copied;
345 
346 		src->buffer         += bytes_copied;
347 		src->bytes_to_xfer  -= bytes_copied;
348 
349 		if (dest->thread == NULL) {
350 
351 			/* Writing to the pipe buffer. Update details. */
352 
353 			pipe->bytes_used += bytes_copied;
354 			pipe->write_index += bytes_copied;
355 			if (pipe->write_index >= pipe->size) {
356 				pipe->write_index -= pipe->size;
357 			}
358 		} else if (dest->bytes_to_xfer == 0U) {
359 
360 			/* The thread's read request has been satisfied. */
361 
362 			z_unpend_thread(dest->thread);
363 			z_ready_thread(dest->thread);
364 
365 			*reschedule = true;
366 		}
367 
368 		if (src->bytes_to_xfer == 0U) {
369 			src = (struct _pipe_desc *)sys_dlist_get(src_list);
370 		}
371 
372 		if (dest->bytes_to_xfer == 0U) {
373 			dest = (struct _pipe_desc *)sys_dlist_get(dest_list);
374 		}
375 	}
376 
377 	return num_bytes_written;
378 }
379 
/**
 * @brief Write data to a pipe
 *
 * Data goes to waiting readers first and then into the pipe buffer. If the
 * request is not satisfied immediately the caller pends on the writers
 * wait queue for up to @a timeout.
 *
 * @param pipe           Pipe to write to
 * @param data           Bytes to write
 * @param bytes_to_write Number of bytes offered
 * @param bytes_written  Receives the number of bytes actually written
 * @param min_xfer       Minimum number of bytes that must be written
 * @param timeout        How long to wait; must be K_NO_WAIT in an ISR
 *
 * @retval 0       Success (with K_NO_WAIT this covers partial writes too)
 * @retval -EINVAL @a min_xfer exceeds @a bytes_to_write, or
 *                 @a bytes_written is NULL
 * @retval -EIO    K_NO_WAIT given and fewer than @a min_xfer bytes fit
 * @retval -EAGAIN Waited, but fewer than @a min_xfer bytes were written
 */
int z_impl_k_pipe_put(struct k_pipe *pipe, const void *data,
		      size_t bytes_to_write, size_t *bytes_written,
		      size_t min_xfer, k_timeout_t timeout)
{
	const bool         no_wait = K_TIMEOUT_EQ(timeout, K_NO_WAIT);
	struct _pipe_desc  ring_desc[2];
	struct _pipe_desc  isr_desc;
	struct _pipe_desc *writer;
	sys_dlist_t        sinks;
	sys_dlist_t        sources;
	size_t             room;
	bool               need_resched = false;
	k_spinlock_key_t   key;
	int                ret;

	__ASSERT(((arch_is_in_isr() == false) ||
		  K_TIMEOUT_EQ(timeout, K_NO_WAIT)), "");

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, put, pipe, timeout);

	CHECKIF((min_xfer > bytes_to_write) || (bytes_written == NULL)) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout,
					       -EINVAL);

		return -EINVAL;
	}

	sys_dlist_init(&sources);
	sys_dlist_init(&sinks);

	key = k_spin_lock(&pipe->lock);

	/*
	 * Gather the destinations: waiting readers come first, followed by
	 * whatever free space the pipe buffer has.
	 */

	room = pipe_waiter_list_populate(&sinks, &pipe->wait_q.readers,
					 bytes_to_write);

	if (pipe->bytes_used != pipe->size) {
		room += pipe_buffer_list_populate(&sinks, ring_desc,
						  pipe->buffer, pipe->size,
						  pipe->write_index,
						  pipe->read_index);
	}

	if ((room < min_xfer) && no_wait) {

		/* Not enough room and the caller will not wait: give up. */

		k_spin_unlock(&pipe->lock, key);
		*bytes_written = 0U;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe,
					       timeout, -EIO);

		return -EIO;
	}

	/*
	 * The descriptor embedded in k_thread must not be used from an ISR,
	 * so ISR callers get a descriptor on the stack instead.
	 */

	writer = k_is_in_isr() ? &isr_desc : &arch_current_thread()->pipe_desc;

	writer->buffer        = (unsigned char *)data;
	writer->bytes_to_xfer = bytes_to_write;
	writer->thread        = arch_current_thread();
	sys_dlist_append(&sources, &writer->node);

	*bytes_written = pipe_write(pipe, &sources, &sinks, &need_resched);

	/*
	 * Poll events are raised only when something was written and data is
	 * left in the buffer after the pending readers took their share.
	 */

	if ((pipe->bytes_used != 0U) && (*bytes_written != 0U)) {
		handle_poll_events(pipe);
	}

	/*
	 * The immediate success conditions below are backwards
	 * compatible with an earlier pipe implementation.
	 */

	if ((*bytes_written == bytes_to_write) || no_wait ||
	    ((*bytes_written >= min_xfer) && (min_xfer > 0U))) {

		/* Done without blocking; both branches release the lock. */

		if (need_resched) {
			z_reschedule(&pipe->lock, key);
		} else {
			k_spin_unlock(&pipe->lock, key);
		}

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, 0);

		return 0;
	}

	/* Not enough written yet: pend until readers drain the rest. */

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, put, pipe, timeout);

	arch_current_thread()->base.swap_data = writer;

	z_sched_wait(&pipe->lock, key, &pipe->wait_q.writers, timeout, NULL);

	/*
	 * On SMP systems, threads in the processing list may timeout before
	 * the data has finished copying. The following spin lock/unlock pair
	 * prevents those threads from executing further until the data copying
	 * is complete.
	 */

	key = k_spin_lock(&pipe->lock);
	k_spin_unlock(&pipe->lock, key);

	*bytes_written = bytes_to_write - writer->bytes_to_xfer;

	ret = pipe_return_code(min_xfer, writer->bytes_to_xfer,
			       bytes_to_write);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, ret);

	return ret;
}
514 
515 #ifdef CONFIG_USERSPACE
/*
 * Syscall verification wrapper for k_pipe_put(): checks the pipe object,
 * that bytes_written may be written and that bytes_to_write bytes at data
 * may be read, then forwards the request to the implementation.
 */
int z_vrfy_k_pipe_put(struct k_pipe *pipe, const void *data,
		      size_t bytes_to_write, size_t *bytes_written,
		      size_t min_xfer, k_timeout_t timeout)
{
	int ret;

	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(bytes_written, sizeof(*bytes_written)));
	K_OOPS(K_SYSCALL_MEMORY_READ(data, bytes_to_write));

	ret = z_impl_k_pipe_put(pipe, data, bytes_to_write, bytes_written,
				min_xfer, timeout);

	return ret;
}
528 #include <zephyr/syscalls/k_pipe_put_mrsh.c>
529 #endif /* CONFIG_USERSPACE */
530 
/**
 * @brief Read path shared by k_pipe_get() and the flush routines
 *
 * Must be entered with pipe->lock held through @a key; the lock has been
 * released again on every return path.
 *
 * @param key           Key obtained when pipe->lock was taken
 * @param pipe          Pipe to read from
 * @param data          Destination buffer; NULL discards the data (flush)
 * @param bytes_to_read Number of bytes wanted
 * @param bytes_read    Receives the number of bytes actually read
 * @param min_xfer      Minimum number of bytes that must be read
 * @param timeout       How long to wait for the minimum to arrive
 *
 * @retval 0       Success (with K_NO_WAIT this covers partial reads too)
 * @retval -EIO    K_NO_WAIT given and fewer than @a min_xfer bytes available
 * @retval -EAGAIN Waited, but fewer than @a min_xfer bytes were read
 */
static int pipe_get_internal(k_spinlock_key_t key, struct k_pipe *pipe,
			     void *data, size_t bytes_to_read,
			     size_t *bytes_read, size_t min_xfer,
			     k_timeout_t timeout)
{
	const bool          no_wait = K_TIMEOUT_EQ(timeout, K_NO_WAIT);
	struct _pipe_desc   ring_desc[2];
	struct _pipe_desc   isr_desc;
	struct _pipe_desc  *reader;
	struct _pipe_desc  *from;
	sys_dlist_t         sources;
	size_t              avail = 0U;
	size_t              total = 0U;
	bool                need_resched = false;

	/*
	 * Data copying takes place in the following order.
	 * 1. Copy data from the pipe buffer to the receive buffer.
	 * 2. Copy data from the waiting writer(s) to the receive buffer.
	 * 3. Refill the pipe buffer from the waiting writer(s).
	 */

	sys_dlist_init(&sources);

	if (pipe->bytes_used != 0) {
		avail = pipe_buffer_list_populate(&sources, ring_desc,
						  pipe->buffer, pipe->size,
						  pipe->read_index,
						  pipe->write_index);
	}

	avail += pipe_waiter_list_populate(&sources, &pipe->wait_q.writers,
					   bytes_to_read);

	if ((avail < min_xfer) && no_wait) {

		/* Too little data and the caller will not wait: give up. */

		k_spin_unlock(&pipe->lock, key);
		*bytes_read = 0;

		return -EIO;
	}

	/*
	 * The descriptor embedded in k_thread must not be used from an ISR,
	 * so ISR callers get a descriptor on the stack instead.
	 */

	reader = k_is_in_isr() ? &isr_desc : &arch_current_thread()->pipe_desc;

	reader->buffer = data;
	reader->bytes_to_xfer = bytes_to_read;
	reader->thread = arch_current_thread();

	/* Steps 1 and 2: drain every gathered source into the reader. */
	for (from = (struct _pipe_desc *)sys_dlist_get(&sources);
	     from != NULL;
	     from = (struct _pipe_desc *)sys_dlist_get(&sources)) {
		const size_t chunk = pipe_xfer(reader->buffer,
					       reader->bytes_to_xfer,
					       from->buffer,
					       from->bytes_to_xfer);

		total += chunk;

		from->buffer += chunk;
		from->bytes_to_xfer -= chunk;

		/* A NULL destination (flush) stays NULL. */
		if (reader->buffer != NULL) {
			reader->buffer += chunk;
		}
		reader->bytes_to_xfer -= chunk;

		if (from->thread == NULL) {
			/* Source is the pipe buffer: update its state. */
			pipe->bytes_used -= chunk;
			pipe->read_index += chunk;
			if (pipe->read_index >= pipe->size) {
				pipe->read_index -= pipe->size;
			}
		} else if (from->bytes_to_xfer == 0U) {
			/* This writer has handed over everything: wake it. */
			z_unpend_thread(from->thread);
			z_ready_thread(from->thread);

			need_resched = true;
		}
	}

	if (pipe->bytes_used != pipe->size) {
		sys_dlist_t ring_slots;

		/*
		 * Step 3: the buffer has free space, so let any writers that
		 * are still waiting top it up.
		 */

		sys_dlist_init(&sources);
		sys_dlist_init(&ring_slots);

		(void) pipe_waiter_list_populate(&sources,
						 &pipe->wait_q.writers,
						 pipe->size - pipe->bytes_used);

		(void) pipe_buffer_list_populate(&ring_slots, ring_desc,
						 pipe->buffer, pipe->size,
						 pipe->write_index,
						 pipe->read_index);

		(void) pipe_write(pipe, &sources, &ring_slots, &need_resched);
	}

	/*
	 * The immediate success conditions below are backwards
	 * compatible with an earlier pipe implementation.
	 */

	if ((total == bytes_to_read) || no_wait ||
	    ((total >= min_xfer) && (min_xfer > 0U))) {

		/* Done without blocking; both branches release the lock. */

		*bytes_read = total;
		if (need_resched) {
			z_reschedule(&pipe->lock, key);
		} else {
			k_spin_unlock(&pipe->lock, key);
		}

		return 0;
	}

	/* Not enough read yet: pend until writers supply the rest. */

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, get, pipe, timeout);

	arch_current_thread()->base.swap_data = reader;

	z_sched_wait(&pipe->lock, key, &pipe->wait_q.readers, timeout, NULL);

	/*
	 * On SMP systems, threads in the processing list may timeout before
	 * the data has finished copying. The following spin lock/unlock pair
	 * prevents those threads from executing further until the data copying
	 * is complete.
	 */

	key = k_spin_lock(&pipe->lock);
	k_spin_unlock(&pipe->lock, key);

	*bytes_read = bytes_to_read - reader->bytes_to_xfer;

	return pipe_return_code(min_xfer, reader->bytes_to_xfer,
				bytes_to_read);
}
698 
/**
 * @brief Read data from a pipe
 *
 * Validates the request, takes the pipe lock and hands the work to
 * pipe_get_internal(), which releases the lock again.
 *
 * @param pipe          Pipe to read from
 * @param data          Destination buffer
 * @param bytes_to_read Number of bytes wanted
 * @param bytes_read    Receives the number of bytes actually read
 * @param min_xfer      Minimum number of bytes that must be read
 * @param timeout       How long to wait; must be K_NO_WAIT in an ISR
 *
 * @retval -EINVAL @a min_xfer exceeds @a bytes_to_read, or
 *                 @a bytes_read is NULL
 * @return Otherwise the result of pipe_get_internal()
 */
int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
		     size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
	k_spinlock_key_t key;
	int status;

	__ASSERT(((arch_is_in_isr() == false) ||
		  K_TIMEOUT_EQ(timeout, K_NO_WAIT)), "");

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, get, pipe, timeout);

	CHECKIF((min_xfer > bytes_to_read) || (bytes_read == NULL)) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe,
					       timeout, -EINVAL);

		return -EINVAL;
	}

	key = k_spin_lock(&pipe->lock);

	status = pipe_get_internal(key, pipe, data, bytes_to_read, bytes_read,
				   min_xfer, timeout);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, status);

	return status;
}
723 
724 #ifdef CONFIG_USERSPACE
/*
 * Syscall verification wrapper for k_pipe_get(): checks the pipe object,
 * that bytes_read may be written and that bytes_to_read bytes at data may
 * be written, then forwards the request to the implementation.
 */
int z_vrfy_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
		      size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
	int ret;

	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(bytes_read, sizeof(*bytes_read)));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, bytes_to_read));

	ret = z_impl_k_pipe_get(pipe, data, bytes_to_read, bytes_read,
				min_xfer, timeout);

	return ret;
}
736 #include <zephyr/syscalls/k_pipe_get_mrsh.c>
737 #endif /* CONFIG_USERSPACE */
738 
z_impl_k_pipe_read_avail(struct k_pipe * pipe)739 size_t z_impl_k_pipe_read_avail(struct k_pipe *pipe)
740 {
741 	size_t res;
742 	k_spinlock_key_t key;
743 
744 	/* Buffer and size are fixed. No need to spin. */
745 	if ((pipe->buffer == NULL) || (pipe->size == 0U)) {
746 		res = 0;
747 		goto out;
748 	}
749 
750 	key = k_spin_lock(&pipe->lock);
751 
752 	if (pipe->read_index == pipe->write_index) {
753 		res = pipe->bytes_used;
754 	} else if (pipe->read_index < pipe->write_index) {
755 		res = pipe->write_index - pipe->read_index;
756 	} else {
757 		res = pipe->size - (pipe->read_index - pipe->write_index);
758 	}
759 
760 	k_spin_unlock(&pipe->lock, key);
761 
762 out:
763 	return res;
764 }
765 
766 #ifdef CONFIG_USERSPACE
z_vrfy_k_pipe_read_avail(struct k_pipe * pipe)767 size_t z_vrfy_k_pipe_read_avail(struct k_pipe *pipe)
768 {
769 	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
770 
771 	return z_impl_k_pipe_read_avail(pipe);
772 }
773 #include <zephyr/syscalls/k_pipe_read_avail_mrsh.c>
774 #endif /* CONFIG_USERSPACE */
775 
z_impl_k_pipe_write_avail(struct k_pipe * pipe)776 size_t z_impl_k_pipe_write_avail(struct k_pipe *pipe)
777 {
778 	size_t res;
779 	k_spinlock_key_t key;
780 
781 	/* Buffer and size are fixed. No need to spin. */
782 	if ((pipe->buffer == NULL) || (pipe->size == 0U)) {
783 		res = 0;
784 		goto out;
785 	}
786 
787 	key = k_spin_lock(&pipe->lock);
788 
789 	if (pipe->write_index == pipe->read_index) {
790 		res = pipe->size - pipe->bytes_used;
791 	} else if (pipe->write_index < pipe->read_index) {
792 		res = pipe->read_index - pipe->write_index;
793 	} else {
794 		res = pipe->size - (pipe->write_index - pipe->read_index);
795 	}
796 
797 	k_spin_unlock(&pipe->lock, key);
798 
799 out:
800 	return res;
801 }
802 
803 #ifdef CONFIG_USERSPACE
z_vrfy_k_pipe_write_avail(struct k_pipe * pipe)804 size_t z_vrfy_k_pipe_write_avail(struct k_pipe *pipe)
805 {
806 	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
807 
808 	return z_impl_k_pipe_write_avail(pipe);
809 }
810 #include <zephyr/syscalls/k_pipe_write_avail_mrsh.c>
811 #endif /* CONFIG_USERSPACE */
812 
813 #ifdef CONFIG_OBJ_CORE_PIPE
init_pipe_obj_core_list(void)814 static int init_pipe_obj_core_list(void)
815 {
816 	/* Initialize pipe object type */
817 
818 	z_obj_type_init(&obj_type_pipe, K_OBJ_TYPE_PIPE_ID,
819 			offsetof(struct k_pipe, obj_core));
820 
821 	/* Initialize and link statically defined pipes */
822 
823 	STRUCT_SECTION_FOREACH(k_pipe, pipe) {
824 		k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
825 	}
826 
827 	return 0;
828 }
829 
830 SYS_INIT(init_pipe_obj_core_list, PRE_KERNEL_1,
831 	 CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
832 #endif /* CONFIG_OBJ_CORE_PIPE */
833