1 /*
2 * Copyright (c) 2016 Wind River Systems, Inc.
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 /**
8 * @file
9 * @brief Message queues.
10 */
11
12
13 #include <zephyr/kernel.h>
14 #include <zephyr/kernel_structs.h>
15
16 #include <zephyr/toolchain.h>
17 #include <zephyr/linker/sections.h>
18 #include <string.h>
19 #include <ksched.h>
20 #include <wait_q.h>
21 #include <zephyr/sys/dlist.h>
22 #include <zephyr/sys/math_extras.h>
23 #include <zephyr/init.h>
24 #include <zephyr/internal/syscall_handler.h>
25 #include <kernel_internal.h>
26 #include <zephyr/sys/check.h>
27
28 #ifdef CONFIG_OBJ_CORE_MSGQ
29 static struct k_obj_type obj_type_msgq;
30 #endif /* CONFIG_OBJ_CORE_MSGQ */
31
handle_poll_events(struct k_msgq * msgq)32 static inline bool handle_poll_events(struct k_msgq *msgq)
33 {
34 #ifdef CONFIG_POLL
35 return z_handle_obj_poll_events(&msgq->poll_events,
36 K_POLL_STATE_MSGQ_DATA_AVAILABLE);
37 #else
38 ARG_UNUSED(msgq);
39 return false;
40 #endif /* CONFIG_POLL */
41 }
42
k_msgq_init(struct k_msgq * msgq,char * buffer,size_t msg_size,uint32_t max_msgs)43 void k_msgq_init(struct k_msgq *msgq, char *buffer, size_t msg_size,
44 uint32_t max_msgs)
45 {
46 msgq->msg_size = msg_size;
47 msgq->max_msgs = max_msgs;
48 msgq->buffer_start = buffer;
49 msgq->buffer_end = buffer + (max_msgs * msg_size);
50 msgq->read_ptr = buffer;
51 msgq->write_ptr = buffer;
52 msgq->used_msgs = 0;
53 msgq->flags = 0;
54 z_waitq_init(&msgq->wait_q);
55 msgq->lock = (struct k_spinlock) {};
56 #ifdef CONFIG_POLL
57 sys_dlist_init(&msgq->poll_events);
58 #endif /* CONFIG_POLL */
59
60 #ifdef CONFIG_OBJ_CORE_MSGQ
61 k_obj_core_init_and_link(K_OBJ_CORE(msgq), &obj_type_msgq);
62 #endif /* CONFIG_OBJ_CORE_MSGQ */
63
64 SYS_PORT_TRACING_OBJ_INIT(k_msgq, msgq);
65
66 k_object_init(msgq);
67 }
68
z_impl_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)69 int z_impl_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
70 uint32_t max_msgs)
71 {
72 void *buffer;
73 int ret;
74 size_t total_size;
75
76 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, alloc_init, msgq);
77
78 if (size_mul_overflow(msg_size, max_msgs, &total_size)) {
79 ret = -EINVAL;
80 } else {
81 buffer = z_thread_malloc(total_size);
82 if (buffer != NULL) {
83 k_msgq_init(msgq, buffer, msg_size, max_msgs);
84 msgq->flags = K_MSGQ_FLAG_ALLOC;
85 ret = 0;
86 } else {
87 ret = -ENOMEM;
88 }
89 }
90
91 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, alloc_init, msgq, ret);
92 return ret;
93 }
94
95 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)96 int z_vrfy_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
97 uint32_t max_msgs)
98 {
99 K_OOPS(K_SYSCALL_OBJ_NEVER_INIT(msgq, K_OBJ_MSGQ));
100
101 return z_impl_k_msgq_alloc_init(msgq, msg_size, max_msgs);
102 }
103 #include <zephyr/syscalls/k_msgq_alloc_init_mrsh.c>
104 #endif /* CONFIG_USERSPACE */
105
k_msgq_cleanup(struct k_msgq * msgq)106 int k_msgq_cleanup(struct k_msgq *msgq)
107 {
108 int ret = 0;
109 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, cleanup, msgq);
110
111 CHECKIF(z_waitq_head(&msgq->wait_q) != NULL) {
112 ret = -EBUSY;
113 goto exit;
114 }
115
116 if ((msgq->flags & K_MSGQ_FLAG_ALLOC) != 0U) {
117 k_free(msgq->buffer_start);
118 msgq->flags &= ~K_MSGQ_FLAG_ALLOC;
119 }
120
121 exit:
122 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, ret);
123 return ret;
124 }
125
put_msg_in_queue(struct k_msgq * msgq,const void * data,k_timeout_t timeout,bool put_at_back)126 static inline int put_msg_in_queue(struct k_msgq *msgq, const void *data,
127 k_timeout_t timeout, bool put_at_back)
128 {
129 __ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");
130
131 struct k_thread *pending_thread;
132 k_spinlock_key_t key;
133 int result;
134 bool resched = false;
135
136 key = k_spin_lock(&msgq->lock);
137
138 if (put_at_back) {
139 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout);
140 } else {
141 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put_front, msgq, timeout);
142 }
143
144 if (msgq->used_msgs < msgq->max_msgs) {
145 /* message queue isn't full */
146 pending_thread = z_unpend_first_thread(&msgq->wait_q);
147 if (unlikely(pending_thread != NULL)) {
148 resched = true;
149
150 /* give message to waiting thread */
151 (void)memcpy(pending_thread->base.swap_data, data, msgq->msg_size);
152 /* wake up waiting thread */
153 arch_thread_return_value_set(pending_thread, 0);
154 z_ready_thread(pending_thread);
155 } else {
156 __ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start &&
157 msgq->write_ptr < msgq->buffer_end);
158 if (put_at_back) {
159 /*
160 * to write a message to the back of the queue,
161 * copy the message and increment write_ptr
162 */
163 (void)memcpy(msgq->write_ptr, (char *)data, msgq->msg_size);
164 msgq->write_ptr += msgq->msg_size;
165 if (msgq->write_ptr == msgq->buffer_end) {
166 msgq->write_ptr = msgq->buffer_start;
167 }
168 } else {
169 /*
170 * to write a message to the head of the queue,
171 * first decrement the read pointer (to open
172 * space at the front of the queue) then copy
173 * the message to the newly created space.
174 */
175 if (msgq->read_ptr == msgq->buffer_start) {
176 msgq->read_ptr = msgq->buffer_end;
177 }
178 msgq->read_ptr -= msgq->msg_size;
179 (void)memcpy(msgq->read_ptr, (char *)data, msgq->msg_size);
180 }
181 msgq->used_msgs++;
182 resched = handle_poll_events(msgq);
183 }
184 result = 0;
185 } else if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
186 /* don't wait for message space to become available */
187 result = -ENOMSG;
188 } else {
189 if (put_at_back) {
190 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout);
191 } else {
192 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put_front, msgq, timeout);
193 }
194
195 /* wait for put message success, failure, or timeout */
196 _current->base.swap_data = (void *) data;
197
198 result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
199
200 if (put_at_back) {
201 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
202 } else {
203 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put_front, msgq, timeout, result);
204 }
205
206 return result;
207 }
208
209 if (put_at_back) {
210 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
211 } else {
212 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put_front, msgq, timeout, result);
213 }
214
215 if (resched) {
216 z_reschedule(&msgq->lock, key);
217 } else {
218 k_spin_unlock(&msgq->lock, key);
219 }
220
221 return result;
222 }
223
224
z_impl_k_msgq_put(struct k_msgq * msgq,const void * data,k_timeout_t timeout)225 int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout)
226 {
227 return put_msg_in_queue(msgq, data, timeout, true);
228 }
229
z_impl_k_msgq_put_front(struct k_msgq * msgq,const void * data)230 int z_impl_k_msgq_put_front(struct k_msgq *msgq, const void *data)
231 {
232 return put_msg_in_queue(msgq, data, K_NO_WAIT, false);
233 }
234
235 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_put(struct k_msgq * msgq,const void * data,k_timeout_t timeout)236 static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data,
237 k_timeout_t timeout)
238 {
239 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
240 K_OOPS(K_SYSCALL_MEMORY_READ(data, msgq->msg_size));
241
242 return z_impl_k_msgq_put(msgq, data, timeout);
243 }
244 #include <zephyr/syscalls/k_msgq_put_mrsh.c>
245
z_vrfy_k_msgq_put_front(struct k_msgq * msgq,const void * data)246 static inline int z_vrfy_k_msgq_put_front(struct k_msgq *msgq, const void *data)
247 {
248 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
249 K_OOPS(K_SYSCALL_MEMORY_READ(data, msgq->msg_size));
250
251 return z_impl_k_msgq_put_front(msgq, data);
252 }
253 #include <zephyr/syscalls/k_msgq_put_front_mrsh.c>
254 #endif /* CONFIG_USERSPACE */
255
z_impl_k_msgq_get_attrs(struct k_msgq * msgq,struct k_msgq_attrs * attrs)256 void z_impl_k_msgq_get_attrs(struct k_msgq *msgq, struct k_msgq_attrs *attrs)
257 {
258 attrs->msg_size = msgq->msg_size;
259 attrs->max_msgs = msgq->max_msgs;
260 attrs->used_msgs = msgq->used_msgs;
261 }
262
263 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_get_attrs(struct k_msgq * msgq,struct k_msgq_attrs * attrs)264 static inline void z_vrfy_k_msgq_get_attrs(struct k_msgq *msgq,
265 struct k_msgq_attrs *attrs)
266 {
267 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
268 K_OOPS(K_SYSCALL_MEMORY_WRITE(attrs, sizeof(struct k_msgq_attrs)));
269 z_impl_k_msgq_get_attrs(msgq, attrs);
270 }
271 #include <zephyr/syscalls/k_msgq_get_attrs_mrsh.c>
272 #endif /* CONFIG_USERSPACE */
273
z_impl_k_msgq_get(struct k_msgq * msgq,void * data,k_timeout_t timeout)274 int z_impl_k_msgq_get(struct k_msgq *msgq, void *data, k_timeout_t timeout)
275 {
276 __ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");
277
278 k_spinlock_key_t key;
279 struct k_thread *pending_thread;
280 int result;
281 bool resched = false;
282
283 key = k_spin_lock(&msgq->lock);
284
285 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, get, msgq, timeout);
286
287 if (msgq->used_msgs > 0U) {
288 /* take first available message from queue */
289 (void)memcpy((char *)data, msgq->read_ptr, msgq->msg_size);
290 msgq->read_ptr += msgq->msg_size;
291 if (msgq->read_ptr == msgq->buffer_end) {
292 msgq->read_ptr = msgq->buffer_start;
293 }
294 msgq->used_msgs--;
295
296 /* handle first thread waiting to write (if any) */
297 pending_thread = z_unpend_first_thread(&msgq->wait_q);
298 if (unlikely(pending_thread != NULL)) {
299 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);
300
301 /* add thread's message to queue */
302 __ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start &&
303 msgq->write_ptr < msgq->buffer_end);
304 (void)memcpy(msgq->write_ptr, (char *)pending_thread->base.swap_data,
305 msgq->msg_size);
306 msgq->write_ptr += msgq->msg_size;
307 if (msgq->write_ptr == msgq->buffer_end) {
308 msgq->write_ptr = msgq->buffer_start;
309 }
310 msgq->used_msgs++;
311
312 /* wake up waiting thread */
313 arch_thread_return_value_set(pending_thread, 0);
314 z_ready_thread(pending_thread);
315 resched = true;
316 }
317 result = 0;
318 } else if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
319 /* don't wait for a message to become available */
320 result = -ENOMSG;
321 } else {
322 SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);
323
324 /* wait for get message success or timeout */
325 _current->base.swap_data = data;
326
327 result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
328 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, result);
329 return result;
330 }
331
332 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, result);
333
334 if (resched) {
335 z_reschedule(&msgq->lock, key);
336 } else {
337 k_spin_unlock(&msgq->lock, key);
338 }
339
340 return result;
341 }
342
343 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_get(struct k_msgq * msgq,void * data,k_timeout_t timeout)344 static inline int z_vrfy_k_msgq_get(struct k_msgq *msgq, void *data,
345 k_timeout_t timeout)
346 {
347 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
348 K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
349
350 return z_impl_k_msgq_get(msgq, data, timeout);
351 }
352 #include <zephyr/syscalls/k_msgq_get_mrsh.c>
353 #endif /* CONFIG_USERSPACE */
354
z_impl_k_msgq_peek(struct k_msgq * msgq,void * data)355 int z_impl_k_msgq_peek(struct k_msgq *msgq, void *data)
356 {
357 k_spinlock_key_t key;
358 int result;
359
360 key = k_spin_lock(&msgq->lock);
361
362 if (msgq->used_msgs > 0U) {
363 /* take first available message from queue */
364 (void)memcpy((char *)data, msgq->read_ptr, msgq->msg_size);
365 result = 0;
366 } else {
367 /* don't wait for a message to become available */
368 result = -ENOMSG;
369 }
370
371 SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
372
373 k_spin_unlock(&msgq->lock, key);
374
375 return result;
376 }
377
378 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_peek(struct k_msgq * msgq,void * data)379 static inline int z_vrfy_k_msgq_peek(struct k_msgq *msgq, void *data)
380 {
381 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
382 K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
383
384 return z_impl_k_msgq_peek(msgq, data);
385 }
386 #include <zephyr/syscalls/k_msgq_peek_mrsh.c>
387 #endif /* CONFIG_USERSPACE */
388
z_impl_k_msgq_peek_at(struct k_msgq * msgq,void * data,uint32_t idx)389 int z_impl_k_msgq_peek_at(struct k_msgq *msgq, void *data, uint32_t idx)
390 {
391 k_spinlock_key_t key;
392 int result;
393 uint32_t bytes_to_end;
394 uint32_t byte_offset;
395 char *start_addr;
396
397 key = k_spin_lock(&msgq->lock);
398
399 if (msgq->used_msgs > idx) {
400 bytes_to_end = (msgq->buffer_end - msgq->read_ptr);
401 byte_offset = idx * msgq->msg_size;
402 start_addr = msgq->read_ptr;
403 /* check item available in start/end of ring buffer */
404 if (bytes_to_end <= byte_offset) {
405 /* Tweak the values in case */
406 byte_offset -= bytes_to_end;
407 /* wrap-around is required */
408 start_addr = msgq->buffer_start;
409 }
410 (void)memcpy(data, start_addr + byte_offset, msgq->msg_size);
411 result = 0;
412 } else {
413 /* don't wait for a message to become available */
414 result = -ENOMSG;
415 }
416
417 SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
418
419 k_spin_unlock(&msgq->lock, key);
420
421 return result;
422 }
423
424 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_peek_at(struct k_msgq * msgq,void * data,uint32_t idx)425 static inline int z_vrfy_k_msgq_peek_at(struct k_msgq *msgq, void *data, uint32_t idx)
426 {
427 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
428 K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
429
430 return z_impl_k_msgq_peek_at(msgq, data, idx);
431 }
432 #include <zephyr/syscalls/k_msgq_peek_at_mrsh.c>
433 #endif /* CONFIG_USERSPACE */
434
z_impl_k_msgq_purge(struct k_msgq * msgq)435 void z_impl_k_msgq_purge(struct k_msgq *msgq)
436 {
437 k_spinlock_key_t key;
438 struct k_thread *pending_thread;
439 bool resched = false;
440
441 key = k_spin_lock(&msgq->lock);
442
443 SYS_PORT_TRACING_OBJ_FUNC(k_msgq, purge, msgq);
444
445 /* wake up any threads that are waiting to write */
446 for (pending_thread = z_unpend_first_thread(&msgq->wait_q);
447 pending_thread != NULL;
448 pending_thread = z_unpend_first_thread(&msgq->wait_q)) {
449 arch_thread_return_value_set(pending_thread, -ENOMSG);
450 z_ready_thread(pending_thread);
451 resched = true;
452 }
453
454 msgq->used_msgs = 0;
455 msgq->read_ptr = msgq->write_ptr;
456
457 if (resched) {
458 z_reschedule(&msgq->lock, key);
459 } else {
460 k_spin_unlock(&msgq->lock, key);
461 }
462 }
463
464 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_purge(struct k_msgq * msgq)465 static inline void z_vrfy_k_msgq_purge(struct k_msgq *msgq)
466 {
467 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
468 z_impl_k_msgq_purge(msgq);
469 }
470 #include <zephyr/syscalls/k_msgq_purge_mrsh.c>
471
z_vrfy_k_msgq_num_free_get(struct k_msgq * msgq)472 static inline uint32_t z_vrfy_k_msgq_num_free_get(struct k_msgq *msgq)
473 {
474 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
475 return z_impl_k_msgq_num_free_get(msgq);
476 }
477 #include <zephyr/syscalls/k_msgq_num_free_get_mrsh.c>
478
z_vrfy_k_msgq_num_used_get(struct k_msgq * msgq)479 static inline uint32_t z_vrfy_k_msgq_num_used_get(struct k_msgq *msgq)
480 {
481 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
482 return z_impl_k_msgq_num_used_get(msgq);
483 }
484 #include <zephyr/syscalls/k_msgq_num_used_get_mrsh.c>
485
486 #endif /* CONFIG_USERSPACE */
487
488 #ifdef CONFIG_OBJ_CORE_MSGQ
init_msgq_obj_core_list(void)489 static int init_msgq_obj_core_list(void)
490 {
491 /* Initialize msgq object type */
492
493 z_obj_type_init(&obj_type_msgq, K_OBJ_TYPE_MSGQ_ID,
494 offsetof(struct k_msgq, obj_core));
495
496 /* Initialize and link statically defined message queues */
497
498 STRUCT_SECTION_FOREACH(k_msgq, msgq) {
499 k_obj_core_init_and_link(K_OBJ_CORE(msgq), &obj_type_msgq);
500 }
501
502 return 0;
503 };
504
505 SYS_INIT(init_msgq_obj_core_list, PRE_KERNEL_1,
506 CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
507
508 #endif /* CONFIG_OBJ_CORE_MSGQ */
509