1 /*
2  * Copyright (c) 2016 Wind River Systems, Inc.
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 /**
8  * @file
9  *
10  * @brief Pipes
11  */
12 
13 #include <zephyr/kernel.h>
14 #include <zephyr/kernel_structs.h>
15 
16 #include <zephyr/toolchain.h>
17 #include <ksched.h>
18 #include <wait_q.h>
19 #include <zephyr/init.h>
20 #include <zephyr/internal/syscall_handler.h>
21 #include <kernel_internal.h>
22 #include <zephyr/sys/check.h>
23 
24 struct waitq_walk_data {
25 	sys_dlist_t *list;
26 	size_t       bytes_requested;
27 	size_t       bytes_available;
28 };
29 
30 static int pipe_get_internal(k_spinlock_key_t key, struct k_pipe *pipe,
31 			     void *data, size_t bytes_to_read,
32 			     size_t *bytes_read, size_t min_xfer,
33 			     k_timeout_t timeout);
34 #ifdef CONFIG_OBJ_CORE_PIPE
35 static struct k_obj_type obj_type_pipe;
36 #endif /* CONFIG_OBJ_CORE_PIPE */
37 
38 
z_impl_k_pipe_init(struct k_pipe * pipe,unsigned char * buffer,size_t size)39 void z_impl_k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size)
40 {
41 	pipe->buffer = buffer;
42 	pipe->size = size;
43 	pipe->bytes_used = 0U;
44 	pipe->read_index = 0U;
45 	pipe->write_index = 0U;
46 	pipe->lock = (struct k_spinlock){};
47 	z_waitq_init(&pipe->wait_q.writers);
48 	z_waitq_init(&pipe->wait_q.readers);
49 	SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe, buffer, size);
50 
51 	pipe->flags = 0;
52 
53 #if defined(CONFIG_POLL)
54 	sys_dlist_init(&pipe->poll_events);
55 #endif /* CONFIG_POLL */
56 	k_object_init(pipe);
57 
58 #ifdef CONFIG_OBJ_CORE_PIPE
59 	k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
60 #endif /* CONFIG_OBJ_CORE_PIPE */
61 }
62 
z_impl_k_pipe_alloc_init(struct k_pipe * pipe,size_t size)63 int z_impl_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
64 {
65 	void *buffer;
66 	int ret;
67 
68 	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, alloc_init, pipe);
69 
70 	if (size != 0U) {
71 		buffer = z_thread_malloc(size);
72 		if (buffer != NULL) {
73 			k_pipe_init(pipe, buffer, size);
74 			pipe->flags = K_PIPE_FLAG_ALLOC;
75 			ret = 0;
76 		} else {
77 			ret = -ENOMEM;
78 		}
79 	} else {
80 		k_pipe_init(pipe, NULL, 0U);
81 		ret = 0;
82 	}
83 
84 	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, alloc_init, pipe, ret);
85 
86 	return ret;
87 }
88 
89 #ifdef CONFIG_USERSPACE
z_vrfy_k_pipe_init(struct k_pipe * pipe,unsigned char * buffer,size_t size)90 static inline void z_vrfy_k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size)
91 {
92 	K_OOPS(K_SYSCALL_OBJ_NEVER_INIT(pipe, K_OBJ_PIPE));
93 	K_OOPS(K_SYSCALL_MEMORY_WRITE(buffer, size));
94 
95 	z_impl_k_pipe_init(pipe, buffer, size);
96 }
97 #include <zephyr/syscalls/k_pipe_init_mrsh.c>
98 
z_vrfy_k_pipe_alloc_init(struct k_pipe * pipe,size_t size)99 static inline int z_vrfy_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
100 {
101 	K_OOPS(K_SYSCALL_OBJ_NEVER_INIT(pipe, K_OBJ_PIPE));
102 
103 	return z_impl_k_pipe_alloc_init(pipe, size);
104 }
105 #include <zephyr/syscalls/k_pipe_alloc_init_mrsh.c>
106 #endif /* CONFIG_USERSPACE */
107 
/**
 * @brief Signal "data available" to anything polling on the pipe
 *
 * @return Result of z_handle_obj_poll_events() when CONFIG_POLL is enabled
 *         (callers fold it into their "reschedule needed" flag);
 *         false when polling support is compiled out
 */
static inline bool handle_poll_events(struct k_pipe *pipe)
{
	bool signalled = false;

#ifdef CONFIG_POLL
	signalled = z_handle_obj_poll_events(&pipe->poll_events,
					     K_POLL_STATE_PIPE_DATA_AVAILABLE);
#else
	ARG_UNUSED(pipe);
#endif /* CONFIG_POLL */

	return signalled;
}
117 
z_impl_k_pipe_flush(struct k_pipe * pipe)118 void z_impl_k_pipe_flush(struct k_pipe *pipe)
119 {
120 	size_t  bytes_read;
121 
122 	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, flush, pipe);
123 
124 	k_spinlock_key_t key = k_spin_lock(&pipe->lock);
125 
126 	(void) pipe_get_internal(key, pipe, NULL, (size_t) -1, &bytes_read, 0U,
127 				 K_NO_WAIT);
128 
129 	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, flush, pipe);
130 }
131 
132 #ifdef CONFIG_USERSPACE
z_vrfy_k_pipe_flush(struct k_pipe * pipe)133 void z_vrfy_k_pipe_flush(struct k_pipe *pipe)
134 {
135 	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
136 
137 	z_impl_k_pipe_flush(pipe);
138 }
139 #include <zephyr/syscalls/k_pipe_flush_mrsh.c>
140 #endif /* CONFIG_USERSPACE */
141 
z_impl_k_pipe_buffer_flush(struct k_pipe * pipe)142 void z_impl_k_pipe_buffer_flush(struct k_pipe *pipe)
143 {
144 	size_t  bytes_read;
145 
146 	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, buffer_flush, pipe);
147 
148 	k_spinlock_key_t key = k_spin_lock(&pipe->lock);
149 
150 	if (pipe->buffer != NULL) {
151 		(void) pipe_get_internal(key, pipe, NULL, pipe->size,
152 					 &bytes_read, 0U, K_NO_WAIT);
153 	} else {
154 		k_spin_unlock(&pipe->lock, key);
155 	}
156 
157 	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, buffer_flush, pipe);
158 }
159 
160 #ifdef CONFIG_USERSPACE
z_vrfy_k_pipe_buffer_flush(struct k_pipe * pipe)161 void z_vrfy_k_pipe_buffer_flush(struct k_pipe *pipe)
162 {
163 	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
164 
165 	z_impl_k_pipe_buffer_flush(pipe);
166 }
167 #endif /* CONFIG_USERSPACE */
168 
k_pipe_cleanup(struct k_pipe * pipe)169 int k_pipe_cleanup(struct k_pipe *pipe)
170 {
171 	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, cleanup, pipe);
172 
173 	k_spinlock_key_t key = k_spin_lock(&pipe->lock);
174 
175 	CHECKIF((z_waitq_head(&pipe->wait_q.readers) != NULL) ||
176 			(z_waitq_head(&pipe->wait_q.writers) != NULL)) {
177 		k_spin_unlock(&pipe->lock, key);
178 
179 		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, -EAGAIN);
180 
181 		return -EAGAIN;
182 	}
183 
184 	if ((pipe->flags & K_PIPE_FLAG_ALLOC) != 0U) {
185 		k_free(pipe->buffer);
186 		pipe->buffer = NULL;
187 
188 		/*
189 		 * Freeing the buffer changes the pipe into a bufferless
190 		 * pipe. Reset the pipe's counters to prevent malfunction.
191 		 */
192 
193 		pipe->size = 0U;
194 		pipe->bytes_used = 0U;
195 		pipe->read_index = 0U;
196 		pipe->write_index = 0U;
197 		pipe->flags &= ~K_PIPE_FLAG_ALLOC;
198 	}
199 
200 	k_spin_unlock(&pipe->lock, key);
201 
202 	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, 0U);
203 
204 	return 0;
205 }
206 
/**
 * @brief Copy bytes from @a src to @a dest
 *
 * The transfer length is the smaller of @a dest_size and @a src_size.
 * A NULL @a dest means the data is being flushed: nothing is copied, but
 * the bytes are still reported as transferred. Zero-length transfers
 * never reach memcpy(), so a NULL @a src paired with a zero @a src_size
 * is harmless.
 *
 * @param dest      Destination buffer, or NULL to discard the data
 * @param dest_size Number of bytes @a dest can still accept
 * @param src       Source buffer
 * @param src_size  Number of bytes still available in @a src
 *
 * @return Number of bytes copied (or discarded)
 */
static size_t pipe_xfer(unsigned char *dest, size_t dest_size,
			 const unsigned char *src, size_t src_size)
{
	size_t num_bytes = (dest_size < src_size) ? dest_size : src_size;

	if ((dest == NULL) || (num_bytes == 0U)) {
		/*
		 * Either the data is being flushed (pretend it was copied)
		 * or there is nothing to move.
		 */
		return num_bytes;
	}

	(void) memcpy(dest, src, num_bytes);

	return num_bytes;
}
226 
227 /**
228  * @brief Callback routine used to populate wait list
229  *
230  * @return 1 to stop further walking; 0 to continue walking
231  */
pipe_walk_op(struct k_thread * thread,void * data)232 static int pipe_walk_op(struct k_thread *thread, void *data)
233 {
234 	struct waitq_walk_data *walk_data = data;
235 	struct _pipe_desc *desc = (struct _pipe_desc *)thread->base.swap_data;
236 
237 	sys_dlist_append(walk_data->list, &desc->node);
238 
239 	walk_data->bytes_available += desc->bytes_to_xfer;
240 
241 	if (walk_data->bytes_available >= walk_data->bytes_requested) {
242 		return 1;
243 	}
244 
245 	return 0;
246 }
247 
248 /**
249  * @brief Popluate pipe descriptors for copying to/from waiters' buffers
250  *
251  * This routine cycles through the waiters on the wait queue and creates
252  * a list of threads that will have data directly copied to / read from
253  * their buffers. This list helps us avoid double copying later.
254  *
255  * @return # of bytes available for direct copying
256  */
pipe_waiter_list_populate(sys_dlist_t * list,_wait_q_t * wait_q,size_t bytes_to_xfer)257 static size_t pipe_waiter_list_populate(sys_dlist_t  *list,
258 					_wait_q_t    *wait_q,
259 					size_t        bytes_to_xfer)
260 {
261 	struct waitq_walk_data walk_data;
262 
263 	walk_data.list            = list;
264 	walk_data.bytes_requested = bytes_to_xfer;
265 	walk_data.bytes_available = 0;
266 
267 	(void) z_sched_waitq_walk(wait_q, pipe_walk_op, &walk_data);
268 
269 	return walk_data.bytes_available;
270 }
271 
272 /**
273  * @brief Populate pipe descriptors for copying to/from pipe buffer
274  *
275  * This routine is only called if the pipe buffer is not empty (when reading),
276  * or if not full (when writing).
277  */
pipe_buffer_list_populate(sys_dlist_t * list,struct _pipe_desc * desc,unsigned char * buffer,size_t size,size_t start,size_t end)278 static size_t pipe_buffer_list_populate(sys_dlist_t         *list,
279 					struct _pipe_desc   *desc,
280 					unsigned char       *buffer,
281 					size_t               size,
282 					size_t               start,
283 					size_t               end)
284 {
285 	sys_dlist_append(list, &desc[0].node);
286 
287 	desc[0].thread = NULL;
288 	desc[0].buffer = &buffer[start];
289 
290 	if (start < end) {
291 		desc[0].bytes_to_xfer = end - start;
292 		return end - start;
293 	}
294 
295 	desc[0].bytes_to_xfer = size - start;
296 
297 	desc[1].thread = NULL;
298 	desc[1].buffer = &buffer[0];
299 	desc[1].bytes_to_xfer = end;
300 
301 	sys_dlist_append(list, &desc[1].node);
302 
303 	return size - start + end;
304 }
305 
/**
 * @brief Determine the correct return code
 *
 * Bytes Xferred   No Wait   Wait
 *   >= Minimum       0       0
 *    < Minimum      -EIO*   -EAGAIN
 *
 * * The "-EIO No Wait" case was already checked after the list of pipe
 *   descriptors was created, so it is never produced here.
 *
 * @param min_xfer        Minimum number of bytes that had to be transferred
 * @param bytes_remaining Number of requested bytes that were not transferred
 * @param bytes_requested Total number of bytes requested
 *
 * @return See table above
 */
static int pipe_return_code(size_t min_xfer, size_t bytes_remaining,
			     size_t bytes_requested)
{
	size_t bytes_xferred = bytes_requested - bytes_remaining;

	/* Falling short of the minimum is the only failure reported here. */
	return (bytes_xferred < min_xfer) ? -EAGAIN : 0;
}
331 
332 /**
333  * @brief Copy data from source(s) to destination(s)
334  */
335 
pipe_write(struct k_pipe * pipe,sys_dlist_t * src_list,sys_dlist_t * dest_list,bool * reschedule)336 static size_t pipe_write(struct k_pipe *pipe, sys_dlist_t *src_list,
337 			 sys_dlist_t *dest_list, bool *reschedule)
338 {
339 	struct _pipe_desc *src;
340 	struct _pipe_desc *dest;
341 	size_t  bytes_copied;
342 	size_t  num_bytes_written = 0U;
343 
344 	src = (struct _pipe_desc *)sys_dlist_get(src_list);
345 	dest = (struct _pipe_desc *)sys_dlist_get(dest_list);
346 
347 	while ((src != NULL) && (dest != NULL)) {
348 		bytes_copied = pipe_xfer(dest->buffer, dest->bytes_to_xfer,
349 					 src->buffer, src->bytes_to_xfer);
350 
351 		num_bytes_written   += bytes_copied;
352 
353 		dest->buffer        += bytes_copied;
354 		dest->bytes_to_xfer -= bytes_copied;
355 
356 		src->buffer         += bytes_copied;
357 		src->bytes_to_xfer  -= bytes_copied;
358 
359 		if (dest->thread == NULL) {
360 
361 			/* Writing to the pipe buffer. Update details. */
362 
363 			pipe->bytes_used += bytes_copied;
364 			pipe->write_index += bytes_copied;
365 			if (pipe->write_index >= pipe->size) {
366 				pipe->write_index -= pipe->size;
367 			}
368 		} else if (dest->bytes_to_xfer == 0U) {
369 
370 			/* The thread's read request has been satisfied. */
371 
372 			z_unpend_thread(dest->thread);
373 			z_ready_thread(dest->thread);
374 
375 			*reschedule = true;
376 		}
377 
378 		if (src->bytes_to_xfer == 0U) {
379 			src = (struct _pipe_desc *)sys_dlist_get(src_list);
380 		}
381 
382 		if (dest->bytes_to_xfer == 0U) {
383 			dest = (struct _pipe_desc *)sys_dlist_get(dest_list);
384 		}
385 	}
386 
387 	return num_bytes_written;
388 }
389 
/**
 * @brief Write data to a pipe
 *
 * Data goes first to any pended readers and then into the pipe buffer.
 * If the request is not complete afterwards and waiting is allowed, the
 * caller pends on the writers' wait queue with its descriptor published
 * through swap_data so that readers can pull the remaining bytes.
 *
 * @param pipe           Pipe to write to
 * @param data           Bytes to write
 * @param bytes_to_write Number of bytes in @a data
 * @param bytes_written  Receives the number of bytes actually written
 * @param min_xfer       Minimum number of bytes that must be written
 * @param timeout        How long to wait; must be K_NO_WAIT in an ISR
 *
 * @retval 0       At least @a min_xfer bytes were written
 * @retval -EINVAL @a min_xfer exceeds @a bytes_to_write, or
 *                 @a bytes_written is NULL
 * @retval -EIO    K_NO_WAIT was given and fewer than @a min_xfer bytes
 *                 could be written; nothing was written
 * @retval -EAGAIN Fewer than @a min_xfer bytes were written after waiting
 */
int z_impl_k_pipe_put(struct k_pipe *pipe, const void *data,
		      size_t bytes_to_write, size_t *bytes_written,
		      size_t min_xfer, k_timeout_t timeout)
{
	struct _pipe_desc  ring_desc[2];
	struct _pipe_desc  isr_desc;
	struct _pipe_desc *writer;
	sys_dlist_t        sinks;
	sys_dlist_t        sources;
	size_t             capacity;
	bool               need_resched = false;
	const bool         no_wait = K_TIMEOUT_EQ(timeout, K_NO_WAIT);

	__ASSERT(((arch_is_in_isr() == false) ||
		  K_TIMEOUT_EQ(timeout, K_NO_WAIT)), "");

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, put, pipe, timeout);

	CHECKIF((min_xfer > bytes_to_write) || (bytes_written == NULL)) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout,
					       -EINVAL);

		return -EINVAL;
	}

	sys_dlist_init(&sources);
	sys_dlist_init(&sinks);

	k_spinlock_key_t key = k_spin_lock(&pipe->lock);

	/*
	 * Destinations, in order of preference: pended readers first,
	 * then whatever room is left in the pipe buffer.
	 */
	capacity = pipe_waiter_list_populate(&sinks, &pipe->wait_q.readers,
					     bytes_to_write);

	if (pipe->bytes_used != pipe->size) {
		capacity += pipe_buffer_list_populate(&sinks, ring_desc,
						      pipe->buffer,
						      pipe->size,
						      pipe->write_index,
						      pipe->read_index);
	}

	if (no_wait && (capacity < min_xfer)) {
		/* The minimum cannot be met and the caller will not wait. */
		k_spin_unlock(&pipe->lock, key);
		*bytes_written = 0U;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe,
					       timeout, -EIO);

		return -EIO;
	}

	/*
	 * The descriptor embedded in k_thread is not safe to use from an
	 * ISR, so an ISR gets one on the stack instead.
	 */
	if (k_is_in_isr()) {
		writer = &isr_desc;
	} else {
		writer = &_current->pipe_desc;
	}

	writer->buffer        = (unsigned char *)data;
	writer->bytes_to_xfer = bytes_to_write;
	writer->thread        = _current;
	sys_dlist_append(&sources, &writer->node);

	*bytes_written = pipe_write(pipe, &sources, &sinks, &need_resched);

	/*
	 * Poll events are only raised when something was written and bytes
	 * remain in the buffer after the pended readers took their share.
	 */
	if ((*bytes_written != 0U) && (pipe->bytes_used != 0U)) {
		if (handle_poll_events(pipe)) {
			need_resched = true;
		}
	}

	/*
	 * The immediate success conditions below are backwards
	 * compatible with an earlier pipe implementation.
	 */
	const size_t written = *bytes_written;

	if ((written == bytes_to_write) || no_wait ||
	    ((min_xfer > 0U) && (written >= min_xfer))) {
		if (need_resched) {
			z_reschedule(&pipe->lock, key);
		} else {
			k_spin_unlock(&pipe->lock, key);
		}

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, 0);

		return 0;
	}

	/* Not enough written yet: pend until readers drain the rest. */

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, put, pipe, timeout);

	_current->base.swap_data = writer;

	z_sched_wait(&pipe->lock, key, &pipe->wait_q.writers, timeout, NULL);

	/*
	 * On SMP systems, threads in the processing list may timeout before
	 * the data has finished copying. Taking and dropping the lock holds
	 * this thread back until any copy in progress is complete.
	 */
	key = k_spin_lock(&pipe->lock);
	k_spin_unlock(&pipe->lock, key);

	const size_t leftover = writer->bytes_to_xfer;

	*bytes_written = bytes_to_write - leftover;

	int ret = pipe_return_code(min_xfer, leftover, bytes_to_write);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, ret);

	return ret;
}
524 
525 #ifdef CONFIG_USERSPACE
/**
 * @brief Verification handler for k_pipe_put()
 *
 * Checks, through K_OOPS(), that @a pipe is a pipe object, that
 * @a bytes_written is writable and that @a data is readable for
 * @a bytes_to_write bytes, then forwards to the implementation.
 *
 * @return Result of z_impl_k_pipe_put()
 */
int z_vrfy_k_pipe_put(struct k_pipe *pipe, const void *data,
		      size_t bytes_to_write, size_t *bytes_written,
		      size_t min_xfer, k_timeout_t timeout)
{
	int status;

	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(bytes_written, sizeof(*bytes_written)));
	K_OOPS(K_SYSCALL_MEMORY_READ(data, bytes_to_write));

	status = z_impl_k_pipe_put(pipe, data, bytes_to_write, bytes_written,
				   min_xfer, timeout);

	return status;
}
538 #include <zephyr/syscalls/k_pipe_put_mrsh.c>
539 #endif /* CONFIG_USERSPACE */
540 
/**
 * @brief Read from a pipe; the caller already holds the pipe lock
 *
 * Data copying takes place in the following order.
 * 1. Copy data from the pipe buffer to the receive buffer.
 * 2. Copy data from the waiting writer(s) to the receive buffer.
 * 3. Refill the pipe buffer from the waiting writer(s).
 *
 * A NULL @a data discards the bytes instead of copying them (flush).
 * The lock identified by @a key is released on every path.
 *
 * @param key           Key obtained when pipe->lock was taken
 * @param pipe          Pipe to read from
 * @param data          Receive buffer, or NULL to discard
 * @param bytes_to_read Number of bytes requested
 * @param bytes_read    Receives the number of bytes actually read
 * @param min_xfer      Minimum number of bytes that must be read
 * @param timeout       How long to wait for the request to complete
 *
 * @retval 0       At least @a min_xfer bytes were read
 * @retval -EIO    K_NO_WAIT was given and fewer than @a min_xfer bytes
 *                 were available; nothing was read
 * @retval -EAGAIN Fewer than @a min_xfer bytes were read after waiting
 */
static int pipe_get_internal(k_spinlock_key_t key, struct k_pipe *pipe,
			     void *data, size_t bytes_to_read,
			     size_t *bytes_read, size_t min_xfer,
			     k_timeout_t timeout)
{
	sys_dlist_t         sources;
	struct _pipe_desc   ring_desc[2];
	struct _pipe_desc   isr_desc;
	struct _pipe_desc  *reader;
	struct _pipe_desc  *from;
	size_t              total_read = 0U;
	size_t              available = 0U;
	bool                need_resched = false;
	const bool          no_wait = K_TIMEOUT_EQ(timeout, K_NO_WAIT);

	sys_dlist_init(&sources);

	/* Sources, in order: buffered bytes first, then pended writers. */
	if (pipe->bytes_used != 0) {
		available = pipe_buffer_list_populate(&sources, ring_desc,
						      pipe->buffer,
						      pipe->size,
						      pipe->read_index,
						      pipe->write_index);
	}

	available += pipe_waiter_list_populate(&sources,
					       &pipe->wait_q.writers,
					       bytes_to_read);

	if (no_wait && (available < min_xfer)) {
		/* The minimum cannot be met and the caller will not wait. */
		k_spin_unlock(&pipe->lock, key);
		*bytes_read = 0;

		return -EIO;
	}

	/*
	 * The descriptor embedded in k_thread is not safe to use from an
	 * ISR, so an ISR gets one on the stack instead.
	 */
	if (k_is_in_isr()) {
		reader = &isr_desc;
	} else {
		reader = &_current->pipe_desc;
	}

	reader->buffer = data;
	reader->bytes_to_xfer = bytes_to_read;
	reader->thread = _current;

	for (from = (struct _pipe_desc *)sys_dlist_get(&sources);
	     from != NULL;
	     from = (struct _pipe_desc *)sys_dlist_get(&sources)) {
		const size_t chunk = pipe_xfer(reader->buffer,
					       reader->bytes_to_xfer,
					       from->buffer,
					       from->bytes_to_xfer);

		total_read += chunk;

		from->buffer += chunk;
		from->bytes_to_xfer -= chunk;

		/* When flushing there is no destination pointer to advance. */
		if (reader->buffer != NULL) {
			reader->buffer += chunk;
		}
		reader->bytes_to_xfer -= chunk;

		if (from->thread == NULL) {
			/* Took bytes out of the pipe buffer: update it. */
			pipe->bytes_used -= chunk;
			pipe->read_index += chunk;
			if (pipe->read_index >= pipe->size) {
				pipe->read_index -= pipe->size;
			}
		} else if (from->bytes_to_xfer == 0U) {
			/* The pended writer's request has been satisfied. */
			z_unpend_thread(from->thread);
			z_ready_thread(from->thread);

			need_resched = true;
		}
	}

	if (pipe->bytes_used != pipe->size) {
		sys_dlist_t ring_list;

		/*
		 * Room has opened up in the pipe buffer. Let writers that
		 * are still pended top it up.
		 */
		sys_dlist_init(&sources);
		sys_dlist_init(&ring_list);

		(void) pipe_waiter_list_populate(&sources,
						 &pipe->wait_q.writers,
						 pipe->size - pipe->bytes_used);

		(void) pipe_buffer_list_populate(&ring_list, ring_desc,
						 pipe->buffer, pipe->size,
						 pipe->write_index,
						 pipe->read_index);

		(void) pipe_write(pipe, &sources, &ring_list, &need_resched);
	}

	/*
	 * The immediate success conditions below are backwards
	 * compatible with an earlier pipe implementation.
	 */
	if ((total_read == bytes_to_read) || no_wait ||
	    ((min_xfer > 0U) && (total_read >= min_xfer))) {
		*bytes_read = total_read;

		if (need_resched) {
			z_reschedule(&pipe->lock, key);
		} else {
			k_spin_unlock(&pipe->lock, key);
		}

		return 0;
	}

	/* Not enough read yet: pend until writers supply the rest. */

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, get, pipe, timeout);

	_current->base.swap_data = reader;

	z_sched_wait(&pipe->lock, key, &pipe->wait_q.readers, timeout, NULL);

	/*
	 * On SMP systems, threads in the processing list may timeout before
	 * the data has finished copying. Taking and dropping the lock holds
	 * this thread back until any copy in progress is complete.
	 */
	key = k_spin_lock(&pipe->lock);
	k_spin_unlock(&pipe->lock, key);

	const size_t leftover = reader->bytes_to_xfer;

	*bytes_read = bytes_to_read - leftover;

	return pipe_return_code(min_xfer, leftover, bytes_to_read);
}
708 
/**
 * @brief Read data from a pipe
 *
 * Validates the arguments, takes the pipe lock and hands the request to
 * pipe_get_internal(), which also releases the lock.
 *
 * @param pipe          Pipe to read from
 * @param data          Receive buffer
 * @param bytes_to_read Number of bytes requested
 * @param bytes_read    Receives the number of bytes actually read
 * @param min_xfer      Minimum number of bytes that must be read
 * @param timeout       How long to wait; must be K_NO_WAIT in an ISR
 *
 * @retval -EINVAL @a min_xfer exceeds @a bytes_to_read, or
 *                 @a bytes_read is NULL
 * @return Otherwise the result of pipe_get_internal()
 */
int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
		     size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
	int status;
	k_spinlock_key_t key;

	__ASSERT(((arch_is_in_isr() == false) ||
		  K_TIMEOUT_EQ(timeout, K_NO_WAIT)), "");

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, get, pipe, timeout);

	CHECKIF((min_xfer > bytes_to_read) || (bytes_read == NULL)) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe,
					       timeout, -EINVAL);

		return -EINVAL;
	}

	key = k_spin_lock(&pipe->lock);

	status = pipe_get_internal(key, pipe, data, bytes_to_read,
				   bytes_read, min_xfer, timeout);

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, status);

	return status;
}
733 
734 #ifdef CONFIG_USERSPACE
/**
 * @brief Verification handler for k_pipe_get()
 *
 * Checks, through K_OOPS(), that @a pipe is a pipe object, that
 * @a bytes_read is writable and that @a data is writable for
 * @a bytes_to_read bytes, then forwards to the implementation.
 *
 * @return Result of z_impl_k_pipe_get()
 */
int z_vrfy_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
		      size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
	int status;

	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(bytes_read, sizeof(*bytes_read)));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, bytes_to_read));

	status = z_impl_k_pipe_get(pipe, data, bytes_to_read, bytes_read,
				   min_xfer, timeout);

	return status;
}
746 #include <zephyr/syscalls/k_pipe_get_mrsh.c>
747 #endif /* CONFIG_USERSPACE */
748 
z_impl_k_pipe_read_avail(struct k_pipe * pipe)749 size_t z_impl_k_pipe_read_avail(struct k_pipe *pipe)
750 {
751 	size_t res;
752 	k_spinlock_key_t key;
753 
754 	/* Buffer and size are fixed. No need to spin. */
755 	if ((pipe->buffer == NULL) || (pipe->size == 0U)) {
756 		res = 0;
757 		goto out;
758 	}
759 
760 	key = k_spin_lock(&pipe->lock);
761 
762 	if (pipe->read_index == pipe->write_index) {
763 		res = pipe->bytes_used;
764 	} else if (pipe->read_index < pipe->write_index) {
765 		res = pipe->write_index - pipe->read_index;
766 	} else {
767 		res = pipe->size - (pipe->read_index - pipe->write_index);
768 	}
769 
770 	k_spin_unlock(&pipe->lock, key);
771 
772 out:
773 	return res;
774 }
775 
776 #ifdef CONFIG_USERSPACE
z_vrfy_k_pipe_read_avail(struct k_pipe * pipe)777 size_t z_vrfy_k_pipe_read_avail(struct k_pipe *pipe)
778 {
779 	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
780 
781 	return z_impl_k_pipe_read_avail(pipe);
782 }
783 #include <zephyr/syscalls/k_pipe_read_avail_mrsh.c>
784 #endif /* CONFIG_USERSPACE */
785 
z_impl_k_pipe_write_avail(struct k_pipe * pipe)786 size_t z_impl_k_pipe_write_avail(struct k_pipe *pipe)
787 {
788 	size_t res;
789 	k_spinlock_key_t key;
790 
791 	/* Buffer and size are fixed. No need to spin. */
792 	if ((pipe->buffer == NULL) || (pipe->size == 0U)) {
793 		res = 0;
794 		goto out;
795 	}
796 
797 	key = k_spin_lock(&pipe->lock);
798 
799 	if (pipe->write_index == pipe->read_index) {
800 		res = pipe->size - pipe->bytes_used;
801 	} else if (pipe->write_index < pipe->read_index) {
802 		res = pipe->read_index - pipe->write_index;
803 	} else {
804 		res = pipe->size - (pipe->write_index - pipe->read_index);
805 	}
806 
807 	k_spin_unlock(&pipe->lock, key);
808 
809 out:
810 	return res;
811 }
812 
813 #ifdef CONFIG_USERSPACE
z_vrfy_k_pipe_write_avail(struct k_pipe * pipe)814 size_t z_vrfy_k_pipe_write_avail(struct k_pipe *pipe)
815 {
816 	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
817 
818 	return z_impl_k_pipe_write_avail(pipe);
819 }
820 #include <zephyr/syscalls/k_pipe_write_avail_mrsh.c>
821 #endif /* CONFIG_USERSPACE */
822 
823 #ifdef CONFIG_OBJ_CORE_PIPE
init_pipe_obj_core_list(void)824 static int init_pipe_obj_core_list(void)
825 {
826 	/* Initialize pipe object type */
827 
828 	z_obj_type_init(&obj_type_pipe, K_OBJ_TYPE_PIPE_ID,
829 			offsetof(struct k_pipe, obj_core));
830 
831 	/* Initialize and link statically defined pipes */
832 
833 	STRUCT_SECTION_FOREACH(k_pipe, pipe) {
834 		k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
835 	}
836 
837 	return 0;
838 }
839 
840 SYS_INIT(init_pipe_obj_core_list, PRE_KERNEL_1,
841 	 CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
842 #endif /* CONFIG_OBJ_CORE_PIPE */
843