1 /*
2 * Copyright (c) 2016 Wind River Systems, Inc.
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 /**
8 * @file
9 * @brief Message queues.
10 */
11
12
13 #include <kernel.h>
14 #include <kernel_structs.h>
15
16 #include <toolchain.h>
17 #include <linker/sections.h>
18 #include <string.h>
19 #include <ksched.h>
20 #include <wait_q.h>
21 #include <sys/dlist.h>
22 #include <sys/math_extras.h>
23 #include <init.h>
24 #include <syscall_handler.h>
25 #include <kernel_internal.h>
26 #include <sys/check.h>
27
28 #ifdef CONFIG_POLL
handle_poll_events(struct k_msgq * msgq,uint32_t state)29 static inline void handle_poll_events(struct k_msgq *msgq, uint32_t state)
30 {
31 z_handle_obj_poll_events(&msgq->poll_events, state);
32 }
33 #endif /* CONFIG_POLL */
34
k_msgq_init(struct k_msgq * msgq,char * buffer,size_t msg_size,uint32_t max_msgs)35 void k_msgq_init(struct k_msgq *msgq, char *buffer, size_t msg_size,
36 uint32_t max_msgs)
37 {
38 msgq->msg_size = msg_size;
39 msgq->max_msgs = max_msgs;
40 msgq->buffer_start = buffer;
41 msgq->buffer_end = buffer + (max_msgs * msg_size);
42 msgq->read_ptr = buffer;
43 msgq->write_ptr = buffer;
44 msgq->used_msgs = 0;
45 msgq->flags = 0;
46 z_waitq_init(&msgq->wait_q);
47 msgq->lock = (struct k_spinlock) {};
48 #ifdef CONFIG_POLL
49 sys_dlist_init(&msgq->poll_events);
50 #endif /* CONFIG_POLL */
51
52 SYS_PORT_TRACING_OBJ_INIT(k_msgq, msgq);
53
54 z_object_init(msgq);
55 }
56
z_impl_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)57 int z_impl_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
58 uint32_t max_msgs)
59 {
60 void *buffer;
61 int ret;
62 size_t total_size;
63
64 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, alloc_init, msgq);
65
66 if (size_mul_overflow(msg_size, max_msgs, &total_size)) {
67 ret = -EINVAL;
68 } else {
69 buffer = z_thread_malloc(total_size);
70 if (buffer != NULL) {
71 k_msgq_init(msgq, buffer, msg_size, max_msgs);
72 msgq->flags = K_MSGQ_FLAG_ALLOC;
73 ret = 0;
74 } else {
75 ret = -ENOMEM;
76 }
77 }
78
79 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, alloc_init, msgq, ret);
80
81 return ret;
82 }
83
84 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)85 int z_vrfy_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
86 uint32_t max_msgs)
87 {
88 Z_OOPS(Z_SYSCALL_OBJ_NEVER_INIT(msgq, K_OBJ_MSGQ));
89
90 return z_impl_k_msgq_alloc_init(msgq, msg_size, max_msgs);
91 }
92 #include <syscalls/k_msgq_alloc_init_mrsh.c>
93 #endif
94
k_msgq_cleanup(struct k_msgq * msgq)95 int k_msgq_cleanup(struct k_msgq *msgq)
96 {
97 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, cleanup, msgq);
98
99 CHECKIF(z_waitq_head(&msgq->wait_q) != NULL) {
100 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, -EBUSY);
101
102 return -EBUSY;
103 }
104
105 if ((msgq->flags & K_MSGQ_FLAG_ALLOC) != 0U) {
106 k_free(msgq->buffer_start);
107 msgq->flags &= ~K_MSGQ_FLAG_ALLOC;
108 }
109
110 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, 0);
111
112 return 0;
113 }
114
115
/**
 * @brief Send one message to a message queue.
 *
 * With room in the queue, the message is either copied straight into the
 * swap_data buffer of the first thread pended on the queue (which is then
 * readied) or, if no thread is pended, appended to the ring buffer.
 * With a full queue, the call either fails immediately (K_NO_WAIT) or pends
 * the current thread on the queue's wait queue with @a data as its swap_data.
 *
 * @param msgq    Message queue to write to.
 * @param data    Message to send; msgq->msg_size bytes are read from it.
 * @param timeout How long to wait for room; must be K_NO_WAIT in an ISR.
 *
 * @retval 0       Message delivered or queued.
 * @retval -ENOMSG Queue full and @a timeout is K_NO_WAIT.
 * @return Otherwise the value returned by z_pend_curr().
 */
int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout)
{
	__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");

	struct k_thread *reader;
	int rc;
	k_spinlock_key_t key = k_spin_lock(&msgq->lock);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout);

	if (msgq->used_msgs >= msgq->max_msgs) {
		/* queue is full */
		if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
			/* caller does not want to wait for room */
			rc = -ENOMSG;
		} else {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout);

			/*
			 * Pend with the message pointer stashed in swap_data;
			 * z_impl_k_msgq_get() copies from it when room appears.
			 * z_pend_curr() is handed the lock and key, so this
			 * path does not unlock.
			 */
			_current->base.swap_data = (void *) data;

			rc = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, rc);
			return rc;
		}
	} else {
		/* queue has room */
		reader = z_unpend_first_thread(&msgq->wait_q);
		if (reader != NULL) {
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, 0);

			/* hand the message directly to the pended thread */
			(void)memcpy(reader->base.swap_data, data,
				     msgq->msg_size);

			/* make it runnable with a return value of 0 */
			arch_thread_return_value_set(reader, 0);
			z_ready_thread(reader);

			/* z_reschedule() is handed the lock and key */
			z_reschedule(&msgq->lock, key);
			return 0;
		}

		/* nobody waiting: store the message in the ring buffer */
		(void)memcpy(msgq->write_ptr, data, msgq->msg_size);
		msgq->write_ptr += msgq->msg_size;
		if (msgq->write_ptr == msgq->buffer_end) {
			/* wrap around */
			msgq->write_ptr = msgq->buffer_start;
		}
		msgq->used_msgs++;
#ifdef CONFIG_POLL
		handle_poll_events(msgq, K_POLL_STATE_MSGQ_DATA_AVAILABLE);
#endif /* CONFIG_POLL */
		rc = 0;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, rc);

	k_spin_unlock(&msgq->lock, key);

	return rc;
}
175
176 #ifdef CONFIG_USERSPACE
/**
 * @brief Syscall verification wrapper for k_msgq_put().
 *
 * Runs the Z_SYSCALL_OBJ check on @a msgq and the Z_SYSCALL_MEMORY_READ check
 * on msgq->msg_size bytes at @a data, both under Z_OOPS, then forwards to
 * z_impl_k_msgq_put().
 *
 * @return The value returned by z_impl_k_msgq_put().
 */
static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data,
				    k_timeout_t timeout)
{
	int rc;

	Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	Z_OOPS(Z_SYSCALL_MEMORY_READ(data, msgq->msg_size));

	rc = z_impl_k_msgq_put(msgq, data, timeout);

	return rc;
}
185 #include <syscalls/k_msgq_put_mrsh.c>
186 #endif
187
z_impl_k_msgq_get_attrs(struct k_msgq * msgq,struct k_msgq_attrs * attrs)188 void z_impl_k_msgq_get_attrs(struct k_msgq *msgq, struct k_msgq_attrs *attrs)
189 {
190 attrs->msg_size = msgq->msg_size;
191 attrs->max_msgs = msgq->max_msgs;
192 attrs->used_msgs = msgq->used_msgs;
193 }
194
195 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_get_attrs(struct k_msgq * msgq,struct k_msgq_attrs * attrs)196 static inline void z_vrfy_k_msgq_get_attrs(struct k_msgq *msgq,
197 struct k_msgq_attrs *attrs)
198 {
199 Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
200 Z_OOPS(Z_SYSCALL_MEMORY_WRITE(attrs, sizeof(struct k_msgq_attrs)));
201 z_impl_k_msgq_get_attrs(msgq, attrs);
202 }
203 #include <syscalls/k_msgq_get_attrs_mrsh.c>
204 #endif
205
/**
 * @brief Receive one message from a message queue.
 *
 * With at least one stored message, the oldest one is copied to @a data and
 * removed. If a thread is pended on the queue at that point, the message it
 * stashed in swap_data is appended to the ring buffer and that thread is
 * readied.
 * With an empty queue, the call either fails immediately (K_NO_WAIT) or
 * pends the current thread on the queue's wait queue with @a data as its
 * swap_data.
 *
 * @param msgq    Message queue to read from.
 * @param data    Destination buffer; msgq->msg_size bytes are written.
 * @param timeout How long to wait for a message; must be K_NO_WAIT in an ISR.
 *
 * @retval 0       Message received.
 * @retval -ENOMSG Queue empty and @a timeout is K_NO_WAIT.
 * @return Otherwise the value returned by z_pend_curr().
 */
int z_impl_k_msgq_get(struct k_msgq *msgq, void *data, k_timeout_t timeout)
{
	__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");

	struct k_thread *writer;
	int rc;
	k_spinlock_key_t key = k_spin_lock(&msgq->lock);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, get, msgq, timeout);

	if (msgq->used_msgs == 0U) {
		/* queue is empty */
		if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
			/* caller does not want to wait for a message */
			rc = -ENOMSG;
		} else {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);

			/*
			 * Pend with the destination stashed in swap_data;
			 * z_impl_k_msgq_put() copies the message into it.
			 * z_pend_curr() is handed the lock and key, so this
			 * path does not unlock.
			 */
			_current->base.swap_data = data;

			rc = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, rc);
			return rc;
		}
	} else {
		/* copy out the oldest message and advance the read cursor */
		(void)memcpy(data, msgq->read_ptr, msgq->msg_size);
		msgq->read_ptr += msgq->msg_size;
		if (msgq->read_ptr == msgq->buffer_end) {
			/* wrap around */
			msgq->read_ptr = msgq->buffer_start;
		}
		msgq->used_msgs--;

		/* a slot just opened up: serve the first pended thread, if any */
		writer = z_unpend_first_thread(&msgq->wait_q);
		if (writer != NULL) {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);

			/* append the pended thread's message to the ring buffer */
			(void)memcpy(msgq->write_ptr, writer->base.swap_data,
				     msgq->msg_size);
			msgq->write_ptr += msgq->msg_size;
			if (msgq->write_ptr == msgq->buffer_end) {
				/* wrap around */
				msgq->write_ptr = msgq->buffer_start;
			}
			msgq->used_msgs++;

			/* make it runnable with a return value of 0 */
			arch_thread_return_value_set(writer, 0);
			z_ready_thread(writer);

			/* z_reschedule() is handed the lock and key */
			z_reschedule(&msgq->lock, key);

			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, 0);

			return 0;
		}
		rc = 0;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, rc);

	k_spin_unlock(&msgq->lock, key);

	return rc;
}
271
272 #ifdef CONFIG_USERSPACE
/**
 * @brief Syscall verification wrapper for k_msgq_get().
 *
 * Runs the Z_SYSCALL_OBJ check on @a msgq and the Z_SYSCALL_MEMORY_WRITE
 * check on msgq->msg_size bytes at @a data, both under Z_OOPS, then forwards
 * to z_impl_k_msgq_get().
 *
 * @return The value returned by z_impl_k_msgq_get().
 */
static inline int z_vrfy_k_msgq_get(struct k_msgq *msgq, void *data,
				    k_timeout_t timeout)
{
	int rc;

	Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	Z_OOPS(Z_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));

	rc = z_impl_k_msgq_get(msgq, data, timeout);

	return rc;
}
281 #include <syscalls/k_msgq_get_mrsh.c>
282 #endif
283
z_impl_k_msgq_peek(struct k_msgq * msgq,void * data)284 int z_impl_k_msgq_peek(struct k_msgq *msgq, void *data)
285 {
286 k_spinlock_key_t key;
287 int result;
288
289 key = k_spin_lock(&msgq->lock);
290
291 if (msgq->used_msgs > 0U) {
292 /* take first available message from queue */
293 (void)memcpy(data, msgq->read_ptr, msgq->msg_size);
294 result = 0;
295 } else {
296 /* don't wait for a message to become available */
297 result = -ENOMSG;
298 }
299
300 SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
301
302 k_spin_unlock(&msgq->lock, key);
303
304 return result;
305 }
306
307 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_peek(struct k_msgq * msgq,void * data)308 static inline int z_vrfy_k_msgq_peek(struct k_msgq *msgq, void *data)
309 {
310 Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
311 Z_OOPS(Z_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
312
313 return z_impl_k_msgq_peek(msgq, data);
314 }
315 #include <syscalls/k_msgq_peek_mrsh.c>
316 #endif
317
z_impl_k_msgq_purge(struct k_msgq * msgq)318 void z_impl_k_msgq_purge(struct k_msgq *msgq)
319 {
320 k_spinlock_key_t key;
321 struct k_thread *pending_thread;
322
323 key = k_spin_lock(&msgq->lock);
324
325 SYS_PORT_TRACING_OBJ_FUNC(k_msgq, purge, msgq);
326
327 /* wake up any threads that are waiting to write */
328 while ((pending_thread = z_unpend_first_thread(&msgq->wait_q)) != NULL) {
329 arch_thread_return_value_set(pending_thread, -ENOMSG);
330 z_ready_thread(pending_thread);
331 }
332
333 msgq->used_msgs = 0;
334 msgq->read_ptr = msgq->write_ptr;
335
336 z_reschedule(&msgq->lock, key);
337 }
338
339 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_purge(struct k_msgq * msgq)340 static inline void z_vrfy_k_msgq_purge(struct k_msgq *msgq)
341 {
342 Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
343 z_impl_k_msgq_purge(msgq);
344 }
345 #include <syscalls/k_msgq_purge_mrsh.c>
346
z_vrfy_k_msgq_num_free_get(struct k_msgq * msgq)347 static inline uint32_t z_vrfy_k_msgq_num_free_get(struct k_msgq *msgq)
348 {
349 Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
350 return z_impl_k_msgq_num_free_get(msgq);
351 }
352 #include <syscalls/k_msgq_num_free_get_mrsh.c>
353
z_vrfy_k_msgq_num_used_get(struct k_msgq * msgq)354 static inline uint32_t z_vrfy_k_msgq_num_used_get(struct k_msgq *msgq)
355 {
356 Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
357 return z_impl_k_msgq_num_used_get(msgq);
358 }
359 #include <syscalls/k_msgq_num_used_get_mrsh.c>
360
361 #endif
362