1 /*
2 * Copyright (c) 2016 Wind River Systems, Inc.
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 /**
8 * @file
9 * @brief Message queues.
10 */
11
12
13 #include <zephyr/kernel.h>
14 #include <zephyr/kernel_structs.h>
15
16 #include <zephyr/toolchain.h>
17 #include <zephyr/linker/sections.h>
18 #include <string.h>
19 #include <ksched.h>
20 #include <wait_q.h>
21 #include <zephyr/sys/dlist.h>
22 #include <zephyr/sys/math_extras.h>
23 #include <zephyr/init.h>
24 #include <zephyr/internal/syscall_handler.h>
25 #include <kernel_internal.h>
26 #include <zephyr/sys/check.h>
27
28 #ifdef CONFIG_OBJ_CORE_MSGQ
29 static struct k_obj_type obj_type_msgq;
30 #endif /* CONFIG_OBJ_CORE_MSGQ */
31
32 #ifdef CONFIG_POLL
handle_poll_events(struct k_msgq * msgq,uint32_t state)33 static inline void handle_poll_events(struct k_msgq *msgq, uint32_t state)
34 {
35 z_handle_obj_poll_events(&msgq->poll_events, state);
36 }
37 #endif /* CONFIG_POLL */
38
k_msgq_init(struct k_msgq * msgq,char * buffer,size_t msg_size,uint32_t max_msgs)39 void k_msgq_init(struct k_msgq *msgq, char *buffer, size_t msg_size,
40 uint32_t max_msgs)
41 {
42 msgq->msg_size = msg_size;
43 msgq->max_msgs = max_msgs;
44 msgq->buffer_start = buffer;
45 msgq->buffer_end = buffer + (max_msgs * msg_size);
46 msgq->read_ptr = buffer;
47 msgq->write_ptr = buffer;
48 msgq->used_msgs = 0;
49 msgq->flags = 0;
50 z_waitq_init(&msgq->wait_q);
51 msgq->lock = (struct k_spinlock) {};
52 #ifdef CONFIG_POLL
53 sys_dlist_init(&msgq->poll_events);
54 #endif /* CONFIG_POLL */
55
56 #ifdef CONFIG_OBJ_CORE_MSGQ
57 k_obj_core_init_and_link(K_OBJ_CORE(msgq), &obj_type_msgq);
58 #endif /* CONFIG_OBJ_CORE_MSGQ */
59
60 SYS_PORT_TRACING_OBJ_INIT(k_msgq, msgq);
61
62 k_object_init(msgq);
63 }
64
z_impl_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)65 int z_impl_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
66 uint32_t max_msgs)
67 {
68 void *buffer;
69 int ret;
70 size_t total_size;
71
72 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, alloc_init, msgq);
73
74 if (size_mul_overflow(msg_size, max_msgs, &total_size)) {
75 ret = -EINVAL;
76 } else {
77 buffer = z_thread_malloc(total_size);
78 if (buffer != NULL) {
79 k_msgq_init(msgq, buffer, msg_size, max_msgs);
80 msgq->flags = K_MSGQ_FLAG_ALLOC;
81 ret = 0;
82 } else {
83 ret = -ENOMEM;
84 }
85 }
86
87 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, alloc_init, msgq, ret);
88
89 return ret;
90 }
91
92 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)93 int z_vrfy_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
94 uint32_t max_msgs)
95 {
96 K_OOPS(K_SYSCALL_OBJ_NEVER_INIT(msgq, K_OBJ_MSGQ));
97
98 return z_impl_k_msgq_alloc_init(msgq, msg_size, max_msgs);
99 }
100 #include <zephyr/syscalls/k_msgq_alloc_init_mrsh.c>
101 #endif /* CONFIG_USERSPACE */
102
k_msgq_cleanup(struct k_msgq * msgq)103 int k_msgq_cleanup(struct k_msgq *msgq)
104 {
105 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, cleanup, msgq);
106
107 CHECKIF(z_waitq_head(&msgq->wait_q) != NULL) {
108 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, -EBUSY);
109
110 return -EBUSY;
111 }
112
113 if ((msgq->flags & K_MSGQ_FLAG_ALLOC) != 0U) {
114 k_free(msgq->buffer_start);
115 msgq->flags &= ~K_MSGQ_FLAG_ALLOC;
116 }
117
118 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, 0);
119
120 return 0;
121 }
122
123
z_impl_k_msgq_put(struct k_msgq * msgq,const void * data,k_timeout_t timeout)124 int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout)
125 {
126 __ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");
127
128 struct k_thread *pending_thread;
129 k_spinlock_key_t key;
130 int result;
131
132 key = k_spin_lock(&msgq->lock);
133
134 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout);
135
136 if (msgq->used_msgs < msgq->max_msgs) {
137 /* message queue isn't full */
138 pending_thread = z_unpend_first_thread(&msgq->wait_q);
139 if (unlikely(pending_thread != NULL)) {
140 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, 0);
141
142 /* give message to waiting thread */
143 (void)memcpy(pending_thread->base.swap_data, data,
144 msgq->msg_size);
145 /* wake up waiting thread */
146 arch_thread_return_value_set(pending_thread, 0);
147 z_ready_thread(pending_thread);
148 z_reschedule(&msgq->lock, key);
149 return 0;
150 } else {
151 /* put message in queue */
152 __ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start &&
153 msgq->write_ptr < msgq->buffer_end);
154 (void)memcpy(msgq->write_ptr, (char *)data, msgq->msg_size);
155 msgq->write_ptr += msgq->msg_size;
156 if (msgq->write_ptr == msgq->buffer_end) {
157 msgq->write_ptr = msgq->buffer_start;
158 }
159 msgq->used_msgs++;
160 #ifdef CONFIG_POLL
161 handle_poll_events(msgq, K_POLL_STATE_MSGQ_DATA_AVAILABLE);
162 #endif /* CONFIG_POLL */
163 }
164 result = 0;
165 } else if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
166 /* don't wait for message space to become available */
167 result = -ENOMSG;
168 } else {
169 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout);
170
171 /* wait for put message success, failure, or timeout */
172 arch_current_thread()->base.swap_data = (void *) data;
173
174 result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
175 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
176 return result;
177 }
178
179 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
180
181 z_reschedule(&msgq->lock, key);
182
183 return result;
184 }
185
186 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_put(struct k_msgq * msgq,const void * data,k_timeout_t timeout)187 static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data,
188 k_timeout_t timeout)
189 {
190 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
191 K_OOPS(K_SYSCALL_MEMORY_READ(data, msgq->msg_size));
192
193 return z_impl_k_msgq_put(msgq, data, timeout);
194 }
195 #include <zephyr/syscalls/k_msgq_put_mrsh.c>
196 #endif /* CONFIG_USERSPACE */
197
z_impl_k_msgq_get_attrs(struct k_msgq * msgq,struct k_msgq_attrs * attrs)198 void z_impl_k_msgq_get_attrs(struct k_msgq *msgq, struct k_msgq_attrs *attrs)
199 {
200 attrs->msg_size = msgq->msg_size;
201 attrs->max_msgs = msgq->max_msgs;
202 attrs->used_msgs = msgq->used_msgs;
203 }
204
205 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_get_attrs(struct k_msgq * msgq,struct k_msgq_attrs * attrs)206 static inline void z_vrfy_k_msgq_get_attrs(struct k_msgq *msgq,
207 struct k_msgq_attrs *attrs)
208 {
209 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
210 K_OOPS(K_SYSCALL_MEMORY_WRITE(attrs, sizeof(struct k_msgq_attrs)));
211 z_impl_k_msgq_get_attrs(msgq, attrs);
212 }
213 #include <zephyr/syscalls/k_msgq_get_attrs_mrsh.c>
214 #endif /* CONFIG_USERSPACE */
215
z_impl_k_msgq_get(struct k_msgq * msgq,void * data,k_timeout_t timeout)216 int z_impl_k_msgq_get(struct k_msgq *msgq, void *data, k_timeout_t timeout)
217 {
218 __ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");
219
220 k_spinlock_key_t key;
221 struct k_thread *pending_thread;
222 int result;
223
224 key = k_spin_lock(&msgq->lock);
225
226 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, get, msgq, timeout);
227
228 if (msgq->used_msgs > 0U) {
229 /* take first available message from queue */
230 (void)memcpy((char *)data, msgq->read_ptr, msgq->msg_size);
231 msgq->read_ptr += msgq->msg_size;
232 if (msgq->read_ptr == msgq->buffer_end) {
233 msgq->read_ptr = msgq->buffer_start;
234 }
235 msgq->used_msgs--;
236
237 /* handle first thread waiting to write (if any) */
238 pending_thread = z_unpend_first_thread(&msgq->wait_q);
239 if (unlikely(pending_thread != NULL)) {
240 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);
241
242 /* add thread's message to queue */
243 __ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start &&
244 msgq->write_ptr < msgq->buffer_end);
245 (void)memcpy(msgq->write_ptr, (char *)pending_thread->base.swap_data,
246 msgq->msg_size);
247 msgq->write_ptr += msgq->msg_size;
248 if (msgq->write_ptr == msgq->buffer_end) {
249 msgq->write_ptr = msgq->buffer_start;
250 }
251 msgq->used_msgs++;
252
253 /* wake up waiting thread */
254 arch_thread_return_value_set(pending_thread, 0);
255 z_ready_thread(pending_thread);
256 z_reschedule(&msgq->lock, key);
257
258 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, 0);
259
260 return 0;
261 }
262 result = 0;
263 } else if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
264 /* don't wait for a message to become available */
265 result = -ENOMSG;
266 } else {
267 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);
268
269 /* wait for get message success or timeout */
270 arch_current_thread()->base.swap_data = data;
271
272 result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
273 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, result);
274 return result;
275 }
276
277 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, result);
278
279 k_spin_unlock(&msgq->lock, key);
280
281 return result;
282 }
283
284 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_get(struct k_msgq * msgq,void * data,k_timeout_t timeout)285 static inline int z_vrfy_k_msgq_get(struct k_msgq *msgq, void *data,
286 k_timeout_t timeout)
287 {
288 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
289 K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
290
291 return z_impl_k_msgq_get(msgq, data, timeout);
292 }
293 #include <zephyr/syscalls/k_msgq_get_mrsh.c>
294 #endif /* CONFIG_USERSPACE */
295
z_impl_k_msgq_peek(struct k_msgq * msgq,void * data)296 int z_impl_k_msgq_peek(struct k_msgq *msgq, void *data)
297 {
298 k_spinlock_key_t key;
299 int result;
300
301 key = k_spin_lock(&msgq->lock);
302
303 if (msgq->used_msgs > 0U) {
304 /* take first available message from queue */
305 (void)memcpy((char *)data, msgq->read_ptr, msgq->msg_size);
306 result = 0;
307 } else {
308 /* don't wait for a message to become available */
309 result = -ENOMSG;
310 }
311
312 SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
313
314 k_spin_unlock(&msgq->lock, key);
315
316 return result;
317 }
318
319 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_peek(struct k_msgq * msgq,void * data)320 static inline int z_vrfy_k_msgq_peek(struct k_msgq *msgq, void *data)
321 {
322 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
323 K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
324
325 return z_impl_k_msgq_peek(msgq, data);
326 }
327 #include <zephyr/syscalls/k_msgq_peek_mrsh.c>
328 #endif /* CONFIG_USERSPACE */
329
z_impl_k_msgq_peek_at(struct k_msgq * msgq,void * data,uint32_t idx)330 int z_impl_k_msgq_peek_at(struct k_msgq *msgq, void *data, uint32_t idx)
331 {
332 k_spinlock_key_t key;
333 int result;
334 uint32_t bytes_to_end;
335 uint32_t byte_offset;
336 char *start_addr;
337
338 key = k_spin_lock(&msgq->lock);
339
340 if (msgq->used_msgs > idx) {
341 bytes_to_end = (msgq->buffer_end - msgq->read_ptr);
342 byte_offset = idx * msgq->msg_size;
343 start_addr = msgq->read_ptr;
344 /* check item available in start/end of ring buffer */
345 if (bytes_to_end <= byte_offset) {
346 /* Tweak the values in case */
347 byte_offset -= bytes_to_end;
348 /* wrap-around is required */
349 start_addr = msgq->buffer_start;
350 }
351 (void)memcpy(data, start_addr + byte_offset, msgq->msg_size);
352 result = 0;
353 } else {
354 /* don't wait for a message to become available */
355 result = -ENOMSG;
356 }
357
358 SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
359
360 k_spin_unlock(&msgq->lock, key);
361
362 return result;
363 }
364
365 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_peek_at(struct k_msgq * msgq,void * data,uint32_t idx)366 static inline int z_vrfy_k_msgq_peek_at(struct k_msgq *msgq, void *data, uint32_t idx)
367 {
368 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
369 K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
370
371 return z_impl_k_msgq_peek_at(msgq, data, idx);
372 }
373 #include <zephyr/syscalls/k_msgq_peek_at_mrsh.c>
374 #endif /* CONFIG_USERSPACE */
375
z_impl_k_msgq_purge(struct k_msgq * msgq)376 void z_impl_k_msgq_purge(struct k_msgq *msgq)
377 {
378 k_spinlock_key_t key;
379 struct k_thread *pending_thread;
380
381 key = k_spin_lock(&msgq->lock);
382
383 SYS_PORT_TRACING_OBJ_FUNC(k_msgq, purge, msgq);
384
385 /* wake up any threads that are waiting to write */
386 for (pending_thread = z_unpend_first_thread(&msgq->wait_q); pending_thread != NULL;
387 pending_thread = z_unpend_first_thread(&msgq->wait_q)) {
388 arch_thread_return_value_set(pending_thread, -ENOMSG);
389 z_ready_thread(pending_thread);
390 }
391
392 msgq->used_msgs = 0;
393 msgq->read_ptr = msgq->write_ptr;
394
395 z_reschedule(&msgq->lock, key);
396 }
397
398 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_purge(struct k_msgq * msgq)399 static inline void z_vrfy_k_msgq_purge(struct k_msgq *msgq)
400 {
401 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
402 z_impl_k_msgq_purge(msgq);
403 }
404 #include <zephyr/syscalls/k_msgq_purge_mrsh.c>
405
z_vrfy_k_msgq_num_free_get(struct k_msgq * msgq)406 static inline uint32_t z_vrfy_k_msgq_num_free_get(struct k_msgq *msgq)
407 {
408 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
409 return z_impl_k_msgq_num_free_get(msgq);
410 }
411 #include <zephyr/syscalls/k_msgq_num_free_get_mrsh.c>
412
z_vrfy_k_msgq_num_used_get(struct k_msgq * msgq)413 static inline uint32_t z_vrfy_k_msgq_num_used_get(struct k_msgq *msgq)
414 {
415 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
416 return z_impl_k_msgq_num_used_get(msgq);
417 }
418 #include <zephyr/syscalls/k_msgq_num_used_get_mrsh.c>
419
420 #endif /* CONFIG_USERSPACE */
421
422 #ifdef CONFIG_OBJ_CORE_MSGQ
init_msgq_obj_core_list(void)423 static int init_msgq_obj_core_list(void)
424 {
425 /* Initialize msgq object type */
426
427 z_obj_type_init(&obj_type_msgq, K_OBJ_TYPE_MSGQ_ID,
428 offsetof(struct k_msgq, obj_core));
429
430 /* Initialize and link statically defined message queues */
431
432 STRUCT_SECTION_FOREACH(k_msgq, msgq) {
433 k_obj_core_init_and_link(K_OBJ_CORE(msgq), &obj_type_msgq);
434 }
435
436 return 0;
437 };
438
439 SYS_INIT(init_msgq_obj_core_list, PRE_KERNEL_1,
440 CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
441
442 #endif /* CONFIG_OBJ_CORE_MSGQ */
443