/*
 * Copyright (c) 2016 Wind River Systems, Inc.
 *
 * SPDX-License-Identifier: Apache-2.0
 */

/**
 * @file
 *
 * @brief Pipes
 */

#include <kernel.h>
#include <kernel_structs.h>

#include <toolchain.h>
#include <ksched.h>
#include <wait_q.h>
#include <init.h>
#include <syscall_handler.h>
#include <kernel_internal.h>
#include <sys/check.h>

struct k_pipe_desc {
	unsigned char *buffer;           /* Position in src/dest buffer */
	size_t bytes_to_xfer;            /* # bytes left to transfer */
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
	struct k_mem_block *block;       /* Pointer to memory block */
	struct k_mem_block  copy_block;  /* For backwards compatibility */
	struct k_sem *sem;               /* Semaphore to give if async */
#endif
};

struct k_pipe_async {
	struct _thread_base thread;   /* Dummy thread object */
	struct k_pipe_desc  desc;     /* Pipe message descriptor */
};

#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
/* stack of unused asynchronous message descriptors */
K_STACK_DEFINE(pipe_async_msgs, CONFIG_NUM_PIPE_ASYNC_MSGS);
#endif /* CONFIG_NUM_PIPE_ASYNC_MSGS > 0 */

#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)

/*
 * Do run-time initialization of pipe object subsystem.
 */
static int init_pipes_module(const struct device *dev)
{
	ARG_UNUSED(dev);

	/* Array of asynchronous message descriptors */
	static struct k_pipe_async __noinit async_msg[CONFIG_NUM_PIPE_ASYNC_MSGS];

#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
	/*
	 * Create pool of asynchronous pipe message descriptors.
	 *
	 * A dummy thread requires minimal initialization, since it never gets
	 * to execute. The _THREAD_DUMMY flag is sufficient to distinguish a
	 * dummy thread from a real one. The threads are *not* added to the
	 * kernel's list of known threads.
	 *
	 * Once initialized, the address of each descriptor is added to a stack
	 * that governs access to them.
	 */

	for (int i = 0; i < CONFIG_NUM_PIPE_ASYNC_MSGS; i++) {
		async_msg[i].thread.thread_state = _THREAD_DUMMY;
		async_msg[i].thread.swap_data = &async_msg[i].desc;

		z_init_thread_timeout(&async_msg[i].thread);

		k_stack_push(&pipe_async_msgs, (stack_data_t)&async_msg[i]);
	}
#endif /* CONFIG_NUM_PIPE_ASYNC_MSGS > 0 */

	/* Complete initialization of statically defined pipes. */

	return 0;
}

SYS_INIT(init_pipes_module, PRE_KERNEL_1, CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);

#endif /* CONFIG_NUM_PIPE_ASYNC_MSGS */

void k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size)
{
	pipe->buffer = buffer;
	pipe->size = size;
	pipe->bytes_used = 0;
	pipe->read_index = 0;
	pipe->write_index = 0;
	pipe->lock = (struct k_spinlock){};
	z_waitq_init(&pipe->wait_q.writers);
	z_waitq_init(&pipe->wait_q.readers);
	SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe);

	pipe->flags = 0;
	z_object_init(pipe);
}
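
/*
 * Illustrative usage (a minimal sketch, not part of the kernel source):
 * initializing a pipe over a caller-supplied ring buffer. The names
 * "tx_ring" and "io_pipe" are hypothetical.
 *
 *	static unsigned char tx_ring[64];
 *	static struct k_pipe io_pipe;
 *
 *	k_pipe_init(&io_pipe, tx_ring, sizeof(tx_ring));
 *
 * A statically defined pipe can be created with K_PIPE_DEFINE() instead.
 */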

int z_impl_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
{
	void *buffer;
	int ret;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, alloc_init, pipe);

	if (size != 0U) {
		buffer = z_thread_malloc(size);
		if (buffer != NULL) {
			k_pipe_init(pipe, buffer, size);
			pipe->flags = K_PIPE_FLAG_ALLOC;
			ret = 0;
		} else {
			ret = -ENOMEM;
		}
	} else {
		k_pipe_init(pipe, NULL, 0);
		ret = 0;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, alloc_init, pipe, ret);

	return ret;
}

#ifdef CONFIG_USERSPACE
static inline int z_vrfy_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
{
	Z_OOPS(Z_SYSCALL_OBJ_NEVER_INIT(pipe, K_OBJ_PIPE));

	return z_impl_k_pipe_alloc_init(pipe, size);
}
#include <syscalls/k_pipe_alloc_init_mrsh.c>
#endif

int k_pipe_cleanup(struct k_pipe *pipe)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, cleanup, pipe);

	CHECKIF(z_waitq_head(&pipe->wait_q.readers) != NULL ||
			z_waitq_head(&pipe->wait_q.writers) != NULL) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, -EAGAIN);

		return -EAGAIN;
	}

	if ((pipe->flags & K_PIPE_FLAG_ALLOC) != 0U) {
		k_free(pipe->buffer);
		pipe->buffer = NULL;
		pipe->flags &= ~K_PIPE_FLAG_ALLOC;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, 0);

	return 0;
}
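
/*
 * Illustrative usage (a minimal sketch, not part of the kernel source):
 * the lifecycle of a pipe whose buffer is drawn from the calling thread's
 * resource pool. The name "tmp_pipe" is hypothetical. Note that
 * k_pipe_cleanup() returns -EAGAIN while any thread still waits on the
 * pipe.
 *
 *	struct k_pipe tmp_pipe;
 *
 *	if (k_pipe_alloc_init(&tmp_pipe, 128) == 0) {
 *		... exchange data with k_pipe_put()/k_pipe_get() ...
 *		(void)k_pipe_cleanup(&tmp_pipe);
 *	}
 */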

/**
 * @brief Copy bytes from @a src to @a dest
 *
 * @return Number of bytes copied
 */
static size_t pipe_xfer(unsigned char *dest, size_t dest_size,
			 const unsigned char *src, size_t src_size)
{
	size_t num_bytes = MIN(dest_size, src_size);
	const unsigned char *end = src + num_bytes;

	while (src != end) {
		*dest = *src;
		dest++;
		src++;
	}

	return num_bytes;
}

/**
 * @brief Put data from @a src into the pipe's circular buffer
 *
 * Modifies the following fields in @a pipe:
 *        buffer, bytes_used, write_index
 *
 * @return Number of bytes written to the pipe's circular buffer
 */
static size_t pipe_buffer_put(struct k_pipe *pipe,
			       const unsigned char *src, size_t src_size)
{
	size_t  bytes_copied;
	size_t  run_length;
	size_t  num_bytes_written = 0;
	int     i;

	for (i = 0; i < 2; i++) {
		run_length = MIN(pipe->size - pipe->bytes_used,
				 pipe->size - pipe->write_index);

		bytes_copied = pipe_xfer(pipe->buffer + pipe->write_index,
					  run_length,
					  src + num_bytes_written,
					  src_size - num_bytes_written);

		num_bytes_written += bytes_copied;
		pipe->bytes_used += bytes_copied;
		pipe->write_index += bytes_copied;
		if (pipe->write_index == pipe->size) {
			pipe->write_index = 0;
		}
	}

	return num_bytes_written;
}
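
/*
 * Worked example of the two-pass copy above (values are illustrative):
 * with size = 8, write_index = 6 and bytes_used = 3, the first pass is
 * bounded by the run to the end of the buffer (8 - 6 = 2 bytes) and wraps
 * write_index to 0; the second pass can then use the remaining free space
 * at the start of the buffer (8 - 5 = 3 bytes).
 */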

/**
 * @brief Get data from the pipe's circular buffer
 *
 * Modifies the following fields in @a pipe:
 *        bytes_used, read_index
 *
 * @return Number of bytes read from the pipe's circular buffer
 */
static size_t pipe_buffer_get(struct k_pipe *pipe,
			       unsigned char *dest, size_t dest_size)
{
	size_t  bytes_copied;
	size_t  run_length;
	size_t  num_bytes_read = 0;
	int     i;

	for (i = 0; i < 2; i++) {
		run_length = MIN(pipe->bytes_used,
				 pipe->size - pipe->read_index);

		bytes_copied = pipe_xfer(dest + num_bytes_read,
					  dest_size - num_bytes_read,
					  pipe->buffer + pipe->read_index,
					  run_length);

		num_bytes_read += bytes_copied;
		pipe->bytes_used -= bytes_copied;
		pipe->read_index += bytes_copied;
		if (pipe->read_index == pipe->size) {
			pipe->read_index = 0;
		}
	}

	return num_bytes_read;
}
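
/*
 * Worked example, mirroring pipe_buffer_put() (values are illustrative):
 * with size = 8, read_index = 6 and bytes_used = 5, the first pass reads
 * the 2 bytes up to the end of the buffer and wraps read_index to 0; the
 * second pass reads the remaining 3 buffered bytes from the start.
 */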

/**
 * @brief Prepare a working set of readers/writers
 *
 * Prepare a list of "working threads" into/from which the data
 * will be directly copied. This list is used to ...
 *
 *  1. avoid double copying
 *  2. minimize interrupt latency as interrupts are unlocked
 *     while copying data
 *  3. ensure a timeout can not make the request impossible to satisfy
 *
 * The list is populated with previously pended threads that will be ready to
 * run after the pipe call is complete.
 *
 * Important things to remember when reading from the pipe ...
 * 1. If there are writers in @a wait_q, then the pipe's buffer is full.
 * 2. Conversely, if the pipe's buffer is not full, there are no writers.
 * 3. The amount of available data in the pipe is the sum of the bytes used in
 *    the pipe (@a pipe_space) and all the requests from the waiting writers.
 * 4. Since data is read from the pipe's buffer first, the working set must
 *    include writers that will (try to) re-fill the pipe's buffer afterwards.
 *
 * Important things to remember when writing to the pipe ...
 * 1. If there are readers in @a wait_q, then the pipe's buffer is empty.
 * 2. Conversely, if the pipe's buffer is not empty, then there are no readers.
 * 3. The amount of space available in the pipe is the sum of the bytes unused
 *    in the pipe (@a pipe_space) and all the requests from the waiting readers.
 *
 * @return false if request is unsatisfiable, otherwise true
 */
static bool pipe_xfer_prepare(sys_dlist_t      *xfer_list,
			       struct k_thread **waiter,
			       _wait_q_t        *wait_q,
			       size_t            pipe_space,
			       size_t            bytes_to_xfer,
			       size_t            min_xfer,
			       k_timeout_t       timeout)
{
	struct k_thread  *thread;
	struct k_pipe_desc *desc;
	size_t num_bytes = 0;

	if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		_WAIT_Q_FOR_EACH(wait_q, thread) {
			desc = (struct k_pipe_desc *)thread->base.swap_data;

			num_bytes += desc->bytes_to_xfer;

			if (num_bytes >= bytes_to_xfer) {
				break;
			}
		}

		if (num_bytes + pipe_space < min_xfer) {
			return false;
		}
	}

	/*
	 * Either @a timeout is not K_NO_WAIT (so the thread may pend) or
	 * the entire request can be satisfied. Generate the working list.
	 */

	sys_dlist_init(xfer_list);
	num_bytes = 0;

	while ((thread = z_waitq_head(wait_q)) != NULL) {
		desc = (struct k_pipe_desc *)thread->base.swap_data;
		num_bytes += desc->bytes_to_xfer;

		if (num_bytes > bytes_to_xfer) {
			/*
			 * This request can not be fully satisfied.
			 * Do not remove it from the wait_q.
			 * Do not abort its timeout (if applicable).
			 * Do not add it to the transfer list.
			 */
			break;
		}

		/*
		 * This request can be fully satisfied.
		 * Remove it from the wait_q.
		 * Abort its timeout.
		 * Add it to the transfer list.
		 */
		z_unpend_thread(thread);
		sys_dlist_append(xfer_list, &thread->base.qnode_dlist);
	}

	*waiter = (num_bytes > bytes_to_xfer) ? thread : NULL;

	return true;
}
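
/*
 * Worked example (values are illustrative): a K_NO_WAIT read requesting
 * bytes_to_xfer = 100 with min_xfer = 50 while the pipe buffers 10 bytes
 * (pipe_space) and two writers pend with 20 and 30 bytes. The sizing loop
 * sums 20 + 30 = 50 pending bytes; since 50 + 10 >= 50, the request is
 * satisfiable, and both writers are moved onto the transfer list.
 */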

/**
 * @brief Determine the correct return code
 *
 * Bytes Xferred   No Wait   Wait
 *   >= Minimum       0       0
 *    < Minimum      -EIO*   -EAGAIN
 *
 * * The "-EIO No Wait" case was already checked when the "working set"
 *   was created in pipe_xfer_prepare().
 *
 * @return See table above
 */
static int pipe_return_code(size_t min_xfer, size_t bytes_remaining,
			     size_t bytes_requested)
{
	if (bytes_requested - bytes_remaining >= min_xfer) {
		/*
		 * At least the minimum number of requested
		 * bytes have been transferred.
		 */
		return 0;
	}

	return -EAGAIN;
}
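
/*
 * Example (values are illustrative): a request for 100 bytes with
 * min_xfer = 50 that ends with 30 bytes remaining has transferred
 * 100 - 30 = 70 >= 50 bytes, so 0 (success) is returned.
 */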

/**
 * @brief Ready a pipe thread
 *
 * If the pipe thread is a real thread, then add it to the ready queue.
 * If it is a dummy thread (an asynchronous message descriptor), then
 * leave it alone as it never executes.
 *
 * @return N/A
 */
static void pipe_thread_ready(struct k_thread *thread)
{
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
	if ((thread->base.thread_state & _THREAD_DUMMY) != 0U) {
		return;
	}
#endif

	z_ready_thread(thread);
}

/**
 * @brief Internal API used to send data to a pipe
 */
int z_pipe_put_internal(struct k_pipe *pipe, struct k_pipe_async *async_desc,
			 unsigned char *data, size_t bytes_to_write,
			 size_t *bytes_written, size_t min_xfer,
			 k_timeout_t timeout)
{
	struct k_thread    *reader;
	struct k_pipe_desc *desc;
	sys_dlist_t    xfer_list;
	size_t         num_bytes_written = 0;
	size_t         bytes_copied;

#if (CONFIG_NUM_PIPE_ASYNC_MSGS == 0)
	ARG_UNUSED(async_desc);
#endif

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, put, pipe, timeout);

	CHECKIF((min_xfer > bytes_to_write) || bytes_written == NULL) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, -EINVAL);

		return -EINVAL;
	}

	k_spinlock_key_t key = k_spin_lock(&pipe->lock);

	/*
	 * Create a list of "working readers" into which the data will be
	 * directly copied.
	 */

	if (!pipe_xfer_prepare(&xfer_list, &reader, &pipe->wait_q.readers,
				pipe->size - pipe->bytes_used, bytes_to_write,
				min_xfer, timeout)) {
		k_spin_unlock(&pipe->lock, key);
		*bytes_written = 0;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, -EIO);

		return -EIO;
	}

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, put, pipe, timeout);

	z_sched_lock();
	k_spin_unlock(&pipe->lock, key);

	/*
	 * 1. 'xfer_list' currently contains a list of reader threads that can
	 * have their read requests fulfilled by the current call.
	 * 2. 'reader', if not NULL, points to a thread on the reader wait_q
	 * that can get some of its requested data.
	 * 3. Interrupts are unlocked but the scheduler is locked to allow
	 * ticks to be delivered but no scheduling to occur.
	 * 4. If 'reader' times out while we are copying data, not only do we
	 * still have a pointer to it, but it cannot execute until this call
	 * is complete so it is still safe to copy data to it.
	 */

	struct k_thread *thread = (struct k_thread *)
				  sys_dlist_get(&xfer_list);
	while (thread != NULL) {
		desc = (struct k_pipe_desc *)thread->base.swap_data;
		bytes_copied = pipe_xfer(desc->buffer, desc->bytes_to_xfer,
					  data + num_bytes_written,
					  bytes_to_write - num_bytes_written);

		num_bytes_written   += bytes_copied;
		desc->buffer        += bytes_copied;
		desc->bytes_to_xfer -= bytes_copied;

		/* The thread's read request has been satisfied. Ready it. */
		z_ready_thread(thread);

		thread = (struct k_thread *)sys_dlist_get(&xfer_list);
	}

	/*
	 * Copy any data to the reader that we left on the wait_q.
	 * It is possible no data will be copied.
	 */
	if (reader != NULL) {
		desc = (struct k_pipe_desc *)reader->base.swap_data;
		bytes_copied = pipe_xfer(desc->buffer, desc->bytes_to_xfer,
					  data + num_bytes_written,
					  bytes_to_write - num_bytes_written);

		num_bytes_written   += bytes_copied;
		desc->buffer        += bytes_copied;
		desc->bytes_to_xfer -= bytes_copied;
	}

	/*
	 * As much data as possible has been directly copied to any waiting
	 * readers. Add as much as possible to the pipe's circular buffer.
	 */

	num_bytes_written +=
		pipe_buffer_put(pipe, data + num_bytes_written,
				 bytes_to_write - num_bytes_written);

	if (num_bytes_written == bytes_to_write) {
		*bytes_written = num_bytes_written;
		k_sched_unlock();

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, 0);

		return 0;
	}

	if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)
	    && num_bytes_written >= min_xfer
	    && min_xfer > 0U) {
		*bytes_written = num_bytes_written;
		k_sched_unlock();

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, 0);

		return 0;
	}

	/* Not all data was copied */

	struct k_pipe_desc  pipe_desc;

	pipe_desc.buffer         = data + num_bytes_written;
	pipe_desc.bytes_to_xfer  = bytes_to_write - num_bytes_written;

	if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		_current->base.swap_data = &pipe_desc;
		/*
		 * Lock interrupts and unlock the scheduler before
		 * manipulating the writers wait_q.
		 */
		k_spinlock_key_t key2 = k_spin_lock(&pipe->lock);
		z_sched_unlock_no_reschedule();
		(void)z_pend_curr(&pipe->lock, key2,
				 &pipe->wait_q.writers, timeout);
	} else {
		k_sched_unlock();
	}

	*bytes_written = bytes_to_write - pipe_desc.bytes_to_xfer;

	int ret = pipe_return_code(min_xfer, pipe_desc.bytes_to_xfer,
				 bytes_to_write);
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, ret);
	return ret;
}

int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
		     size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
	struct k_thread    *writer;
	struct k_pipe_desc *desc;
	sys_dlist_t    xfer_list;
	size_t         num_bytes_read = 0;
	size_t         bytes_copied;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, get, pipe, timeout);

	CHECKIF((min_xfer > bytes_to_read) || bytes_read == NULL) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, -EINVAL);

		return -EINVAL;
	}

	k_spinlock_key_t key = k_spin_lock(&pipe->lock);

	/*
	 * Create a list of "working writers" from which the data will be
	 * directly copied.
	 */
	if (!pipe_xfer_prepare(&xfer_list, &writer, &pipe->wait_q.writers,
				pipe->bytes_used, bytes_to_read,
				min_xfer, timeout)) {
		k_spin_unlock(&pipe->lock, key);
		*bytes_read = 0;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, -EIO);

		return -EIO;
	}

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, get, pipe, timeout);

	z_sched_lock();
	k_spin_unlock(&pipe->lock, key);

	num_bytes_read = pipe_buffer_get(pipe, data, bytes_to_read);

	/*
	 * 1. 'xfer_list' currently contains a list of writer threads that can
	 *     have their write requests fulfilled by the current call.
	 * 2. 'writer', if not NULL, points to a thread on the writer wait_q
	 *    that can post some of its requested data.
	 * 3. Data will be copied from each writer's buffer to either the
	 *    reader's buffer and/or to the pipe's circular buffer.
	 * 4. Interrupts are unlocked but the scheduler is locked to allow
	 *    ticks to be delivered but no scheduling to occur.
	 * 5. If 'writer' times out while we are copying data, not only do we
	 *    still have a pointer to it, but it cannot execute until this
	 *    call is complete so it is still safe to copy data from it.
	 */

	struct k_thread *thread = (struct k_thread *)
				  sys_dlist_get(&xfer_list);
	while ((thread != NULL) && (num_bytes_read < bytes_to_read)) {
		desc = (struct k_pipe_desc *)thread->base.swap_data;
		bytes_copied = pipe_xfer((uint8_t *)data + num_bytes_read,
					  bytes_to_read - num_bytes_read,
					  desc->buffer, desc->bytes_to_xfer);

		num_bytes_read       += bytes_copied;
		desc->buffer         += bytes_copied;
		desc->bytes_to_xfer  -= bytes_copied;

		/*
		 * It is expected that the write request will be satisfied.
		 * However, if the read request was satisfied before the
		 * write request was satisfied, then the write request must
		 * finish later when writing to the pipe's circular buffer.
		 */
		if (num_bytes_read == bytes_to_read) {
			break;
		}
		pipe_thread_ready(thread);

		thread = (struct k_thread *)sys_dlist_get(&xfer_list);
	}

	if ((writer != NULL) && (num_bytes_read < bytes_to_read)) {
		desc = (struct k_pipe_desc *)writer->base.swap_data;
		bytes_copied = pipe_xfer((uint8_t *)data + num_bytes_read,
					  bytes_to_read - num_bytes_read,
					  desc->buffer, desc->bytes_to_xfer);

		num_bytes_read       += bytes_copied;
		desc->buffer         += bytes_copied;
		desc->bytes_to_xfer  -= bytes_copied;
	}

	/*
	 * Copy as much data as possible from the writers (if any)
	 * into the pipe's circular buffer.
	 */

	while (thread != NULL) {
		desc = (struct k_pipe_desc *)thread->base.swap_data;
		bytes_copied = pipe_buffer_put(pipe, desc->buffer,
						desc->bytes_to_xfer);

		desc->buffer         += bytes_copied;
		desc->bytes_to_xfer  -= bytes_copied;

		/* Write request has been satisfied */
		pipe_thread_ready(thread);

		thread = (struct k_thread *)sys_dlist_get(&xfer_list);
	}

	if (writer != NULL) {
		desc = (struct k_pipe_desc *)writer->base.swap_data;
		bytes_copied = pipe_buffer_put(pipe, desc->buffer,
						desc->bytes_to_xfer);

		desc->buffer         += bytes_copied;
		desc->bytes_to_xfer  -= bytes_copied;
	}

	if (num_bytes_read == bytes_to_read) {
		k_sched_unlock();

		*bytes_read = num_bytes_read;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, 0);

		return 0;
	}

	if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)
	    && num_bytes_read >= min_xfer
	    && min_xfer > 0U) {
		k_sched_unlock();

		*bytes_read = num_bytes_read;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, 0);

		return 0;
	}

	/* Not all data was read */

	struct k_pipe_desc  pipe_desc;

	pipe_desc.buffer        = (uint8_t *)data + num_bytes_read;
	pipe_desc.bytes_to_xfer = bytes_to_read - num_bytes_read;

	if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		_current->base.swap_data = &pipe_desc;
		k_spinlock_key_t key2 = k_spin_lock(&pipe->lock);

		z_sched_unlock_no_reschedule();
		(void)z_pend_curr(&pipe->lock, key2,
				 &pipe->wait_q.readers, timeout);
	} else {
		k_sched_unlock();
	}

	*bytes_read = bytes_to_read - pipe_desc.bytes_to_xfer;

	int ret = pipe_return_code(min_xfer, pipe_desc.bytes_to_xfer,
				 bytes_to_read);
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, ret);
	return ret;
}
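
/*
 * Illustrative usage (a minimal sketch, not part of the kernel source):
 * accept anything from 1 byte up to the buffer size, giving writers up
 * to 100 ms to produce data. The names "io_pipe" and "rx_buf" are
 * hypothetical.
 *
 *	unsigned char rx_buf[32];
 *	size_t bytes_read;
 *
 *	if (k_pipe_get(&io_pipe, rx_buf, sizeof(rx_buf), &bytes_read,
 *		       1, K_MSEC(100)) == 0) {
 *		... process bytes_read bytes from rx_buf ...
 *	}
 */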

#ifdef CONFIG_USERSPACE
int z_vrfy_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
		      size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
	Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	Z_OOPS(Z_SYSCALL_MEMORY_WRITE(bytes_read, sizeof(*bytes_read)));
	Z_OOPS(Z_SYSCALL_MEMORY_WRITE((void *)data, bytes_to_read));

	return z_impl_k_pipe_get((struct k_pipe *)pipe, (void *)data,
				bytes_to_read, bytes_read, min_xfer,
				timeout);
}
#include <syscalls/k_pipe_get_mrsh.c>
#endif

int z_impl_k_pipe_put(struct k_pipe *pipe, void *data, size_t bytes_to_write,
		      size_t *bytes_written, size_t min_xfer,
		      k_timeout_t timeout)
{
	return z_pipe_put_internal(pipe, NULL, data,
				    bytes_to_write, bytes_written,
				    min_xfer, timeout);
}
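
/*
 * Illustrative usage (a minimal sketch, not part of the kernel source):
 * an all-or-nothing write that blocks until the entire message has been
 * accepted. The names "io_pipe" and "msg" are hypothetical.
 *
 *	static unsigned char msg[] = "sensor: 42";
 *	size_t bytes_written;
 *
 *	(void)k_pipe_put(&io_pipe, msg, sizeof(msg), &bytes_written,
 *			 sizeof(msg), K_FOREVER);
 */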

#ifdef CONFIG_USERSPACE
int z_vrfy_k_pipe_put(struct k_pipe *pipe, void *data, size_t bytes_to_write,
		      size_t *bytes_written, size_t min_xfer,
		      k_timeout_t timeout)
{
	Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	Z_OOPS(Z_SYSCALL_MEMORY_WRITE(bytes_written, sizeof(*bytes_written)));
	Z_OOPS(Z_SYSCALL_MEMORY_READ((void *)data, bytes_to_write));

	return z_impl_k_pipe_put((struct k_pipe *)pipe, (void *)data,
				bytes_to_write, bytes_written, min_xfer,
				timeout);
}
#include <syscalls/k_pipe_put_mrsh.c>
#endif

size_t z_impl_k_pipe_read_avail(struct k_pipe *pipe)
{
	size_t res;
	k_spinlock_key_t key;

	/* Buffer and size are fixed. No need to spin. */
	if (pipe->buffer == NULL || pipe->size == 0U) {
		res = 0;
		goto out;
	}

	key = k_spin_lock(&pipe->lock);

	if (pipe->read_index == pipe->write_index) {
		res = pipe->bytes_used;
	} else if (pipe->read_index < pipe->write_index) {
		res = pipe->write_index - pipe->read_index;
	} else {
		res = pipe->size - (pipe->read_index - pipe->write_index);
	}

	k_spin_unlock(&pipe->lock, key);

out:
	return res;
}

#ifdef CONFIG_USERSPACE
size_t z_vrfy_k_pipe_read_avail(struct k_pipe *pipe)
{
	Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	return z_impl_k_pipe_read_avail(pipe);
}
#include <syscalls/k_pipe_read_avail_mrsh.c>
#endif

size_t z_impl_k_pipe_write_avail(struct k_pipe *pipe)
{
	size_t res;
	k_spinlock_key_t key;

	/* Buffer and size are fixed. No need to spin. */
	if (pipe->buffer == NULL || pipe->size == 0U) {
		res = 0;
		goto out;
	}

	key = k_spin_lock(&pipe->lock);

	if (pipe->write_index == pipe->read_index) {
		res = pipe->size - pipe->bytes_used;
	} else if (pipe->write_index < pipe->read_index) {
		res = pipe->read_index - pipe->write_index;
	} else {
		res = pipe->size - (pipe->write_index - pipe->read_index);
	}

	k_spin_unlock(&pipe->lock, key);

out:
	return res;
}
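
/*
 * Illustrative usage (a minimal sketch, not part of the kernel source):
 * the two queries above allow polling without blocking. The names
 * "io_pipe", "rx_buf" and "bytes_read" are hypothetical.
 *
 *	if (k_pipe_read_avail(&io_pipe) >= sizeof(rx_buf)) {
 *		(void)k_pipe_get(&io_pipe, rx_buf, sizeof(rx_buf),
 *				 &bytes_read, sizeof(rx_buf), K_NO_WAIT);
 *	}
 */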

#ifdef CONFIG_USERSPACE
size_t z_vrfy_k_pipe_write_avail(struct k_pipe *pipe)
{
	Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	return z_impl_k_pipe_write_avail(pipe);
}
#include <syscalls/k_pipe_write_avail_mrsh.c>
#endif