1 /*
2  * Copyright (c) 2016 Wind River Systems, Inc.
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 /**
8  * @file
9  * @brief Message queues.
10  */
11 
12 
13 #include <zephyr/kernel.h>
14 #include <zephyr/kernel_structs.h>
15 
16 #include <zephyr/toolchain.h>
17 #include <zephyr/linker/sections.h>
18 #include <string.h>
19 #include <ksched.h>
20 #include <wait_q.h>
21 #include <zephyr/sys/dlist.h>
22 #include <zephyr/sys/math_extras.h>
23 #include <zephyr/init.h>
24 #include <zephyr/internal/syscall_handler.h>
25 #include <kernel_internal.h>
26 #include <zephyr/sys/check.h>
27 
28 #ifdef CONFIG_OBJ_CORE_MSGQ
/*
 * Object-core type descriptor used for message queues. It is set up by
 * init_msgq_obj_core_list() and every queue is linked to it.
 */
static struct k_obj_type obj_type_msgq;
30 #endif /* CONFIG_OBJ_CORE_MSGQ */
31 
32 #ifdef CONFIG_POLL
handle_poll_events(struct k_msgq * msgq,uint32_t state)33 static inline void handle_poll_events(struct k_msgq *msgq, uint32_t state)
34 {
35 	z_handle_obj_poll_events(&msgq->poll_events, state);
36 }
37 #endif /* CONFIG_POLL */
38 
k_msgq_init(struct k_msgq * msgq,char * buffer,size_t msg_size,uint32_t max_msgs)39 void k_msgq_init(struct k_msgq *msgq, char *buffer, size_t msg_size,
40 		 uint32_t max_msgs)
41 {
42 	msgq->msg_size = msg_size;
43 	msgq->max_msgs = max_msgs;
44 	msgq->buffer_start = buffer;
45 	msgq->buffer_end = buffer + (max_msgs * msg_size);
46 	msgq->read_ptr = buffer;
47 	msgq->write_ptr = buffer;
48 	msgq->used_msgs = 0;
49 	msgq->flags = 0;
50 	z_waitq_init(&msgq->wait_q);
51 	msgq->lock = (struct k_spinlock) {};
52 #ifdef CONFIG_POLL
53 	sys_dlist_init(&msgq->poll_events);
54 #endif	/* CONFIG_POLL */
55 
56 #ifdef CONFIG_OBJ_CORE_MSGQ
57 	k_obj_core_init_and_link(K_OBJ_CORE(msgq), &obj_type_msgq);
58 #endif /* CONFIG_OBJ_CORE_MSGQ */
59 
60 	SYS_PORT_TRACING_OBJ_INIT(k_msgq, msgq);
61 
62 	k_object_init(msgq);
63 }
64 
z_impl_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)65 int z_impl_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
66 			    uint32_t max_msgs)
67 {
68 	void *buffer;
69 	int ret;
70 	size_t total_size;
71 
72 	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, alloc_init, msgq);
73 
74 	if (size_mul_overflow(msg_size, max_msgs, &total_size)) {
75 		ret = -EINVAL;
76 	} else {
77 		buffer = z_thread_malloc(total_size);
78 		if (buffer != NULL) {
79 			k_msgq_init(msgq, buffer, msg_size, max_msgs);
80 			msgq->flags = K_MSGQ_FLAG_ALLOC;
81 			ret = 0;
82 		} else {
83 			ret = -ENOMEM;
84 		}
85 	}
86 
87 	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, alloc_init, msgq, ret);
88 
89 	return ret;
90 }
91 
92 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)93 int z_vrfy_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
94 			    uint32_t max_msgs)
95 {
96 	K_OOPS(K_SYSCALL_OBJ_NEVER_INIT(msgq, K_OBJ_MSGQ));
97 
98 	return z_impl_k_msgq_alloc_init(msgq, msg_size, max_msgs);
99 }
100 #include <zephyr/syscalls/k_msgq_alloc_init_mrsh.c>
101 #endif /* CONFIG_USERSPACE */
102 
k_msgq_cleanup(struct k_msgq * msgq)103 int k_msgq_cleanup(struct k_msgq *msgq)
104 {
105 	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, cleanup, msgq);
106 
107 	CHECKIF(z_waitq_head(&msgq->wait_q) != NULL) {
108 		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, -EBUSY);
109 
110 		return -EBUSY;
111 	}
112 
113 	if ((msgq->flags & K_MSGQ_FLAG_ALLOC) != 0U) {
114 		k_free(msgq->buffer_start);
115 		msgq->flags &= ~K_MSGQ_FLAG_ALLOC;
116 	}
117 
118 	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, 0);
119 
120 	return 0;
121 }
122 
123 
/**
 * @brief Send a message to a message queue.
 *
 * With room in the queue, the message is either copied straight into the
 * buffer of a thread already pended on the queue, or appended to the ring
 * buffer. With the queue full, the call fails immediately for K_NO_WAIT or
 * pends the caller otherwise.
 *
 * @param msgq    Message queue.
 * @param data    Message to send; msgq->msg_size bytes are read.
 * @param timeout How long to wait for space; must be K_NO_WAIT in an ISR.
 *
 * @retval 0       Message handed to a waiting thread or stored in the queue.
 * @retval -ENOMSG Queue full and @p timeout is K_NO_WAIT.
 * @return Otherwise the value produced by z_pend_curr() after blocking.
 */
int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout)
{
	__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");

	struct k_thread *receiver;
	int result;
	k_spinlock_key_t key = k_spin_lock(&msgq->lock);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout);

	if (msgq->used_msgs >= msgq->max_msgs) {
		/* message queue is full */
		if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout);

			/*
			 * Publish the message location through swap_data so that
			 * k_msgq_get() can copy it in once a slot frees up.
			 */
			arch_current_thread()->base.swap_data = (void *) data;

			/* The lock and key are handed to z_pend_curr(); no unlock here. */
			result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);
			return result;
		}

		/* caller does not want to wait for space */
		result = -ENOMSG;
	} else {
		receiver = z_unpend_first_thread(&msgq->wait_q);
		if (unlikely(receiver != NULL)) {
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, 0);

			/* copy the message directly into the waiting thread's buffer */
			(void)memcpy(receiver->base.swap_data, data,
				     msgq->msg_size);

			/* make the waiting thread return 0 and let it run */
			arch_thread_return_value_set(receiver, 0);
			z_ready_thread(receiver);
			z_reschedule(&msgq->lock, key);
			return 0;
		}

		/* nobody is waiting: append the message to the ring buffer */
		__ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start &&
				msgq->write_ptr < msgq->buffer_end);
		(void)memcpy(msgq->write_ptr, (char *)data, msgq->msg_size);
		msgq->write_ptr += msgq->msg_size;
		if (msgq->write_ptr == msgq->buffer_end) {
			/* wrap the write cursor */
			msgq->write_ptr = msgq->buffer_start;
		}
		msgq->used_msgs++;
#ifdef CONFIG_POLL
		handle_poll_events(msgq, K_POLL_STATE_MSGQ_DATA_AVAILABLE);
#endif /* CONFIG_POLL */
		result = 0;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, result);

	z_reschedule(&msgq->lock, key);

	return result;
}
185 
186 #ifdef CONFIG_USERSPACE
/*
 * Syscall verification wrapper for k_msgq_put(): checks the queue object
 * and that msgq->msg_size bytes at data are readable before calling the
 * implementation.
 */
static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data,
				    k_timeout_t timeout)
{
	int ret;

	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	K_OOPS(K_SYSCALL_MEMORY_READ(data, msgq->msg_size));

	ret = z_impl_k_msgq_put(msgq, data, timeout);

	return ret;
}
195 #include <zephyr/syscalls/k_msgq_put_mrsh.c>
196 #endif /* CONFIG_USERSPACE */
197 
z_impl_k_msgq_get_attrs(struct k_msgq * msgq,struct k_msgq_attrs * attrs)198 void z_impl_k_msgq_get_attrs(struct k_msgq *msgq, struct k_msgq_attrs *attrs)
199 {
200 	attrs->msg_size = msgq->msg_size;
201 	attrs->max_msgs = msgq->max_msgs;
202 	attrs->used_msgs = msgq->used_msgs;
203 }
204 
205 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_get_attrs(struct k_msgq * msgq,struct k_msgq_attrs * attrs)206 static inline void z_vrfy_k_msgq_get_attrs(struct k_msgq *msgq,
207 					   struct k_msgq_attrs *attrs)
208 {
209 	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
210 	K_OOPS(K_SYSCALL_MEMORY_WRITE(attrs, sizeof(struct k_msgq_attrs)));
211 	z_impl_k_msgq_get_attrs(msgq, attrs);
212 }
213 #include <zephyr/syscalls/k_msgq_get_attrs_mrsh.c>
214 #endif /* CONFIG_USERSPACE */
215 
/**
 * @brief Receive a message from a message queue.
 *
 * With a message available, the oldest one is copied to @p data. If a
 * thread is pended on the queue at that point, its message is moved into
 * the ring buffer and the thread is woken with return value 0. With the
 * queue empty, the call fails immediately for K_NO_WAIT or pends the
 * caller otherwise.
 *
 * @param msgq    Message queue.
 * @param data    Receives the message; msgq->msg_size bytes are written.
 * @param timeout How long to wait for a message; must be K_NO_WAIT in an ISR.
 *
 * @retval 0       Message received.
 * @retval -ENOMSG Queue empty and @p timeout is K_NO_WAIT.
 * @return Otherwise the value produced by z_pend_curr() after blocking.
 */
int z_impl_k_msgq_get(struct k_msgq *msgq, void *data, k_timeout_t timeout)
{
	__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");

	struct k_thread *sender;
	int result;
	k_spinlock_key_t key = k_spin_lock(&msgq->lock);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, get, msgq, timeout);

	if (msgq->used_msgs == 0U) {
		/* message queue is empty */
		if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);

			/*
			 * Publish the destination buffer through swap_data so that
			 * k_msgq_put() can copy the next message straight into it.
			 */
			arch_current_thread()->base.swap_data = data;

			/* The lock and key are handed to z_pend_curr(); no unlock here. */
			result = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, result);
			return result;
		}

		/* caller does not want to wait for a message */
		result = -ENOMSG;
	} else {
		/* copy out the oldest message and advance the read cursor */
		(void)memcpy((char *)data, msgq->read_ptr, msgq->msg_size);
		msgq->read_ptr += msgq->msg_size;
		if (msgq->read_ptr == msgq->buffer_end) {
			msgq->read_ptr = msgq->buffer_start;
		}
		msgq->used_msgs--;

		/* a slot just opened up: serve the first waiting writer, if any */
		sender = z_unpend_first_thread(&msgq->wait_q);
		if (unlikely(sender != NULL)) {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);

			/* move the writer's message into the ring buffer */
			__ASSERT_NO_MSG(msgq->write_ptr >= msgq->buffer_start &&
					msgq->write_ptr < msgq->buffer_end);
			(void)memcpy(msgq->write_ptr, (char *)sender->base.swap_data,
				     msgq->msg_size);
			msgq->write_ptr += msgq->msg_size;
			if (msgq->write_ptr == msgq->buffer_end) {
				msgq->write_ptr = msgq->buffer_start;
			}
			msgq->used_msgs++;

			/* make the writer return 0 and let it run */
			arch_thread_return_value_set(sender, 0);
			z_ready_thread(sender);
			z_reschedule(&msgq->lock, key);

			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, 0);

			return 0;
		}

		result = 0;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, result);

	k_spin_unlock(&msgq->lock, key);

	return result;
}
283 
284 #ifdef CONFIG_USERSPACE
/*
 * Syscall verification wrapper for k_msgq_get(): checks the queue object
 * and that msgq->msg_size bytes at data are writable before calling the
 * implementation.
 */
static inline int z_vrfy_k_msgq_get(struct k_msgq *msgq, void *data,
				    k_timeout_t timeout)
{
	int ret;

	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));

	ret = z_impl_k_msgq_get(msgq, data, timeout);

	return ret;
}
293 #include <zephyr/syscalls/k_msgq_get_mrsh.c>
294 #endif /* CONFIG_USERSPACE */
295 
z_impl_k_msgq_peek(struct k_msgq * msgq,void * data)296 int z_impl_k_msgq_peek(struct k_msgq *msgq, void *data)
297 {
298 	k_spinlock_key_t key;
299 	int result;
300 
301 	key = k_spin_lock(&msgq->lock);
302 
303 	if (msgq->used_msgs > 0U) {
304 		/* take first available message from queue */
305 		(void)memcpy((char *)data, msgq->read_ptr, msgq->msg_size);
306 		result = 0;
307 	} else {
308 		/* don't wait for a message to become available */
309 		result = -ENOMSG;
310 	}
311 
312 	SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
313 
314 	k_spin_unlock(&msgq->lock, key);
315 
316 	return result;
317 }
318 
319 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_peek(struct k_msgq * msgq,void * data)320 static inline int z_vrfy_k_msgq_peek(struct k_msgq *msgq, void *data)
321 {
322 	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
323 	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
324 
325 	return z_impl_k_msgq_peek(msgq, data);
326 }
327 #include <zephyr/syscalls/k_msgq_peek_mrsh.c>
328 #endif /* CONFIG_USERSPACE */
329 
z_impl_k_msgq_peek_at(struct k_msgq * msgq,void * data,uint32_t idx)330 int z_impl_k_msgq_peek_at(struct k_msgq *msgq, void *data, uint32_t idx)
331 {
332 	k_spinlock_key_t key;
333 	int result;
334 	uint32_t bytes_to_end;
335 	uint32_t byte_offset;
336 	char *start_addr;
337 
338 	key = k_spin_lock(&msgq->lock);
339 
340 	if (msgq->used_msgs > idx) {
341 		bytes_to_end = (msgq->buffer_end - msgq->read_ptr);
342 		byte_offset = idx * msgq->msg_size;
343 		start_addr = msgq->read_ptr;
344 		/* check item available in start/end of ring buffer */
345 		if (bytes_to_end <= byte_offset) {
346 			/* Tweak the values in case */
347 			byte_offset -= bytes_to_end;
348 			/* wrap-around is required */
349 			start_addr = msgq->buffer_start;
350 		}
351 		(void)memcpy(data, start_addr + byte_offset, msgq->msg_size);
352 		result = 0;
353 	} else {
354 		/* don't wait for a message to become available */
355 		result = -ENOMSG;
356 	}
357 
358 	SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
359 
360 	k_spin_unlock(&msgq->lock, key);
361 
362 	return result;
363 }
364 
365 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_peek_at(struct k_msgq * msgq,void * data,uint32_t idx)366 static inline int z_vrfy_k_msgq_peek_at(struct k_msgq *msgq, void *data, uint32_t idx)
367 {
368 	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
369 	K_OOPS(K_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
370 
371 	return z_impl_k_msgq_peek_at(msgq, data, idx);
372 }
373 #include <zephyr/syscalls/k_msgq_peek_at_mrsh.c>
374 #endif /* CONFIG_USERSPACE */
375 
z_impl_k_msgq_purge(struct k_msgq * msgq)376 void z_impl_k_msgq_purge(struct k_msgq *msgq)
377 {
378 	k_spinlock_key_t key;
379 	struct k_thread *pending_thread;
380 
381 	key = k_spin_lock(&msgq->lock);
382 
383 	SYS_PORT_TRACING_OBJ_FUNC(k_msgq, purge, msgq);
384 
385 	/* wake up any threads that are waiting to write */
386 	for (pending_thread = z_unpend_first_thread(&msgq->wait_q); pending_thread != NULL;
387 		 pending_thread = z_unpend_first_thread(&msgq->wait_q)) {
388 		arch_thread_return_value_set(pending_thread, -ENOMSG);
389 		z_ready_thread(pending_thread);
390 	}
391 
392 	msgq->used_msgs = 0;
393 	msgq->read_ptr = msgq->write_ptr;
394 
395 	z_reschedule(&msgq->lock, key);
396 }
397 
398 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_purge(struct k_msgq * msgq)399 static inline void z_vrfy_k_msgq_purge(struct k_msgq *msgq)
400 {
401 	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
402 	z_impl_k_msgq_purge(msgq);
403 }
404 #include <zephyr/syscalls/k_msgq_purge_mrsh.c>
405 
z_vrfy_k_msgq_num_free_get(struct k_msgq * msgq)406 static inline uint32_t z_vrfy_k_msgq_num_free_get(struct k_msgq *msgq)
407 {
408 	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
409 	return z_impl_k_msgq_num_free_get(msgq);
410 }
411 #include <zephyr/syscalls/k_msgq_num_free_get_mrsh.c>
412 
z_vrfy_k_msgq_num_used_get(struct k_msgq * msgq)413 static inline uint32_t z_vrfy_k_msgq_num_used_get(struct k_msgq *msgq)
414 {
415 	K_OOPS(K_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
416 	return z_impl_k_msgq_num_used_get(msgq);
417 }
418 #include <zephyr/syscalls/k_msgq_num_used_get_mrsh.c>
419 
420 #endif /* CONFIG_USERSPACE */
421 
422 #ifdef CONFIG_OBJ_CORE_MSGQ
init_msgq_obj_core_list(void)423 static int init_msgq_obj_core_list(void)
424 {
425 	/* Initialize msgq object type */
426 
427 	z_obj_type_init(&obj_type_msgq, K_OBJ_TYPE_MSGQ_ID,
428 			offsetof(struct k_msgq, obj_core));
429 
430 	/* Initialize and link statically defined message queues */
431 
432 	STRUCT_SECTION_FOREACH(k_msgq, msgq) {
433 		k_obj_core_init_and_link(K_OBJ_CORE(msgq), &obj_type_msgq);
434 	}
435 
436 	return 0;
437 };
438 
439 SYS_INIT(init_msgq_obj_core_list, PRE_KERNEL_1,
440 	 CONFIG_KERNEL_INIT_PRIORITY_OBJECTS);
441 
442 #endif /* CONFIG_OBJ_CORE_MSGQ */
443