1 /*
2 * Copyright (c) 2016 Wind River Systems, Inc.
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 /**
8 * @file
9 * @brief Message queues.
10 */
11
12
13 #include <zephyr/kernel.h>
14 #include <zephyr/kernel_structs.h>
15
16 #include <zephyr/toolchain.h>
17 #include <zephyr/linker/sections.h>
18 #include <string.h>
19 #include <ksched.h>
20 #include <wait_q.h>
21 #include <zephyr/sys/dlist.h>
22 #include <zephyr/sys/math_extras.h>
23 #include <zephyr/init.h>
24 #include <zephyr/internal/syscall_handler.h>
25 #include <kernel_internal.h>
26 #include <zephyr/sys/check.h>
27
28 #ifdef CONFIG_OBJ_CORE_MSGQ
/* Object-core type descriptor for message queues. It is initialized by
 * init_msgq_obj_core_list() and every queue is linked to it via
 * k_obj_core_init_and_link().
 */
static struct k_obj_type obj_type_msgq;
30 #endif /* CONFIG_OBJ_CORE_MSGQ */
31
/*
 * Notify pollers that data became available on the message queue.
 *
 * With CONFIG_POLL enabled this forwards the queue's poll_events list to
 * z_handle_obj_poll_events() with K_POLL_STATE_MSGQ_DATA_AVAILABLE and returns
 * its result. Without CONFIG_POLL nothing is done and false is returned.
 */
static inline bool handle_poll_events(struct k_msgq *msgq)
{
	bool poll_result = false;

#ifdef CONFIG_POLL
	poll_result = z_handle_obj_poll_events(&msgq->poll_events,
					       K_POLL_STATE_MSGQ_DATA_AVAILABLE);
#else
	ARG_UNUSED(msgq);
#endif /* CONFIG_POLL */

	return poll_result;
}
42
k_msgq_init(struct k_msgq * msgq,char * buffer,size_t msg_size,uint32_t max_msgs)43 void k_msgq_init(struct k_msgq *msgq, char *buffer, size_t msg_size,
44 uint32_t max_msgs)
45 {
46 msgq->msg_size = msg_size;
47 msgq->max_msgs = max_msgs;
48 msgq->buffer_start = buffer;
49 msgq->buffer_end = buffer + (max_msgs * msg_size);
50 msgq->read_ptr = buffer;
51 msgq->write_ptr = buffer;
52 msgq->used_msgs = 0;
53 msgq->flags = 0;
54 z_waitq_init(&msgq->wait_q);
55 msgq->lock = (struct k_spinlock) {};
56 #ifdef CONFIG_POLL
57 sys_dlist_init(&msgq->poll_events);
58 #endif /* CONFIG_POLL */
59
60 #ifdef CONFIG_OBJ_CORE_MSGQ
61 k_obj_core_init_and_link(K_OBJ_CORE(msgq), &obj_type_msgq);
62 #endif /* CONFIG_OBJ_CORE_MSGQ */
63
64 SYS_PORT_TRACING_OBJ_INIT(k_msgq, msgq);
65
66 k_object_init(msgq);
67 }
68
z_impl_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)69 int z_impl_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
70 uint32_t max_msgs)
71 {
72 void *buffer;
73 int ret;
74 size_t total_size;
75
76 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, alloc_init, msgq);
77
78 if (size_mul_overflow(msg_size, max_msgs, &total_size)) {
79 ret = -EINVAL;
80 } else {
81 buffer = z_thread_malloc(total_size);
82 if (buffer != NULL) {
83 k_msgq_init(msgq, buffer, msg_size, max_msgs);
84 msgq->flags = K_MSGQ_FLAG_ALLOC;
85 ret = 0;
86 } else {
87 ret = -ENOMEM;
88 }
89 }
90
91 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, alloc_init, msgq, ret);
92
93 return ret;
94 }
95
96 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)97 int z_vrfy_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
98 uint32_t max_msgs)
99 {
100 K_OOPS(K_SYSCALL_OBJ_NEVER_INIT(msgq, K_OBJ_MSGQ));
101
102 return z_impl_k_msgq_alloc_init(msgq, msg_size, max_msgs);
103 }
104 #include <zephyr/syscalls/k_msgq_alloc_init_mrsh.c>
105 #endif /* CONFIG_USERSPACE */
106
k_msgq_cleanup(struct k_msgq * msgq)107 int k_msgq_cleanup(struct k_msgq *msgq)
108 {
109 SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, cleanup, msgq);
110
111 CHECKIF(z_waitq_head(&msgq->wait_q) != NULL) {
112 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, -EBUSY);
113
114 return -EBUSY;
115 }
116
117 if ((msgq->flags & K_MSGQ_FLAG_ALLOC) != 0U) {
118 k_free(msgq->buffer_start);
119 msgq->flags &= ~K_MSGQ_FLAG_ALLOC;
120 }
121
122 SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, 0);
123
124 return 0;
125 }
126
127
/*
 * Send one message (msgq->msg_size bytes read from data) to a message queue.
 *
 * - Queue has room and a thread is pended on wait_q: the message is copied
 *   straight into that thread's swap_data buffer and the thread is readied
 *   with return value 0.
 * - Queue has room and nobody is waiting: the message is stored at write_ptr
 *   and pollers are notified.
 * - Queue is full and timeout is K_NO_WAIT: returns -ENOMSG.
 * - Queue is full otherwise: the caller pends on wait_q with data recorded in
 *   its swap_data, and the result of z_pend_curr() is returned.
 *
 * Returns 0 when the message was delivered or queued.
 * Must not be called from an ISR with a timeout other than K_NO_WAIT
 * (asserted).
 */
int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout)
{
	__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");

	bool need_resched = false;
	int rc;
	k_spinlock_key_t key = k_spin_lock(&msgq->lock);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout);

	if (msgq->used_msgs >= msgq->max_msgs) {
		/* queue is full */
		if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout);

			/* publish our message so a later get can copy it in */
			_current->base.swap_data = (void *) data;

			/* z_pend_curr() takes over the lock and key */
			rc = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, rc);
			return rc;
		}

		/* caller does not want to wait for space */
		rc = -ENOMSG;
	} else {
		struct k_thread *receiver = z_unpend_first_thread(&msgq->wait_q);

		if (unlikely(receiver != NULL)) {
			/* hand the message directly to the waiting thread */
			(void)memcpy(receiver->base.swap_data, data, msgq->msg_size);
			arch_thread_return_value_set(receiver, 0);
			z_ready_thread(receiver);
			need_resched = true;
		} else {
			/* store the message in the next free slot */
			__ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start &&
					msgq->write_ptr < msgq->buffer_end);
			(void)memcpy(msgq->write_ptr, data, msgq->msg_size);
			msgq->write_ptr += msgq->msg_size;
			if (msgq->write_ptr == msgq->buffer_end) {
				/* wrap the write cursor */
				msgq->write_ptr = msgq->buffer_start;
			}
			msgq->used_msgs++;
			need_resched = handle_poll_events(msgq);
		}
		rc = 0;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, rc);

	if (need_resched) {
		z_reschedule(&msgq->lock, key);
	} else {
		k_spin_unlock(&msgq->lock, key);
	}

	return rc;
}
189
190 #ifdef CONFIG_USERSPACE
/*
 * Syscall verification wrapper for k_msgq_put().
 *
 * Applies the K_SYSCALL_OBJ check for a K_OBJ_MSGQ object to msgq and the
 * K_SYSCALL_MEMORY_READ check to msgq->msg_size bytes at data (failures are
 * handled by K_OOPS), then forwards to the implementation.
 */
static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data,
				    k_timeout_t timeout)
{
	int rc;

	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	K_OOPS(K_SYSCALL_MEMORY_READ(data, msgq->msg_size));

	rc = z_impl_k_msgq_put(msgq, data, timeout);

	return rc;
}
199 #include <zephyr/syscalls/k_msgq_put_mrsh.c>
200 #endif /* CONFIG_USERSPACE */
201
z_impl_k_msgq_get_attrs(struct k_msgq * msgq,struct k_msgq_attrs * attrs)202 void z_impl_k_msgq_get_attrs(struct k_msgq *msgq, struct k_msgq_attrs *attrs)
203 {
204 attrs->msg_size = msgq->msg_size;
205 attrs->max_msgs = msgq->max_msgs;
206 attrs->used_msgs = msgq->used_msgs;
207 }
208
209 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_get_attrs(struct k_msgq * msgq,struct k_msgq_attrs * attrs)210 static inline void z_vrfy_k_msgq_get_attrs(struct k_msgq *msgq,
211 struct k_msgq_attrs *attrs)
212 {
213 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
214 K_OOPS(K_SYSCALL_MEMORY_WRITE(attrs, sizeof(struct k_msgq_attrs)));
215 z_impl_k_msgq_get_attrs(msgq, attrs);
216 }
217 #include <zephyr/syscalls/k_msgq_get_attrs_mrsh.c>
218 #endif /* CONFIG_USERSPACE */
219
/*
 * Receive one message (msgq->msg_size bytes written to data) from a message
 * queue.
 *
 * - Queue holds a message: it is copied out from read_ptr. If a thread is
 *   pended on wait_q, its pending message (its swap_data) is then copied into
 *   the queue and that thread is readied with return value 0.
 * - Queue is empty and timeout is K_NO_WAIT: returns -ENOMSG.
 * - Queue is empty otherwise: the caller pends on wait_q with data recorded
 *   in its swap_data, and the result of z_pend_curr() is returned.
 *
 * Returns 0 when a message was copied into data.
 * Must not be called from an ISR with a timeout other than K_NO_WAIT
 * (asserted).
 */
int z_impl_k_msgq_get(struct k_msgq *msgq, void *data, k_timeout_t timeout)
{
	__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");

	bool need_resched = false;
	int rc;
	k_spinlock_key_t key = k_spin_lock(&msgq->lock);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, get, msgq, timeout);

	if (msgq->used_msgs == 0U) {
		/* queue is empty */
		if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);

			/* tell a later put where to deliver the message */
			_current->base.swap_data = data;

			/* z_pend_curr() takes over the lock and key */
			rc = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, rc);
			return rc;
		}

		/* caller does not want to wait for a message */
		rc = -ENOMSG;
	} else {
		struct k_thread *sender;

		/* copy out the oldest message and advance the read cursor */
		(void)memcpy(data, msgq->read_ptr, msgq->msg_size);
		msgq->read_ptr += msgq->msg_size;
		if (msgq->read_ptr == msgq->buffer_end) {
			msgq->read_ptr = msgq->buffer_start;
		}
		msgq->used_msgs--;

		/* a slot just opened up: serve the first pended thread, if any */
		sender = z_unpend_first_thread(&msgq->wait_q);
		if (unlikely(sender != NULL)) {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);

			/* enqueue the message that thread was trying to put */
			__ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start &&
					msgq->write_ptr < msgq->buffer_end);
			(void)memcpy(msgq->write_ptr, sender->base.swap_data,
				     msgq->msg_size);
			msgq->write_ptr += msgq->msg_size;
			if (msgq->write_ptr == msgq->buffer_end) {
				msgq->write_ptr = msgq->buffer_start;
			}
			msgq->used_msgs++;

			/* its put has now succeeded */
			arch_thread_return_value_set(sender, 0);
			z_ready_thread(sender);
			need_resched = true;
		}
		rc = 0;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, rc);

	if (need_resched) {
		z_reschedule(&msgq->lock, key);
	} else {
		k_spin_unlock(&msgq->lock, key);
	}

	return rc;
}
288
289 #ifdef CONFIG_USERSPACE
/*
 * Syscall verification wrapper for k_msgq_get().
 *
 * Applies the K_SYSCALL_OBJ check for a K_OBJ_MSGQ object to msgq and the
 * K_SYSCALL_MEMORY_WRITE check to msgq->msg_size bytes at data (failures are
 * handled by K_OOPS), then forwards to the implementation.
 */
static inline int z_vrfy_k_msgq_get(struct k_msgq *msgq, void *data,
				    k_timeout_t timeout)
{
	int rc;

	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));

	rc = z_impl_k_msgq_get(msgq, data, timeout);

	return rc;
}
298 #include <zephyr/syscalls/k_msgq_get_mrsh.c>
299 #endif /* CONFIG_USERSPACE */
300
z_impl_k_msgq_peek(struct k_msgq * msgq,void * data)301 int z_impl_k_msgq_peek(struct k_msgq *msgq, void *data)
302 {
303 k_spinlock_key_t key;
304 int result;
305
306 key = k_spin_lock(&msgq->lock);
307
308 if (msgq->used_msgs > 0U) {
309 /* take first available message from queue */
310 (void)memcpy((char *)data, msgq->read_ptr, msgq->msg_size);
311 result = 0;
312 } else {
313 /* don't wait for a message to become available */
314 result = -ENOMSG;
315 }
316
317 SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
318
319 k_spin_unlock(&msgq->lock, key);
320
321 return result;
322 }
323
324 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_peek(struct k_msgq * msgq,void * data)325 static inline int z_vrfy_k_msgq_peek(struct k_msgq *msgq, void *data)
326 {
327 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
328 K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
329
330 return z_impl_k_msgq_peek(msgq, data);
331 }
332 #include <zephyr/syscalls/k_msgq_peek_mrsh.c>
333 #endif /* CONFIG_USERSPACE */
334
z_impl_k_msgq_peek_at(struct k_msgq * msgq,void * data,uint32_t idx)335 int z_impl_k_msgq_peek_at(struct k_msgq *msgq, void *data, uint32_t idx)
336 {
337 k_spinlock_key_t key;
338 int result;
339 uint32_t bytes_to_end;
340 uint32_t byte_offset;
341 char *start_addr;
342
343 key = k_spin_lock(&msgq->lock);
344
345 if (msgq->used_msgs > idx) {
346 bytes_to_end = (msgq->buffer_end - msgq->read_ptr);
347 byte_offset = idx * msgq->msg_size;
348 start_addr = msgq->read_ptr;
349 /* check item available in start/end of ring buffer */
350 if (bytes_to_end <= byte_offset) {
351 /* Tweak the values in case */
352 byte_offset -= bytes_to_end;
353 /* wrap-around is required */
354 start_addr = msgq->buffer_start;
355 }
356 (void)memcpy(data, start_addr + byte_offset, msgq->msg_size);
357 result = 0;
358 } else {
359 /* don't wait for a message to become available */
360 result = -ENOMSG;
361 }
362
363 SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
364
365 k_spin_unlock(&msgq->lock, key);
366
367 return result;
368 }
369
370 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_peek_at(struct k_msgq * msgq,void * data,uint32_t idx)371 static inline int z_vrfy_k_msgq_peek_at(struct k_msgq *msgq, void *data, uint32_t idx)
372 {
373 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
374 K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
375
376 return z_impl_k_msgq_peek_at(msgq, data, idx);
377 }
378 #include <zephyr/syscalls/k_msgq_peek_at_mrsh.c>
379 #endif /* CONFIG_USERSPACE */
380
z_impl_k_msgq_purge(struct k_msgq * msgq)381 void z_impl_k_msgq_purge(struct k_msgq *msgq)
382 {
383 k_spinlock_key_t key;
384 struct k_thread *pending_thread;
385 bool resched = false;
386
387 key = k_spin_lock(&msgq->lock);
388
389 SYS_PORT_TRACING_OBJ_FUNC(k_msgq, purge, msgq);
390
391 /* wake up any threads that are waiting to write */
392 for (pending_thread = z_unpend_first_thread(&msgq->wait_q);
393 pending_thread != NULL;
394 pending_thread = z_unpend_first_thread(&msgq->wait_q)) {
395 arch_thread_return_value_set(pending_thread, -ENOMSG);
396 z_ready_thread(pending_thread);
397 resched = true;
398 }
399
400 msgq->used_msgs = 0;
401 msgq->read_ptr = msgq->write_ptr;
402
403 if (resched) {
404 z_reschedule(&msgq->lock, key);
405 } else {
406 k_spin_unlock(&msgq->lock, key);
407 }
408 }
409
410 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_purge(struct k_msgq * msgq)411 static inline void z_vrfy_k_msgq_purge(struct k_msgq *msgq)
412 {
413 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
414 z_impl_k_msgq_purge(msgq);
415 }
416 #include <zephyr/syscalls/k_msgq_purge_mrsh.c>
417
z_vrfy_k_msgq_num_free_get(struct k_msgq * msgq)418 static inline uint32_t z_vrfy_k_msgq_num_free_get(struct k_msgq *msgq)
419 {
420 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
421 return z_impl_k_msgq_num_free_get(msgq);
422 }
423 #include <zephyr/syscalls/k_msgq_num_free_get_mrsh.c>
424
z_vrfy_k_msgq_num_used_get(struct k_msgq * msgq)425 static inline uint32_t z_vrfy_k_msgq_num_used_get(struct k_msgq *msgq)
426 {
427 K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
428 return z_impl_k_msgq_num_used_get(msgq);
429 }
430 #include <zephyr/syscalls/k_msgq_num_used_get_mrsh.c>
431
432 #endif /* CONFIG_USERSPACE */
433
434 #ifdef CONFIG_OBJ_CORE_MSGQ
init_msgq_obj_core_list(void)435 static int init_msgq_obj_core_list(void)
436 {
437 /* Initialize msgq object type */
438
439 z_obj_type_init(&obj_type_msgq, K_OBJ_TYPE_MSGQ_ID,
440 offsetof(struct k_msgq, obj_core));
441
442 /* Initialize and link statically defined message queues */
443
444 STRUCT_SECTION_FOREACH(k_msgq, msgq) {
445 k_obj_core_init_and_link(K_OBJ_CORE(msgq), &obj_type_msgq);
446 }
447
448 return 0;
449 };
450
/* Register init_msgq_obj_core_list() as an init function at the PRE_KERNEL_1
 * level with priority CONFIG_KERNEL_INIT_PRIORITY_OBJECTS.
 */
SYS_INIT(init_msgq_obj_core_list, PRE_KERNEL_1,
	 CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
453
454 #endif /* CONFIG_OBJ_CORE_MSGQ */
455