1 /*
2 * Copyright (c) 2016 Wind River Systems, Inc.
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 /**
8 * @file
9 *
10 * @brief Pipes
11 */
12
13 #include <kernel.h>
14 #include <kernel_structs.h>
15
16 #include <toolchain.h>
17 #include <ksched.h>
18 #include <wait_q.h>
19 #include <init.h>
20 #include <syscall_handler.h>
21 #include <kernel_internal.h>
22 #include <sys/check.h>
23
/*
 * Descriptor of one pipe transfer request.
 *
 * A pointer to a descriptor is stored in a thread's base.swap_data while the
 * thread is involved in a pipe transfer. As bytes are copied, the pipe code
 * advances @a buffer and decreases @a bytes_to_xfer by the amount copied.
 *
 * NOTE(review): the async-only members are not referenced by the code
 * visible in this file; their use is presumably elsewhere - confirm.
 */
struct k_pipe_desc {
	unsigned char *buffer;           /* Position in src/dest buffer */
	size_t bytes_to_xfer;            /* # bytes left to transfer */
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
	struct k_mem_block *block;       /* Pointer to memory block */
	struct k_mem_block copy_block;   /* For backwards compatibility */
	struct k_sem *sem;               /* Semaphore to give if async */
#endif
};
33
/*
 * Asynchronous pipe message: a placeholder thread object paired with the
 * transfer descriptor it carries. During subsystem initialization the
 * thread's swap_data is pointed at @a desc and its state is set to
 * _THREAD_DUMMY.
 */
struct k_pipe_async {
	struct _thread_base thread;      /* Dummy thread object */
	struct k_pipe_desc  desc;        /* Pipe message descriptor */
};
38
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)

/* Stack holding the addresses of the unused asynchronous message descriptors */
K_STACK_DEFINE(pipe_async_msgs, CONFIG_NUM_PIPE_ASYNC_MSGS);

/**
 * @brief Run-time initialization of the pipe object subsystem
 *
 * Builds the pool of asynchronous pipe message descriptors.
 *
 * Each descriptor embeds a dummy thread. Such a thread never executes, so it
 * only needs minimal set-up: the _THREAD_DUMMY state flag tells it apart from
 * a real thread, and its swap_data refers to the descriptor it carries. The
 * dummy threads are *not* added to the kernel's list of known threads.
 *
 * Once a descriptor is set up, its address is pushed onto the
 * pipe_async_msgs stack, which governs access to the pool.
 *
 * @param dev Unused
 *
 * @return 0 always
 */
static int init_pipes_module(const struct device *dev)
{
	/* Backing storage for the asynchronous message descriptors */
	static struct k_pipe_async __noinit async_msg[CONFIG_NUM_PIPE_ASYNC_MSGS];
	struct k_pipe_async *const pool_end =
		&async_msg[CONFIG_NUM_PIPE_ASYNC_MSGS];

	ARG_UNUSED(dev);

	for (struct k_pipe_async *msg = async_msg; msg != pool_end; msg++) {
		msg->thread.thread_state = _THREAD_DUMMY;
		msg->thread.swap_data = &msg->desc;

		z_init_thread_timeout(&msg->thread);

		k_stack_push(&pipe_async_msgs, (stack_data_t)msg);
	}

	return 0;
}

SYS_INIT(init_pipes_module, PRE_KERNEL_1, CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);

#endif /* CONFIG_NUM_PIPE_ASYNC_MSGS > 0 */
87
/**
 * @brief Initialize a pipe around a caller-supplied buffer
 *
 * Resets the ring-buffer bookkeeping, the spinlock, both wait queues and the
 * flags of @a pipe, then registers it as a kernel object.
 *
 * @param pipe   Pipe to initialize
 * @param buffer Storage for the pipe's circular buffer
 * @param size   Size of @a buffer in bytes
 */
void k_pipe_init(struct k_pipe *pipe, unsigned char *buffer, size_t size)
{
	/* Backing storage of the circular buffer */
	pipe->size = size;
	pipe->buffer = buffer;

	/* The circular buffer starts out empty */
	pipe->write_index = 0;
	pipe->read_index = 0;
	pipe->bytes_used = 0;

	/* Synchronization state */
	pipe->lock = (struct k_spinlock){};
	z_waitq_init(&pipe->wait_q.readers);
	z_waitq_init(&pipe->wait_q.writers);
	SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe);

	pipe->flags = 0;
	z_object_init(pipe);
}
103
/**
 * @brief Initialize a pipe, allocating its buffer on behalf of the caller
 *
 * A @a size of zero yields a pipe without a buffer. Otherwise the buffer is
 * obtained with z_thread_malloc() and the pipe is flagged K_PIPE_FLAG_ALLOC.
 * If the allocation fails, @a pipe is left untouched.
 *
 * @param pipe Pipe to initialize
 * @param size Requested size of the pipe's buffer in bytes
 *
 * @retval 0       on success
 * @retval -ENOMEM if the buffer could not be allocated
 */
int z_impl_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
{
	int status = 0;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, alloc_init, pipe);

	if (size == 0U) {
		/* Nothing to allocate: set up a bufferless pipe */
		k_pipe_init(pipe, NULL, 0);
	} else {
		void *storage = z_thread_malloc(size);

		if (storage == NULL) {
			status = -ENOMEM;
		} else {
			k_pipe_init(pipe, storage, size);
			pipe->flags = K_PIPE_FLAG_ALLOC;
		}
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, alloc_init, pipe, status);

	return status;
}
129
#ifdef CONFIG_USERSPACE
/*
 * System call verification handler for k_pipe_alloc_init().
 *
 * Runs the "never initialized" kernel object check on the pipe, then
 * forwards to the implementation function.
 */
static inline int z_vrfy_k_pipe_alloc_init(struct k_pipe *pipe, size_t size)
{
	int status;

	Z_OOPS(Z_SYSCALL_OBJ_NEVER_INIT(pipe, K_OBJ_PIPE));

	status = z_impl_k_pipe_alloc_init(pipe, size);

	return status;
}
#include <syscalls/k_pipe_alloc_init_mrsh.c>
#endif
139
k_pipe_cleanup(struct k_pipe * pipe)140 int k_pipe_cleanup(struct k_pipe *pipe)
141 {
142 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, cleanup, pipe);
143
144 CHECKIF(z_waitq_head(&pipe->wait_q.readers) != NULL ||
145 z_waitq_head(&pipe->wait_q.writers) != NULL) {
146 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, -EAGAIN);
147
148 return -EAGAIN;
149 }
150
151 if ((pipe->flags & K_PIPE_FLAG_ALLOC) != 0U) {
152 k_free(pipe->buffer);
153 pipe->buffer = NULL;
154 pipe->flags &= ~K_PIPE_FLAG_ALLOC;
155 }
156
157 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, cleanup, pipe, 0);
158
159 return 0;
160 }
161
/**
 * @brief Copy as many bytes as fit from @a src into @a dest
 *
 * The number of bytes copied is the smaller of @a dest_size and @a src_size.
 * Bytes are copied one at a time, in ascending address order.
 *
 * @param dest      Destination buffer
 * @param dest_size Room available in @a dest, in bytes
 * @param src       Source buffer
 * @param src_size  Number of bytes available in @a src
 *
 * @return Number of bytes copied
 */
static size_t pipe_xfer(unsigned char *dest, size_t dest_size,
			 const unsigned char *src, size_t src_size)
{
	size_t count = (dest_size < src_size) ? dest_size : src_size;

	for (size_t idx = 0; idx < count; idx++) {
		dest[idx] = src[idx];
	}

	return count;
}
181
182 /**
183 * @brief Put data from @a src into the pipe's circular buffer
184 *
185 * Modifies the following fields in @a pipe:
186 * buffer, bytes_used, write_index
187 *
188 * @return Number of bytes written to the pipe's circular buffer
189 */
pipe_buffer_put(struct k_pipe * pipe,const unsigned char * src,size_t src_size)190 static size_t pipe_buffer_put(struct k_pipe *pipe,
191 const unsigned char *src, size_t src_size)
192 {
193 size_t bytes_copied;
194 size_t run_length;
195 size_t num_bytes_written = 0;
196 int i;
197
198
199 for (i = 0; i < 2; i++) {
200 run_length = MIN(pipe->size - pipe->bytes_used,
201 pipe->size - pipe->write_index);
202
203 bytes_copied = pipe_xfer(pipe->buffer + pipe->write_index,
204 run_length,
205 src + num_bytes_written,
206 src_size - num_bytes_written);
207
208 num_bytes_written += bytes_copied;
209 pipe->bytes_used += bytes_copied;
210 pipe->write_index += bytes_copied;
211 if (pipe->write_index == pipe->size) {
212 pipe->write_index = 0;
213 }
214 }
215
216 return num_bytes_written;
217 }
218
219 /**
220 * @brief Get data from the pipe's circular buffer
221 *
222 * Modifies the following fields in @a pipe:
223 * bytes_used, read_index
224 *
225 * @return Number of bytes read from the pipe's circular buffer
226 */
pipe_buffer_get(struct k_pipe * pipe,unsigned char * dest,size_t dest_size)227 static size_t pipe_buffer_get(struct k_pipe *pipe,
228 unsigned char *dest, size_t dest_size)
229 {
230 size_t bytes_copied;
231 size_t run_length;
232 size_t num_bytes_read = 0;
233 int i;
234
235 for (i = 0; i < 2; i++) {
236 run_length = MIN(pipe->bytes_used,
237 pipe->size - pipe->read_index);
238
239 bytes_copied = pipe_xfer(dest + num_bytes_read,
240 dest_size - num_bytes_read,
241 pipe->buffer + pipe->read_index,
242 run_length);
243
244 num_bytes_read += bytes_copied;
245 pipe->bytes_used -= bytes_copied;
246 pipe->read_index += bytes_copied;
247 if (pipe->read_index == pipe->size) {
248 pipe->read_index = 0;
249 }
250 }
251
252 return num_bytes_read;
253 }
254
255 /**
256 * @brief Prepare a working set of readers/writers
257 *
258 * Prepare a list of "working threads" into/from which the data
259 * will be directly copied. This list is useful as it is used to ...
260 *
261 * 1. avoid double copying
262 * 2. minimize interrupt latency as interrupts are unlocked
263 * while copying data
264 * 3. ensure a timeout can not make the request impossible to satisfy
265 *
266 * The list is populated with previously pended threads that will be ready to
267 * run after the pipe call is complete.
268 *
269 * Important things to remember when reading from the pipe ...
270 * 1. If there are writers int @a wait_q, then the pipe's buffer is full.
271 * 2. Conversely if the pipe's buffer is not full, there are no writers.
272 * 3. The amount of available data in the pipe is the sum the bytes used in
273 * the pipe (@a pipe_space) and all the requests from the waiting writers.
274 * 4. Since data is read from the pipe's buffer first, the working set must
275 * include writers that will (try to) re-fill the pipe's buffer afterwards.
276 *
277 * Important things to remember when writing to the pipe ...
278 * 1. If there are readers in @a wait_q, then the pipe's buffer is empty.
279 * 2. Conversely if the pipe's buffer is not empty, then there are no readers.
280 * 3. The amount of space available in the pipe is the sum of the bytes unused
281 * in the pipe (@a pipe_space) and all the requests from the waiting readers.
282 *
283 * @return false if request is unsatisfiable, otherwise true
284 */
pipe_xfer_prepare(sys_dlist_t * xfer_list,struct k_thread ** waiter,_wait_q_t * wait_q,size_t pipe_space,size_t bytes_to_xfer,size_t min_xfer,k_timeout_t timeout)285 static bool pipe_xfer_prepare(sys_dlist_t *xfer_list,
286 struct k_thread **waiter,
287 _wait_q_t *wait_q,
288 size_t pipe_space,
289 size_t bytes_to_xfer,
290 size_t min_xfer,
291 k_timeout_t timeout)
292 {
293 struct k_thread *thread;
294 struct k_pipe_desc *desc;
295 size_t num_bytes = 0;
296
297 if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
298 _WAIT_Q_FOR_EACH(wait_q, thread) {
299 desc = (struct k_pipe_desc *)thread->base.swap_data;
300
301 num_bytes += desc->bytes_to_xfer;
302
303 if (num_bytes >= bytes_to_xfer) {
304 break;
305 }
306 }
307
308 if (num_bytes + pipe_space < min_xfer) {
309 return false;
310 }
311 }
312
313 /*
314 * Either @a timeout is not K_NO_WAIT (so the thread may pend) or
315 * the entire request can be satisfied. Generate the working list.
316 */
317
318 sys_dlist_init(xfer_list);
319 num_bytes = 0;
320
321 while ((thread = z_waitq_head(wait_q)) != NULL) {
322 desc = (struct k_pipe_desc *)thread->base.swap_data;
323 num_bytes += desc->bytes_to_xfer;
324
325 if (num_bytes > bytes_to_xfer) {
326 /*
327 * This request can not be fully satisfied.
328 * Do not remove it from the wait_q.
329 * Do not abort its timeout (if applicable).
330 * Do not add it to the transfer list
331 */
332 break;
333 }
334
335 /*
336 * This request can be fully satisfied.
337 * Remove it from the wait_q.
338 * Abort its timeout.
339 * Add it to the transfer list.
340 */
341 z_unpend_thread(thread);
342 sys_dlist_append(xfer_list, &thread->base.qnode_dlist);
343 }
344
345 *waiter = (num_bytes > bytes_to_xfer) ? thread : NULL;
346
347 return true;
348 }
349
/**
 * @brief Determine the correct return code
 *
 *   Bytes Xferred   No Wait   Wait
 *     >= Minimum       0       0
 *      < Minimum      -EIO*   -EAGAIN
 *
 * * The "-EIO No Wait" case is not produced here; it was already handled
 *   when the "working set" was created in pipe_xfer_prepare().
 *
 * @param min_xfer        Minimum number of bytes that had to be transferred
 * @param bytes_remaining Number of requested bytes that were not transferred
 * @param bytes_requested Total number of bytes requested
 *
 * @return 0 if at least @a min_xfer bytes were transferred, else -EAGAIN
 */
static int pipe_return_code(size_t min_xfer, size_t bytes_remaining,
			    size_t bytes_requested)
{
	size_t bytes_transferred = bytes_requested - bytes_remaining;

	return (bytes_transferred < min_xfer) ? -EAGAIN : 0;
}
375
/**
 * @brief Ready a pipe thread
 *
 * A real thread is added to the ready queue. When asynchronous pipe messages
 * are configured, a dummy thread (flagged _THREAD_DUMMY) is left alone:
 * nothing is done for it here.
 *
 * @return N/A
 */
static void pipe_thread_ready(struct k_thread *thread)
{
#if (CONFIG_NUM_PIPE_ASYNC_MSGS > 0)
	bool is_dummy = (thread->base.thread_state & _THREAD_DUMMY) != 0U;

	if (!is_dummy) {
		z_ready_thread(thread);
	}
#else
	z_ready_thread(thread);
#endif
}
394
/**
 * @brief Internal API used to send data to a pipe
 *
 * Data is handed out in this order: first to pended readers whose requests
 * can be completely satisfied, then to the first reader that can only be
 * partially satisfied (it stays on the wait queue), and finally into the
 * pipe's circular buffer. If data is still left over and @a timeout permits
 * waiting, the calling thread pends on the pipe's writers wait queue.
 *
 * @param pipe           Pipe to write to
 * @param async_desc     Asynchronous descriptor; not referenced by this
 *                       function beyond ARG_UNUSED()
 * @param data           Data to write
 * @param bytes_to_write Number of bytes in @a data
 * @param bytes_written  Out: number of bytes actually written
 * @param min_xfer       Minimum number of bytes that must be written
 * @param timeout        How long the caller is willing to wait
 *
 * @retval 0       all data was written, or at least @a min_xfer bytes were
 * @retval -EINVAL @a min_xfer exceeds @a bytes_to_write or @a bytes_written
 *                 is NULL
 * @retval -EIO    K_NO_WAIT was given and @a min_xfer bytes can not be
 *                 written right away
 * @retval -EAGAIN fewer than @a min_xfer bytes were written
 */
int z_pipe_put_internal(struct k_pipe *pipe, struct k_pipe_async *async_desc,
			 unsigned char *data, size_t bytes_to_write,
			 size_t *bytes_written, size_t min_xfer,
			 k_timeout_t timeout)
{
	struct k_thread    *reader;
	struct k_pipe_desc *desc;
	sys_dlist_t    xfer_list;
	size_t         num_bytes_written = 0;
	size_t         bytes_copied;

#if (CONFIG_NUM_PIPE_ASYNC_MSGS == 0)
	ARG_UNUSED(async_desc);
#endif

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, put, pipe, timeout);

	/* Reject requests that can never succeed or can not report back */
	CHECKIF((min_xfer > bytes_to_write) || bytes_written == NULL) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, -EINVAL);

		return -EINVAL;
	}

	k_spinlock_key_t key = k_spin_lock(&pipe->lock);

	/*
	 * Create a list of "working readers" into which the data will be
	 * directly copied.
	 */

	if (!pipe_xfer_prepare(&xfer_list, &reader, &pipe->wait_q.readers,
				pipe->size - pipe->bytes_used, bytes_to_write,
				min_xfer, timeout)) {
		k_spin_unlock(&pipe->lock, key);
		*bytes_written = 0;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, -EIO);

		return -EIO;
	}

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, put, pipe, timeout);

	/*
	 * Lock the scheduler before dropping the spinlock so that the copying
	 * below runs with the spinlock released but without rescheduling.
	 */
	z_sched_lock();
	k_spin_unlock(&pipe->lock, key);

	/*
	 * 1. 'xfer_list' currently contains a list of reader threads that can
	 * have their read requests fulfilled by the current call.
	 * 2. 'reader' if not NULL points to a thread on the reader wait_q
	 * that can get some of its requested data.
	 * 3. Interrupts are unlocked but the scheduler is locked to allow
	 * ticks to be delivered but no scheduling to occur
	 * 4. If 'reader' times out while we are copying data, not only do we
	 * still have a pointer to it, but it can not execute until this call
	 * is complete so it is still safe to copy data to it.
	 */

	struct k_thread *thread = (struct k_thread *)
				  sys_dlist_get(&xfer_list);
	while (thread != NULL) {
		desc = (struct k_pipe_desc *)thread->base.swap_data;
		bytes_copied = pipe_xfer(desc->buffer, desc->bytes_to_xfer,
					 data + num_bytes_written,
					 bytes_to_write - num_bytes_written);

		num_bytes_written   += bytes_copied;
		desc->buffer        += bytes_copied;
		desc->bytes_to_xfer -= bytes_copied;

		/* The thread's read request has been satisfied. Ready it. */
		z_ready_thread(thread);

		thread = (struct k_thread *)sys_dlist_get(&xfer_list);
	}

	/*
	 * Copy any data to the reader that we left on the wait_q.
	 * It is possible no data will be copied.
	 */
	if (reader != NULL) {
		desc = (struct k_pipe_desc *)reader->base.swap_data;
		bytes_copied = pipe_xfer(desc->buffer, desc->bytes_to_xfer,
					 data + num_bytes_written,
					 bytes_to_write - num_bytes_written);

		num_bytes_written   += bytes_copied;
		desc->buffer        += bytes_copied;
		desc->bytes_to_xfer -= bytes_copied;
	}

	/*
	 * As much data as possible has been directly copied to any waiting
	 * readers. Add as much as possible to the pipe's circular buffer.
	 */

	num_bytes_written +=
		pipe_buffer_put(pipe, data + num_bytes_written,
				bytes_to_write - num_bytes_written);

	/* Everything was written: done. */
	if (num_bytes_written == bytes_to_write) {
		*bytes_written = num_bytes_written;
		k_sched_unlock();

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, 0);

		return 0;
	}

	/*
	 * Waiting is allowed, but a non-zero minimum has already been met:
	 * return now instead of pending for the remainder.
	 */
	if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)
	    && num_bytes_written >= min_xfer
	    && min_xfer > 0U) {
		*bytes_written = num_bytes_written;
		k_sched_unlock();

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, 0);

		return 0;
	}

	/* Not all data was copied */

	/* Describes the unwritten remainder of the caller's data */
	struct k_pipe_desc  pipe_desc;

	pipe_desc.buffer         = data + num_bytes_written;
	pipe_desc.bytes_to_xfer  = bytes_to_write - num_bytes_written;

	if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		/* Publish the descriptor so it can be found via swap_data */
		_current->base.swap_data = &pipe_desc;
		/*
		 * Lock interrupts and unlock the scheduler before
		 * manipulating the writers wait_q.
		 */
		k_spinlock_key_t key2 = k_spin_lock(&pipe->lock);
		z_sched_unlock_no_reschedule();
		(void)z_pend_curr(&pipe->lock, key2,
				  &pipe->wait_q.writers, timeout);
	} else {
		k_sched_unlock();
	}

	/*
	 * pipe_desc.bytes_to_xfer holds whatever is still unwritten at this
	 * point; the pipe code decreases it as it copies from the descriptor.
	 */
	*bytes_written = bytes_to_write - pipe_desc.bytes_to_xfer;

	int ret = pipe_return_code(min_xfer, pipe_desc.bytes_to_xfer,
				   bytes_to_write);
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, put, pipe, timeout, ret);
	return ret;
}
546
/**
 * @brief Read data from a pipe
 *
 * Data is gathered in this order: first from the pipe's circular buffer,
 * then from pended writers whose requests can be completely satisfied, then
 * from the first writer that can only be partially satisfied (it stays on
 * the wait queue). Data left in the writers' buffers is then used to refill
 * the pipe's circular buffer. If the request is still incomplete and
 * @a timeout permits waiting, the calling thread pends on the pipe's
 * readers wait queue.
 *
 * @param pipe          Pipe to read from
 * @param data          Destination buffer
 * @param bytes_to_read Number of bytes requested
 * @param bytes_read    Out: number of bytes actually read
 * @param min_xfer      Minimum number of bytes that must be read
 * @param timeout       How long the caller is willing to wait
 *
 * @retval 0       all data was read, or at least @a min_xfer bytes were
 * @retval -EINVAL @a min_xfer exceeds @a bytes_to_read or @a bytes_read
 *                 is NULL
 * @retval -EIO    K_NO_WAIT was given and @a min_xfer bytes can not be
 *                 read right away
 * @retval -EAGAIN fewer than @a min_xfer bytes were read
 */
int z_impl_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
		     size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
	struct k_thread    *writer;
	struct k_pipe_desc *desc;
	sys_dlist_t    xfer_list;
	size_t         num_bytes_read = 0;
	size_t         bytes_copied;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, get, pipe, timeout);

	/* Reject requests that can never succeed or can not report back */
	CHECKIF((min_xfer > bytes_to_read) || bytes_read == NULL) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, -EINVAL);

		return -EINVAL;
	}

	k_spinlock_key_t key = k_spin_lock(&pipe->lock);

	/*
	 * Create a list of "working writers" from which the data will be
	 * directly copied.
	 */
	if (!pipe_xfer_prepare(&xfer_list, &writer, &pipe->wait_q.writers,
				pipe->bytes_used, bytes_to_read,
				min_xfer, timeout)) {
		k_spin_unlock(&pipe->lock, key);
		*bytes_read = 0;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, -EIO);

		return -EIO;
	}

	SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, get, pipe, timeout);

	/*
	 * Lock the scheduler before dropping the spinlock so that the copying
	 * below runs with the spinlock released but without rescheduling.
	 */
	z_sched_lock();
	k_spin_unlock(&pipe->lock, key);

	/* Data already held in the circular buffer is consumed first */
	num_bytes_read = pipe_buffer_get(pipe, data, bytes_to_read);

	/*
	 * 1. 'xfer_list' currently contains a list of writer threads that can
	 *    have their write requests fulfilled by the current call.
	 * 2. 'writer' if not NULL points to a thread on the writer wait_q
	 *    that can post some of its requested data.
	 * 3. Data will be copied from each writer's buffer to either the
	 *    reader's buffer and/or to the pipe's circular buffer.
	 * 4. Interrupts are unlocked but the scheduler is locked to allow
	 *    ticks to be delivered but no scheduling to occur
	 * 5. If 'writer' times out while we are copying data, not only do we
	 *    still have a pointer to it, but it can not execute until this
	 *    call is complete so it is still safe to copy data from it.
	 */

	struct k_thread *thread = (struct k_thread *)
				  sys_dlist_get(&xfer_list);
	while ((thread != NULL) && (num_bytes_read < bytes_to_read)) {
		desc = (struct k_pipe_desc *)thread->base.swap_data;
		bytes_copied = pipe_xfer((uint8_t *)data + num_bytes_read,
					 bytes_to_read - num_bytes_read,
					 desc->buffer, desc->bytes_to_xfer);

		num_bytes_read       += bytes_copied;
		desc->buffer         += bytes_copied;
		desc->bytes_to_xfer  -= bytes_copied;

		/*
		 * It is expected that the write request will be satisfied.
		 * However, if the read request was satisfied before the
		 * write request was satisfied, then the write request must
		 * finish later when writing to the pipe's circular buffer.
		 */
		if (num_bytes_read == bytes_to_read) {
			break;
		}
		pipe_thread_ready(thread);

		thread = (struct k_thread *)sys_dlist_get(&xfer_list);
	}

	/*
	 * Take data from the writer that was left on the wait_q, if the
	 * read request still has room for it.
	 */
	if ((writer != NULL) && (num_bytes_read < bytes_to_read)) {
		desc = (struct k_pipe_desc *)writer->base.swap_data;
		bytes_copied = pipe_xfer((uint8_t *)data + num_bytes_read,
					 bytes_to_read - num_bytes_read,
					 desc->buffer, desc->bytes_to_xfer);

		num_bytes_read       += bytes_copied;
		desc->buffer         += bytes_copied;
		desc->bytes_to_xfer  -= bytes_copied;
	}

	/*
	 * Copy as much data as possible from the writers (if any)
	 * into the pipe's circular buffer.
	 */

	/*
	 * 'thread' is non-NULL here when the loop above stopped with a
	 * writer taken off 'xfer_list' but not yet readied; handle it and
	 * every writer still on the list.
	 */
	while (thread != NULL) {
		desc = (struct k_pipe_desc *)thread->base.swap_data;
		bytes_copied = pipe_buffer_put(pipe, desc->buffer,
					       desc->bytes_to_xfer);

		desc->buffer         += bytes_copied;
		desc->bytes_to_xfer  -= bytes_copied;

		/* Write request has been satisfied */
		pipe_thread_ready(thread);

		thread = (struct k_thread *)sys_dlist_get(&xfer_list);
	}

	/* The writer left on the wait_q may also refill the buffer */
	if (writer != NULL) {
		desc = (struct k_pipe_desc *)writer->base.swap_data;
		bytes_copied = pipe_buffer_put(pipe, desc->buffer,
					       desc->bytes_to_xfer);

		desc->buffer         += bytes_copied;
		desc->bytes_to_xfer  -= bytes_copied;
	}

	/* Everything was read: done. */
	if (num_bytes_read == bytes_to_read) {
		k_sched_unlock();

		*bytes_read = num_bytes_read;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, 0);

		return 0;
	}

	/*
	 * Waiting is allowed, but a non-zero minimum has already been met:
	 * return now instead of pending for the remainder.
	 */
	if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)
	    && num_bytes_read >= min_xfer
	    && min_xfer > 0U) {
		k_sched_unlock();

		*bytes_read = num_bytes_read;

		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, 0);

		return 0;
	}

	/* Not all data was read */

	/* Describes the still unfilled part of the caller's buffer */
	struct k_pipe_desc  pipe_desc;

	pipe_desc.buffer        = (uint8_t *)data + num_bytes_read;
	pipe_desc.bytes_to_xfer = bytes_to_read - num_bytes_read;

	if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		/* Publish the descriptor so it can be found via swap_data */
		_current->base.swap_data = &pipe_desc;
		/*
		 * Take the spinlock and unlock the scheduler before
		 * manipulating the readers wait_q.
		 */
		k_spinlock_key_t key2 = k_spin_lock(&pipe->lock);

		z_sched_unlock_no_reschedule();
		(void)z_pend_curr(&pipe->lock, key2,
				  &pipe->wait_q.readers, timeout);
	} else {
		k_sched_unlock();
	}

	/*
	 * pipe_desc.bytes_to_xfer holds whatever is still unread at this
	 * point; the pipe code decreases it as it copies into the descriptor.
	 */
	*bytes_read = bytes_to_read - pipe_desc.bytes_to_xfer;

	int ret = pipe_return_code(min_xfer, pipe_desc.bytes_to_xfer,
				   bytes_to_read);
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, get, pipe, timeout, ret);
	return ret;
}
714
#ifdef CONFIG_USERSPACE
/*
 * System call verification handler for k_pipe_get().
 *
 * Checks the pipe object, then that both the byte-count output and the
 * destination buffer are writable, before forwarding to the implementation.
 */
int z_vrfy_k_pipe_get(struct k_pipe *pipe, void *data, size_t bytes_to_read,
		      size_t *bytes_read, size_t min_xfer, k_timeout_t timeout)
{
	int status;

	Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	Z_OOPS(Z_SYSCALL_MEMORY_WRITE(bytes_read, sizeof(*bytes_read)));
	Z_OOPS(Z_SYSCALL_MEMORY_WRITE(data, bytes_to_read));

	status = z_impl_k_pipe_get(pipe, data, bytes_to_read, bytes_read,
				   min_xfer, timeout);

	return status;
}
#include <syscalls/k_pipe_get_mrsh.c>
#endif
729
/**
 * @brief Write data to a pipe
 *
 * Thin wrapper around z_pipe_put_internal() that passes no asynchronous
 * descriptor.
 *
 * @return Whatever z_pipe_put_internal() returns
 */
int z_impl_k_pipe_put(struct k_pipe *pipe, void *data, size_t bytes_to_write,
		      size_t *bytes_written, size_t min_xfer,
		      k_timeout_t timeout)
{
	int status = z_pipe_put_internal(pipe, NULL, data,
					 bytes_to_write, bytes_written,
					 min_xfer, timeout);

	return status;
}
738
#ifdef CONFIG_USERSPACE
/*
 * System call verification handler for k_pipe_put().
 *
 * Checks the pipe object, then that the byte-count output is writable and
 * the source buffer is readable, before forwarding to the implementation.
 */
int z_vrfy_k_pipe_put(struct k_pipe *pipe, void *data, size_t bytes_to_write,
		      size_t *bytes_written, size_t min_xfer,
		      k_timeout_t timeout)
{
	int status;

	Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	Z_OOPS(Z_SYSCALL_MEMORY_WRITE(bytes_written, sizeof(*bytes_written)));
	Z_OOPS(Z_SYSCALL_MEMORY_READ(data, bytes_to_write));

	status = z_impl_k_pipe_put(pipe, data, bytes_to_write, bytes_written,
				   min_xfer, timeout);

	return status;
}
#include <syscalls/k_pipe_put_mrsh.c>
#endif
754
z_impl_k_pipe_read_avail(struct k_pipe * pipe)755 size_t z_impl_k_pipe_read_avail(struct k_pipe *pipe)
756 {
757 size_t res;
758 k_spinlock_key_t key;
759
760 /* Buffer and size are fixed. No need to spin. */
761 if (pipe->buffer == NULL || pipe->size == 0U) {
762 res = 0;
763 goto out;
764 }
765
766 key = k_spin_lock(&pipe->lock);
767
768 if (pipe->read_index == pipe->write_index) {
769 res = pipe->bytes_used;
770 } else if (pipe->read_index < pipe->write_index) {
771 res = pipe->write_index - pipe->read_index;
772 } else {
773 res = pipe->size - (pipe->read_index - pipe->write_index);
774 }
775
776 k_spin_unlock(&pipe->lock, key);
777
778 out:
779 return res;
780 }
781
#ifdef CONFIG_USERSPACE
/*
 * System call verification handler for k_pipe_read_avail(): checks the pipe
 * object, then forwards to the implementation.
 */
size_t z_vrfy_k_pipe_read_avail(struct k_pipe *pipe)
{
	size_t avail;

	Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	avail = z_impl_k_pipe_read_avail(pipe);

	return avail;
}
#include <syscalls/k_pipe_read_avail_mrsh.c>
#endif
791
z_impl_k_pipe_write_avail(struct k_pipe * pipe)792 size_t z_impl_k_pipe_write_avail(struct k_pipe *pipe)
793 {
794 size_t res;
795 k_spinlock_key_t key;
796
797 /* Buffer and size are fixed. No need to spin. */
798 if (pipe->buffer == NULL || pipe->size == 0U) {
799 res = 0;
800 goto out;
801 }
802
803 key = k_spin_lock(&pipe->lock);
804
805 if (pipe->write_index == pipe->read_index) {
806 res = pipe->size - pipe->bytes_used;
807 } else if (pipe->write_index < pipe->read_index) {
808 res = pipe->read_index - pipe->write_index;
809 } else {
810 res = pipe->size - (pipe->write_index - pipe->read_index);
811 }
812
813 k_spin_unlock(&pipe->lock, key);
814
815 out:
816 return res;
817 }
818
#ifdef CONFIG_USERSPACE
/*
 * System call verification handler for k_pipe_write_avail(): checks the pipe
 * object, then forwards to the implementation.
 */
size_t z_vrfy_k_pipe_write_avail(struct k_pipe *pipe)
{
	size_t room;

	Z_OOPS(Z_SYSCALL_OBJ(pipe, K_OBJ_PIPE));

	room = z_impl_k_pipe_write_avail(pipe);

	return room;
}
#include <syscalls/k_pipe_write_avail_mrsh.c>
#endif
828