/*
 * Copyright (c) 2024 Måns Ansgariusson <mansgariusson@gmail.com>
 *
 * SPDX-License-Identifier: Apache-2.0
 */
#include <zephyr/init.h>
#include <zephyr/kernel.h>
#include <zephyr/internal/syscall_handler.h>
#include <ksched.h>
#include <kthread.h>
#include <wait_q.h>

#ifdef CONFIG_OBJ_CORE_PIPE
/* Object-core type descriptor that every pipe gets linked to. */
static struct k_obj_type obj_type_pipe;
#endif /* CONFIG_OBJ_CORE_PIPE */

/* True when PIPE_FLAG_OPEN is not set in the pipe flags. */
static inline bool pipe_closed(struct k_pipe *pipe)
{
	return (pipe->flags & PIPE_FLAG_OPEN) == 0;
}

/* True while PIPE_FLAG_RESET is set in the pipe flags. */
static inline bool pipe_resetting(struct k_pipe *pipe)
{
	return (pipe->flags & PIPE_FLAG_RESET) != 0;
}

/* True when the pipe's ring buffer reports no free space. */
static inline bool pipe_full(struct k_pipe *pipe)
{
	return ring_buf_space_get(&pipe->buf) == 0;
}

/* True when the pipe's ring buffer reports that it is empty. */
static inline bool pipe_empty(struct k_pipe *pipe)
{
	return ring_buf_is_empty(&pipe->buf);
}

/*
 * Pend the current thread on one of the pipe's wait queues.
 *
 * waitq        wait queue to pend on (&pipe->space for writers, otherwise
 *              the call is traced as a blocking read)
 * pipe         pipe being waited on
 * key          spinlock key of the currently held pipe->lock; it is handed
 *              to z_pend_curr() and overwritten with the key obtained when
 *              the lock is taken again before returning
 * time_limit   absolute deadline for the wait
 * need_resched cleared before pending
 *
 * Returns -EAGAIN without pending when the remaining timeout equals
 * K_NO_WAIT, -ECANCELED when the pipe is flagged as resetting after
 * wake-up, and the z_pend_curr() result otherwise.
 */
static int wait_for(_wait_q_t *waitq, struct k_pipe *pipe, k_spinlock_key_t *key,
		    k_timepoint_t time_limit, bool *need_resched)
{
	k_timeout_t timeout = sys_timepoint_timeout(time_limit);
	int rc;

	if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		return -EAGAIN;
	}

	/* Account for this thread as a waiter for as long as it is pended. */
	pipe->waiting++;
	/* NOTE(review): presumably cleared because pending already switches threads. */
	*need_resched = false;
	if (waitq == &pipe->space) {
		SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, write, pipe, timeout);
	} else {
		SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_pipe, read, pipe, timeout);
	}
	rc = z_pend_curr(&pipe->lock, *key, waitq, timeout);
	/* Take the pipe lock again so the caller continues with it held. */
	*key = k_spin_lock(&pipe->lock);
	pipe->waiting--;
	if (unlikely(pipe_resetting(pipe))) {
		/* The last waiter to leave clears the reset flag. */
		if (pipe->waiting == 0) {
			pipe->flags &= ~PIPE_FLAG_RESET;
		}
		rc = -ECANCELED;
	}

	return rc;
}

/*
 * Initialize a pipe on top of the caller supplied storage.
 *
 * pipe         pipe object to initialize
 * buffer       storage handed to the pipe's ring buffer
 * buffer_size  size of that storage, in bytes
 *
 * The pipe starts with only PIPE_FLAG_OPEN set, no waiters, a zeroed
 * spinlock and freshly initialized "data" and "space" wait queues.
 */
void z_impl_k_pipe_init(struct k_pipe *pipe, uint8_t *buffer, size_t buffer_size)
{
	ring_buf_init(&pipe->buf, buffer_size, buffer);
	pipe->flags = PIPE_FLAG_OPEN;
	pipe->waiting = 0;

	pipe->lock = (struct k_spinlock){};
	z_waitq_init(&pipe->data);
	z_waitq_init(&pipe->space);
	k_object_init(pipe);

#ifdef CONFIG_POLL
	sys_dlist_init(&pipe->poll_events);
#endif /* CONFIG_POLL */
#ifdef CONFIG_OBJ_CORE_PIPE
	k_obj_core_init_and_link(K_OBJ_CORE(pipe), &obj_type_pipe);
#endif /* CONFIG_OBJ_CORE_PIPE */
	SYS_PORT_TRACING_OBJ_INIT(k_pipe, pipe, buffer, buffer_size);
}

/*
 * Description of a pending reader's destination buffer. A reader publishes
 * a pointer to it through its thread's swap_data so that a writer can copy
 * data straight into the reader's buffer.
 */
struct pipe_buf_spec {
	uint8_t * const data;	/* destination buffer */
	const size_t len;	/* number of bytes the reader asked for */
	size_t used;		/* number of bytes placed in data so far */
};

/*
 * Copy data directly into the buffers of readers pended on pipe->data.
 *
 * pipe         pipe whose pending readers are served
 * need_resched set to true whenever a reader is made ready
 * data         bytes to hand out
 * len          number of bytes available in data
 *
 * Readers are served one at a time, in the order returned by
 * _priq_wait_best(). The loop stops when there is no pending reader, when
 * the current reader still wants more data than was available, or when all
 * of data has been consumed.
 *
 * Returns the number of bytes copied to readers.
 */
static size_t copy_to_pending_readers(struct k_pipe *pipe, bool *need_resched,
				      const uint8_t *data, size_t len)
{
	struct k_thread *reader = NULL;
	struct pipe_buf_spec *reader_buf;
	size_t copy_size, written = 0;

	/*
	 * Attempt a direct data copy to waiting readers if any.
	 * The copy has to be done under the scheduler lock to ensure all the
	 * needed data is copied to the target thread whose buffer spec lives
	 * on that thread's stack, and then the thread unpended only if it
	 * received all the data it wanted, without racing with a potential
	 * thread timeout/cancellation event.
	 */
	do {
		LOCK_SCHED_SPINLOCK {
			reader = _priq_wait_best(&pipe->data.waitq);
			if (reader == NULL) {
				K_SPINLOCK_BREAK;
			}

			/* Buffer spec published by the reader before it pended. */
			reader_buf = reader->base.swap_data;
			/* Never copy more than is left to give or left to receive. */
			copy_size = MIN(len - written,
					reader_buf->len - reader_buf->used);
			memcpy(&reader_buf->data[reader_buf->used],
			       &data[written], copy_size);
			written += copy_size;
			reader_buf->used += copy_size;

			if (reader_buf->used < reader_buf->len) {
				/* This reader wants more: don't unpend. */
				reader = NULL;
			} else {
				/*
				 * This reader has received all the data
				 * it was waiting for: wake it up with
				 * the scheduler lock still held.
				 */
				unpend_thread_no_timeout(reader);
				z_abort_thread_timeout(reader);
			}
		}
		if (reader != NULL) {
			/* rest of thread wake-up outside the scheduler lock */
			z_thread_return_value_set_with_data(reader, 0, NULL);
			z_ready_thread(reader);
			*need_resched = true;
		}
	} while (reader != NULL && written < len);

	return written;
}

/*
 * Write up to len bytes from data into the pipe.
 *
 * pipe     pipe to write to
 * data     source bytes
 * len      number of bytes to write
 * timeout  how long the call may wait for space
 *
 * Returns the number of bytes written when everything was written, or when
 * wait_for() reports -EAGAIN after a partial write. Returns -EAGAIN when
 * wait_for() reports -EAGAIN and nothing was written, -ECANCELED when the
 * pipe is resetting on entry, and -EPIPE when the pipe is found closed.
 * Any other non-zero wait_for() result is returned as is, even if some
 * bytes were already written.
 */
int z_impl_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len,
			k_timeout_t timeout)
{
	int rc;
	size_t written = 0;
	k_timepoint_t end = sys_timepoint_calc(timeout);
	k_spinlock_key_t key = k_spin_lock(&pipe->lock);
	bool need_resched = false;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, write, pipe, data, len, timeout);

	if (unlikely(pipe_resetting(pipe))) {
		rc = -ECANCELED;
		goto exit;
	}

	for (;;) {
		/* Checked on every pass: the pipe may be closed while we wait. */
		if (unlikely(pipe_closed(pipe))) {
			rc = -EPIPE;
			break;
		}

		if (pipe_empty(pipe)) {
			if (IS_ENABLED(CONFIG_KERNEL_COHERENCE)) {
				/*
				 * Systems that enabled this option don't have
				 * their stacks in coherent memory. Given our
				 * pipe_buf_spec is stored on the stack, and
				 * readers may also have their destination
				 * buffer on their stack too, it is not worth
				 * supporting direct-to-readers copy with them.
				 * Simply wake up all pending readers instead.
				 */
				need_resched = z_sched_wake_all(&pipe->data, 0, NULL);
			} else if (pipe->waiting != 0) {
				/* Serve pending readers before using the ring buffer. */
				written += copy_to_pending_readers(pipe, &need_resched,
								   &data[written],
								   len - written);
				if (written >= len) {
					rc = written;
					break;
				}
			}
#ifdef CONFIG_POLL
			z_handle_obj_poll_events(&pipe->poll_events,
						 K_POLL_STATE_PIPE_DATA_AVAILABLE);
#endif /* CONFIG_POLL */
		}

		/* Store whatever still fits in the ring buffer. */
		written += ring_buf_put(&pipe->buf, &data[written], len - written);
		if (likely(written == len)) {
			rc = written;
			break;
		}

		/* Not everything fit: wait for space on the pipe. */
		rc = wait_for(&pipe->space, pipe, &key, end, &need_resched);
		if (rc != 0) {
			if (rc == -EAGAIN) {
				/* Report a partial write as a byte count. */
				rc = written ? written : -EAGAIN;
			}
			break;
		}
	}
exit:
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, write, pipe, rc);
	/* Both paths release pipe->lock. */
	if (need_resched) {
		z_reschedule(&pipe->lock, key);
	} else {
		k_spin_unlock(&pipe->lock, key);
	}
	return rc;
}

/*
 * Read up to len bytes from the pipe into data.
 *
 * pipe     pipe to read from
 * data     destination buffer
 * len      number of bytes wanted
 * timeout  how long the call may wait for data
 *
 * Returns the number of bytes read when the request was fully satisfied,
 * when the pipe is closed after some bytes were read, or when wait_for()
 * reports -EAGAIN after a partial read. Returns -EPIPE when the pipe is
 * closed and nothing was read, -EAGAIN when wait_for() reports -EAGAIN and
 * nothing was read, and -ECANCELED when the pipe is resetting on entry.
 * Any other non-zero wait_for() result is returned as is.
 */
int z_impl_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len,
		       k_timeout_t timeout)
{
	/* Progress tracker; also published to writers while we are pended. */
	struct pipe_buf_spec buf = { data, len, 0 };
	int rc;
	k_timepoint_t end = sys_timepoint_calc(timeout);
	k_spinlock_key_t key = k_spin_lock(&pipe->lock);
	bool need_resched = false;

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, read, pipe, data, len, timeout);

	if (unlikely(pipe_resetting(pipe))) {
		rc = -ECANCELED;
		goto exit;
	}

	for (;;) {
		if (pipe_full(pipe)) {
			/* One or more pending writers may exist. */
			need_resched = z_sched_wake_all(&pipe->space, 0, NULL);
		}

		/* Drain buffered data first; this happens even on a closed pipe. */
		buf.used += ring_buf_get(&pipe->buf, &data[buf.used], len - buf.used);
		if (likely(buf.used == len)) {
			rc = buf.used;
			break;
		}

		if (unlikely(pipe_closed(pipe))) {
			rc = buf.used ? buf.used : -EPIPE;
			break;
		}

		/* provide our "direct copy" info to potential writers */
		_current->base.swap_data = &buf;

		rc = wait_for(&pipe->data, pipe, &key, end, &need_resched);
		if (rc != 0) {
			if (rc == -EAGAIN) {
				/* Report a partial read as a byte count. */
				rc = buf.used ? buf.used : -EAGAIN;
			}
			break;
		}
	}
exit:
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, read, pipe, rc);
	/* Both paths release pipe->lock. */
	if (need_resched) {
		z_reschedule(&pipe->lock, key);
	} else {
		k_spin_unlock(&pipe->lock, key);
	}
	return rc;
}

/*
 * Reset the pipe's ring buffer. When threads are waiting on the pipe,
 * PIPE_FLAG_RESET is set and every thread pended on the "data" and "space"
 * wait queues is woken; wait_for() then reports -ECANCELED to them and the
 * last one to leave clears the flag.
 */
void z_impl_k_pipe_reset(struct k_pipe *pipe)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, reset, pipe);
	K_SPINLOCK(&pipe->lock) {
		ring_buf_reset(&pipe->buf);
		if (likely(pipe->waiting != 0)) {
			pipe->flags |= PIPE_FLAG_RESET;
			z_sched_wake_all(&pipe->data, 0, NULL);
			z_sched_wake_all(&pipe->space, 0, NULL);
		}
	}
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, reset, pipe);
}

/*
 * Close the pipe: clear all of its flags (PIPE_FLAG_OPEN included) and wake
 * every thread pended on the "data" and "space" wait queues.
 */
void z_impl_k_pipe_close(struct k_pipe *pipe)
{
	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_pipe, close, pipe);
	K_SPINLOCK(&pipe->lock) {
		pipe->flags = 0;
		z_sched_wake_all(&pipe->data, 0, NULL);
		z_sched_wake_all(&pipe->space, 0, NULL);
	}
	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_pipe, close, pipe);
}

#ifdef CONFIG_USERSPACE
/*
 * Syscall verification wrappers: each one checks the pipe object (and the
 * user supplied memory range, where there is one) with K_OOPS() before
 * forwarding to the matching z_impl_ function.
 */

/* Checks the pipe object and that buffer is writable for buffer_size bytes. */
void z_vrfy_k_pipe_init(struct k_pipe *pipe, uint8_t *buffer, size_t buffer_size)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(buffer, buffer_size));

	z_impl_k_pipe_init(pipe, buffer, buffer_size);
}
#include <zephyr/syscalls/k_pipe_init_mrsh.c>

/* Checks the pipe object and that data is writable for len bytes. */
int z_vrfy_k_pipe_read(struct k_pipe *pipe, uint8_t *data, size_t len,
		       k_timeout_t timeout)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, len));

	return z_impl_k_pipe_read(pipe, data, len, timeout);
}
#include <zephyr/syscalls/k_pipe_read_mrsh.c>

/* Checks the pipe object and that data is readable for len bytes. */
int z_vrfy_k_pipe_write(struct k_pipe *pipe, const uint8_t *data, size_t len,
			k_timeout_t timeout)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	K_OOPS(K_SYSCALL_MEMORY_READ(data, len));

	return z_impl_k_pipe_write(pipe, data, len, timeout);
}
#include <zephyr/syscalls/k_pipe_write_mrsh.c>

/* Checks the pipe object, then resets the pipe. */
void z_vrfy_k_pipe_reset(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	z_impl_k_pipe_reset(pipe);
}
#include <zephyr/syscalls/k_pipe_reset_mrsh.c>

/* Checks the pipe object, then closes the pipe. */
void z_vrfy_k_pipe_close(struct k_pipe *pipe)
{
	K_OOPS(K_SYSCALL_OBJ(pipe, K_OBJ_PIPE));
	z_impl_k_pipe_close(pipe);
}
#include <zephyr/syscalls/k_pipe_close_mrsh.c>
#endif /* CONFIG_USERSPACE */

#ifdef CONFIG_OBJ_CORE_PIPE
/*
 * Boot-time hook (registered through SYS_INIT below): sets up the pipe
 * object type and links every pipe found in the k_pipe struct section to it.
 * Always returns 0.
 */
static int init_pipe_obj_core_list(void)
{
	/* Location of the object core inside struct k_pipe */
	const size_t core_offset = offsetof(struct k_pipe, obj_core);

	/* Set up the pipe object type */
	z_obj_type_init(&obj_type_pipe, K_OBJ_TYPE_PIPE_ID, core_offset);

	/* Attach each statically defined pipe to that type */
	STRUCT_SECTION_FOREACH(k_pipe, static_pipe) {
		k_obj_core_init_and_link(K_OBJ_CORE(static_pipe), &obj_type_pipe);
	}

	return 0;
}

SYS_INIT(init_pipe_obj_core_list, PRE_KERNEL_1,
	 CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
#endif /* CONFIG_OBJ_CORE_PIPE */