1 /*
2  * Copyright (c) 2016 Wind River Systems, Inc.
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 /**
8  * @file
9  * @brief Message queues.
10  */
11 
12 
13 #include <zephyr/kernel.h>
14 #include <zephyr/kernel_structs.h>
15 
16 #include <zephyr/toolchain.h>
17 #include <zephyr/linker/sections.h>
18 #include <string.h>
19 #include <ksched.h>
20 #include <wait_q.h>
21 #include <zephyr/sys/dlist.h>
22 #include <zephyr/sys/math_extras.h>
23 #include <zephyr/init.h>
24 #include <zephyr/internal/syscall_handler.h>
25 #include <kernel_internal.h>
26 #include <zephyr/sys/check.h>
27 
28 #ifdef CONFIG_OBJ_CORE_MSGQ
/*
 * Object-core type descriptor for message queues. It is set up by
 * init_msgq_obj_core_list() and handed to k_obj_core_init_and_link() for
 * every queue that gets linked.
 */
static struct k_obj_type obj_type_msgq;
30 #endif /* CONFIG_OBJ_CORE_MSGQ */
31 
/**
 * @brief Signal pollers that the message queue has data available.
 *
 * When CONFIG_POLL is enabled, the queue's poll event list is passed to
 * z_handle_obj_poll_events() together with K_POLL_STATE_MSGQ_DATA_AVAILABLE.
 * When CONFIG_POLL is disabled, the function has no effect.
 *
 * @param msgq Message queue whose poll events are to be handled.
 *
 * @return The value reported by z_handle_obj_poll_events(), or false when
 *         CONFIG_POLL is disabled. The caller in this file treats the value
 *         as a "reschedule needed" indication.
 */
static inline bool handle_poll_events(struct k_msgq *msgq)
{
#ifndef CONFIG_POLL
	ARG_UNUSED(msgq);

	return false;
#else
	bool signalled = z_handle_obj_poll_events(&msgq->poll_events,
						  K_POLL_STATE_MSGQ_DATA_AVAILABLE);

	return signalled;
#endif /* CONFIG_POLL */
}
42 
k_msgq_init(struct k_msgq * msgq,char * buffer,size_t msg_size,uint32_t max_msgs)43 void k_msgq_init(struct k_msgq *msgq, char *buffer, size_t msg_size,
44 		 uint32_t max_msgs)
45 {
46 	msgq->msg_size = msg_size;
47 	msgq->max_msgs = max_msgs;
48 	msgq->buffer_start = buffer;
49 	msgq->buffer_end = buffer + (max_msgs * msg_size);
50 	msgq->read_ptr = buffer;
51 	msgq->write_ptr = buffer;
52 	msgq->used_msgs = 0;
53 	msgq->flags = 0;
54 	z_waitq_init(&msgq->wait_q);
55 	msgq->lock = (struct k_spinlock) {};
56 #ifdef CONFIG_POLL
57 	sys_dlist_init(&msgq->poll_events);
58 #endif	/* CONFIG_POLL */
59 
60 #ifdef CONFIG_OBJ_CORE_MSGQ
61 	k_obj_core_init_and_link(K_OBJ_CORE(msgq), &obj_type_msgq);
62 #endif /* CONFIG_OBJ_CORE_MSGQ */
63 
64 	SYS_PORT_TRACING_OBJ_INIT(k_msgq, msgq);
65 
66 	k_object_init(msgq);
67 }
68 
z_impl_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)69 int z_impl_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
70 			    uint32_t max_msgs)
71 {
72 	void *buffer;
73 	int ret;
74 	size_t total_size;
75 
76 	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, alloc_init, msgq);
77 
78 	if (size_mul_overflow(msg_size, max_msgs, &total_size)) {
79 		ret = -EINVAL;
80 	} else {
81 		buffer = z_thread_malloc(total_size);
82 		if (buffer != NULL) {
83 			k_msgq_init(msgq, buffer, msg_size, max_msgs);
84 			msgq->flags = K_MSGQ_FLAG_ALLOC;
85 			ret = 0;
86 		} else {
87 			ret = -ENOMEM;
88 		}
89 	}
90 
91 	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, alloc_init, msgq, ret);
92 	return ret;
93 }
94 
95 #ifdef CONFIG_USERSPACE
/*
 * Syscall verification handler for k_msgq_alloc_init().
 *
 * The K_SYSCALL_OBJ_NEVER_INIT() check on msgq (type K_OBJ_MSGQ) is wrapped
 * in K_OOPS() and runs before the implementation is invoked. Judging by the
 * macro name it requires an object that has not been initialized yet; see
 * the Zephyr syscall documentation for the exact failure behavior.
 */
int z_vrfy_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
			    uint32_t max_msgs)
{
	K_OOPS(K_SYSCALL_OBJ_NEVER_INIT(msgq, K_OBJ_MSGQ));

	return z_impl_k_msgq_alloc_init(msgq, msg_size, max_msgs);
}
103 #include <zephyr/syscalls/k_msgq_alloc_init_mrsh.c>
104 #endif /* CONFIG_USERSPACE */
105 
k_msgq_cleanup(struct k_msgq * msgq)106 int k_msgq_cleanup(struct k_msgq *msgq)
107 {
108 	int ret = 0;
109 	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, cleanup, msgq);
110 
111 	CHECKIF(z_waitq_head(&msgq->wait_q) != NULL) {
112 		ret = -EBUSY;
113 		goto exit;
114 	}
115 
116 	if ((msgq->flags & K_MSGQ_FLAG_ALLOC) != 0U) {
117 		k_free(msgq->buffer_start);
118 		msgq->flags &= ~K_MSGQ_FLAG_ALLOC;
119 	}
120 
121 exit:
122 	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, ret);
123 	return ret;
124 }
125 
/**
 * @brief Common implementation behind k_msgq_put() and k_msgq_put_front().
 *
 * With the queue lock held, one of three things happens:
 *  - the queue has room: the message is copied straight into the buffer of
 *    a thread pended on the wait queue if there is one, otherwise it is
 *    stored in the ring buffer (at the back or at the front) and poll
 *    events are handled;
 *  - the queue is full and @a timeout is K_NO_WAIT: the call fails;
 *  - the queue is full and waiting is allowed: the current thread pends on
 *    the wait queue with its swap_data pointing at @a data, so that
 *    z_impl_k_msgq_get() can copy the message in once a slot is freed.
 *
 * @param msgq        Message queue to write to.
 * @param data        Message to send; msgq->msg_size bytes are copied.
 * @param timeout     How long to wait for room; must be K_NO_WAIT in an ISR.
 * @param put_at_back true to append at the back of the queue, false to
 *                    insert at the front.
 *
 * @return 0 when the message was delivered or queued, -ENOMSG when the
 *         queue is full and @a timeout is K_NO_WAIT, otherwise the value
 *         returned by z_pend_curr() (threads woken by z_impl_k_msgq_get()
 *         are given 0, threads woken by z_impl_k_msgq_purge() -ENOMSG).
 */
static inline int put_msg_in_queue(struct k_msgq *msgq, const void *data,
			k_timeout_t timeout, bool put_at_back)
{
	/* Waiting is only allowed outside of interrupt context. */
	__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");

	struct k_thread *pending_thread;
	k_spinlock_key_t key;
	int result;
	bool resched = false;

	key = k_spin_lock(&msgq->lock);

	/* Tracing hooks are named after the public entry point being served. */
	if (put_at_back) {
		SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout);
	} else {
		SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put_front, msgq, timeout);
	}

	if (msgq->used_msgs < msgq->max_msgs) {
		/* message queue isn't full */
		pending_thread = z_unpend_first_thread(&msgq->wait_q);
		if (unlikely(pending_thread != NULL)) {
			resched = true;

			/*
			 * give message to waiting thread: its swap_data is the
			 * destination buffer it registered before pending
			 */
			(void)memcpy(pending_thread->base.swap_data, data, msgq->msg_size);
			/* wake up waiting thread */
			arch_thread_return_value_set(pending_thread, 0);
			z_ready_thread(pending_thread);
		} else {
			__ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start &&
					msgq->write_ptr < msgq->buffer_end);
			if (put_at_back) {
				/*
				 * to write a message to the back of the queue,
				 * copy the message and increment write_ptr,
				 * wrapping to the start when the end is reached
				 */
				(void)memcpy(msgq->write_ptr, (char *)data, msgq->msg_size);
				msgq->write_ptr += msgq->msg_size;
				if (msgq->write_ptr == msgq->buffer_end) {
					msgq->write_ptr = msgq->buffer_start;
				}
			} else {
				/*
				 * to write a message to the head of the queue,
				 * first decrement the read pointer (to open
				 * space at the front of the queue) then copy
				 * the message to the newly created space.
				 * A read pointer at the buffer start wraps to
				 * the end before being decremented.
				 */
				if (msgq->read_ptr == msgq->buffer_start) {
					msgq->read_ptr = msgq->buffer_end;
				}
				msgq->read_ptr -= msgq->msg_size;
				(void)memcpy(msgq->read_ptr, (char *)data, msgq->msg_size);
			}
			msgq->used_msgs++;
			/* the poll handler decides whether a reschedule is needed */
			resched = handle_poll_events(msgq);
		}
		result = 0;
	} else if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		/* don't wait for message space to become available */
		result = -ENOMSG;
	} else {
		if (put_at_back) {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout);
		} else {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put_front, msgq, timeout);
		}

		/*
		 * wait for put message success, failure, or timeout; the
		 * message pointer is parked in swap_data for the thread that
		 * eventually frees a slot
		 */
		_current->base.swap_data = (void *) data;

		/*
		 * The lock and key are handed over to z_pend_curr(), so this
		 * path returns without unlocking here.
		 */
		result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);

		if (put_at_back) {
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
		} else {
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put_front, msgq, timeout, result);
		}

		return result;
	}

	if (put_at_back) {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
	} else {
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put_front, msgq, timeout, result);
	}

	/* Both branches give up the lock taken at the top of the function. */
	if (resched) {
		z_reschedule(&msgq->lock, key);
	} else {
		k_spin_unlock(&msgq->lock, key);
	}

	return result;
}
223 
224 
/*
 * Append a message at the back of the queue, waiting up to `timeout` for
 * room. Returns whatever put_msg_in_queue() returns.
 */
int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout)
{
	return put_msg_in_queue(msgq, data, timeout, true);
}
229 
/*
 * Insert a message at the front of the queue. This variant never waits: it
 * always passes K_NO_WAIT, so a full queue makes put_msg_in_queue() return
 * -ENOMSG immediately.
 */
int z_impl_k_msgq_put_front(struct k_msgq *msgq, const void *data)
{
	return put_msg_in_queue(msgq, data, K_NO_WAIT, false);
}
234 
235 #ifdef CONFIG_USERSPACE
/*
 * Syscall verification handler for k_msgq_put().
 *
 * The object check on msgq comes first because the second check reads
 * msgq->msg_size to size the caller's buffer. Both checks are wrapped in
 * K_OOPS(); see the Zephyr syscall documentation for the failure behavior.
 */
static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data,
				    k_timeout_t timeout)
{
	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	/* the implementation reads msg_size bytes from data */
	K_OOPS(K_SYSCALL_MEMORY_READ(data, msgq->msg_size));

	return z_impl_k_msgq_put(msgq, data, timeout);
}
244 #include <zephyr/syscalls/k_msgq_put_mrsh.c>
245 
/*
 * Syscall verification handler for k_msgq_put_front().
 *
 * The object check on msgq comes first because the second check reads
 * msgq->msg_size to size the caller's buffer. Both checks are wrapped in
 * K_OOPS(); see the Zephyr syscall documentation for the failure behavior.
 */
static inline int z_vrfy_k_msgq_put_front(struct k_msgq *msgq, const void *data)
{
	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	/* the implementation reads msg_size bytes from data */
	K_OOPS(K_SYSCALL_MEMORY_READ(data, msgq->msg_size));

	return z_impl_k_msgq_put_front(msgq, data);
}
253 #include <zephyr/syscalls/k_msgq_put_front_mrsh.c>
254 #endif /* CONFIG_USERSPACE */
255 
/*
 * Copy the queue's message size, capacity and current fill level into
 * *attrs. The fields are read without taking msgq->lock here, so used_msgs
 * is only a snapshot.
 */
void z_impl_k_msgq_get_attrs(struct k_msgq *msgq, struct k_msgq_attrs *attrs)
{
	attrs->msg_size = msgq->msg_size;
	attrs->max_msgs = msgq->max_msgs;
	attrs->used_msgs = msgq->used_msgs;
}
262 
263 #ifdef CONFIG_USERSPACE
/*
 * Syscall verification handler for k_msgq_get_attrs().
 *
 * Checks the msgq object and that attrs is writable for
 * sizeof(struct k_msgq_attrs) bytes before calling the implementation.
 * Both checks are wrapped in K_OOPS(); see the Zephyr syscall documentation
 * for the failure behavior.
 */
static inline void z_vrfy_k_msgq_get_attrs(struct k_msgq *msgq,
					   struct k_msgq_attrs *attrs)
{
	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(attrs, sizeof(struct k_msgq_attrs)));
	z_impl_k_msgq_get_attrs(msgq, attrs);
}
271 #include <zephyr/syscalls/k_msgq_get_attrs_mrsh.c>
272 #endif /* CONFIG_USERSPACE */
273 
/**
 * @brief Receive (and remove) the message at the front of a message queue.
 *
 * With the queue lock held, one of three things happens:
 *  - the queue holds a message: it is copied to @a data and removed; if a
 *    thread is pended on the wait queue, the message it parked in its
 *    swap_data is appended to the ring buffer and that thread is woken;
 *  - the queue is empty and @a timeout is K_NO_WAIT: the call fails;
 *  - the queue is empty and waiting is allowed: the current thread pends on
 *    the wait queue with its swap_data pointing at @a data, so that
 *    put_msg_in_queue() can copy a message straight into it.
 *
 * @param msgq    Message queue to read from.
 * @param data    Destination buffer; msgq->msg_size bytes are written.
 * @param timeout How long to wait for a message; must be K_NO_WAIT in an ISR.
 *
 * @return 0 when a message was received, -ENOMSG when the queue is empty
 *         and @a timeout is K_NO_WAIT, otherwise the value returned by
 *         z_pend_curr() (threads woken by put_msg_in_queue() are given 0).
 */
int z_impl_k_msgq_get(struct k_msgq *msgq, void *data, k_timeout_t timeout)
{
	/* Waiting is only allowed outside of interrupt context. */
	__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");

	k_spinlock_key_t key;
	struct k_thread *pending_thread;
	int result;
	bool resched = false;

	key = k_spin_lock(&msgq->lock);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, get, msgq, timeout);

	if (msgq->used_msgs > 0U) {
		/* take first available message from queue */
		(void)memcpy((char *)data, msgq->read_ptr, msgq->msg_size);
		msgq->read_ptr += msgq->msg_size;
		if (msgq->read_ptr == msgq->buffer_end) {
			msgq->read_ptr = msgq->buffer_start;
		}
		msgq->used_msgs--;

		/* handle first thread waiting to write (if any) */
		pending_thread = z_unpend_first_thread(&msgq->wait_q);
		if (unlikely(pending_thread != NULL)) {
			/*
			 * NOTE(review): the caller does not block on this
			 * path - confirm that emitting the BLOCKING trace
			 * hook here is intended.
			 */
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);

			/*
			 * add thread's message to queue: the slot freed above
			 * is filled from the pointer the waiter left in
			 * swap_data
			 */
			__ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start &&
					msgq->write_ptr < msgq->buffer_end);
			(void)memcpy(msgq->write_ptr, (char *)pending_thread->base.swap_data,
			       msgq->msg_size);
			msgq->write_ptr += msgq->msg_size;
			if (msgq->write_ptr == msgq->buffer_end) {
				msgq->write_ptr = msgq->buffer_start;
			}
			msgq->used_msgs++;

			/* wake up waiting thread */
			arch_thread_return_value_set(pending_thread, 0);
			z_ready_thread(pending_thread);
			resched = true;
		}
		result = 0;
	} else if (K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
		/* don't wait for a message to become available */
		result = -ENOMSG;
	} else {
		SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);

		/*
		 * wait for get message success or timeout; the destination
		 * buffer is parked in swap_data for the sending thread
		 */
		_current->base.swap_data = data;

		/*
		 * The lock and key are handed over to z_pend_curr(), so this
		 * path returns without unlocking here.
		 */
		result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, result);
		return result;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, result);

	/* Both branches give up the lock taken at the top of the function. */
	if (resched) {
		z_reschedule(&msgq->lock, key);
	} else {
		k_spin_unlock(&msgq->lock, key);
	}

	return result;
}
342 
343 #ifdef CONFIG_USERSPACE
/*
 * Syscall verification handler for k_msgq_get().
 *
 * The object check on msgq comes first because the second check reads
 * msgq->msg_size to size the caller's buffer. Both checks are wrapped in
 * K_OOPS(); see the Zephyr syscall documentation for the failure behavior.
 */
static inline int z_vrfy_k_msgq_get(struct k_msgq *msgq, void *data,
				    k_timeout_t timeout)
{
	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	/* the implementation writes msg_size bytes to data */
	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));

	return z_impl_k_msgq_get(msgq, data, timeout);
}
352 #include <zephyr/syscalls/k_msgq_get_mrsh.c>
353 #endif /* CONFIG_USERSPACE */
354 
z_impl_k_msgq_peek(struct k_msgq * msgq,void * data)355 int z_impl_k_msgq_peek(struct k_msgq *msgq, void *data)
356 {
357 	k_spinlock_key_t key;
358 	int result;
359 
360 	key = k_spin_lock(&msgq->lock);
361 
362 	if (msgq->used_msgs > 0U) {
363 		/* take first available message from queue */
364 		(void)memcpy((char *)data, msgq->read_ptr, msgq->msg_size);
365 		result = 0;
366 	} else {
367 		/* don't wait for a message to become available */
368 		result = -ENOMSG;
369 	}
370 
371 	SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
372 
373 	k_spin_unlock(&msgq->lock, key);
374 
375 	return result;
376 }
377 
378 #ifdef CONFIG_USERSPACE
/*
 * Syscall verification handler for k_msgq_peek().
 *
 * The object check on msgq comes first because the second check reads
 * msgq->msg_size to size the caller's buffer. Both checks are wrapped in
 * K_OOPS(); see the Zephyr syscall documentation for the failure behavior.
 */
static inline int z_vrfy_k_msgq_peek(struct k_msgq *msgq, void *data)
{
	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	/* the implementation writes msg_size bytes to data */
	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));

	return z_impl_k_msgq_peek(msgq, data);
}
386 #include <zephyr/syscalls/k_msgq_peek_mrsh.c>
387 #endif /* CONFIG_USERSPACE */
388 
z_impl_k_msgq_peek_at(struct k_msgq * msgq,void * data,uint32_t idx)389 int z_impl_k_msgq_peek_at(struct k_msgq *msgq, void *data, uint32_t idx)
390 {
391 	k_spinlock_key_t key;
392 	int result;
393 	uint32_t bytes_to_end;
394 	uint32_t byte_offset;
395 	char *start_addr;
396 
397 	key = k_spin_lock(&msgq->lock);
398 
399 	if (msgq->used_msgs > idx) {
400 		bytes_to_end = (msgq->buffer_end - msgq->read_ptr);
401 		byte_offset = idx * msgq->msg_size;
402 		start_addr = msgq->read_ptr;
403 		/* check item available in start/end of ring buffer */
404 		if (bytes_to_end <= byte_offset) {
405 			/* Tweak the values in case */
406 			byte_offset -= bytes_to_end;
407 			/* wrap-around is required */
408 			start_addr = msgq->buffer_start;
409 		}
410 		(void)memcpy(data, start_addr + byte_offset, msgq->msg_size);
411 		result = 0;
412 	} else {
413 		/* don't wait for a message to become available */
414 		result = -ENOMSG;
415 	}
416 
417 	SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
418 
419 	k_spin_unlock(&msgq->lock, key);
420 
421 	return result;
422 }
423 
424 #ifdef CONFIG_USERSPACE
/*
 * Syscall verification handler for k_msgq_peek_at().
 *
 * The object check on msgq comes first because the second check reads
 * msgq->msg_size to size the caller's buffer. Both checks are wrapped in
 * K_OOPS(); see the Zephyr syscall documentation for the failure behavior.
 * idx is passed through unchecked; the implementation compares it against
 * used_msgs.
 */
static inline int z_vrfy_k_msgq_peek_at(struct k_msgq *msgq, void *data, uint32_t idx)
{
	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	/* the implementation writes msg_size bytes to data */
	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));

	return z_impl_k_msgq_peek_at(msgq, data, idx);
}
432 #include <zephyr/syscalls/k_msgq_peek_at_mrsh.c>
433 #endif /* CONFIG_USERSPACE */
434 
z_impl_k_msgq_purge(struct k_msgq * msgq)435 void z_impl_k_msgq_purge(struct k_msgq *msgq)
436 {
437 	k_spinlock_key_t key;
438 	struct k_thread *pending_thread;
439 	bool resched = false;
440 
441 	key = k_spin_lock(&msgq->lock);
442 
443 	SYS_PORT_TRACING_OBJ_FUNC(k_msgq, purge, msgq);
444 
445 	/* wake up any threads that are waiting to write */
446 	for (pending_thread = z_unpend_first_thread(&msgq->wait_q);
447 	     pending_thread != NULL;
448 	     pending_thread = z_unpend_first_thread(&msgq->wait_q)) {
449 		arch_thread_return_value_set(pending_thread, -ENOMSG);
450 		z_ready_thread(pending_thread);
451 		resched = true;
452 	}
453 
454 	msgq->used_msgs = 0;
455 	msgq->read_ptr = msgq->write_ptr;
456 
457 	if (resched) {
458 		z_reschedule(&msgq->lock, key);
459 	} else {
460 		k_spin_unlock(&msgq->lock, key);
461 	}
462 }
463 
464 #ifdef CONFIG_USERSPACE
/*
 * Syscall verification handler for k_msgq_purge(): checks the msgq object
 * (wrapped in K_OOPS()) before calling the implementation.
 */
static inline void z_vrfy_k_msgq_purge(struct k_msgq *msgq)
{
	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	z_impl_k_msgq_purge(msgq);
}
470 #include <zephyr/syscalls/k_msgq_purge_mrsh.c>
471 
/*
 * Syscall verification handler for k_msgq_num_free_get(): checks the msgq
 * object (wrapped in K_OOPS()) and forwards to the implementation, which is
 * not defined in this part of the file.
 */
static inline uint32_t z_vrfy_k_msgq_num_free_get(struct k_msgq *msgq)
{
	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	return z_impl_k_msgq_num_free_get(msgq);
}
477 #include <zephyr/syscalls/k_msgq_num_free_get_mrsh.c>
478 
/*
 * Syscall verification handler for k_msgq_num_used_get(): checks the msgq
 * object (wrapped in K_OOPS()) and forwards to the implementation, which is
 * not defined in this part of the file.
 */
static inline uint32_t z_vrfy_k_msgq_num_used_get(struct k_msgq *msgq)
{
	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	return z_impl_k_msgq_num_used_get(msgq);
}
484 #include <zephyr/syscalls/k_msgq_num_used_get_mrsh.c>
485 
486 #endif /* CONFIG_USERSPACE */
487 
488 #ifdef CONFIG_OBJ_CORE_MSGQ
init_msgq_obj_core_list(void)489 static int init_msgq_obj_core_list(void)
490 {
491 	/* Initialize msgq object type */
492 
493 	z_obj_type_init(&obj_type_msgq, K_OBJ_TYPE_MSGQ_ID,
494 			offsetof(struct k_msgq, obj_core));
495 
496 	/* Initialize and link statically defined message queues */
497 
498 	STRUCT_SECTION_FOREACH(k_msgq, msgq) {
499 		k_obj_core_init_and_link(K_OBJ_CORE(msgq), &obj_type_msgq);
500 	}
501 
502 	return 0;
503 };
504 
/*
 * Register init_msgq_obj_core_list() as an init function at the
 * PRE_KERNEL_1 level with priority CONFIG_KERNEL_INIT_PRIORITY_OBJECTS.
 */
SYS_INIT(init_msgq_obj_core_list, PRE_KERNEL_1,
	 CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
507 
508 #endif /* CONFIG_OBJ_CORE_MSGQ */
509