1 /*
2  * Copyright (c) 2016 Wind River Systems, Inc.
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 /**
8  * @file
9  * @brief Message queues.
10  */
11 
12 
13 #include <kernel.h>
14 #include <kernel_structs.h>
15 
16 #include <toolchain.h>
17 #include <linker/sections.h>
18 #include <string.h>
19 #include <ksched.h>
20 #include <wait_q.h>
21 #include <sys/dlist.h>
22 #include <sys/math_extras.h>
23 #include <init.h>
24 #include <syscall_handler.h>
25 #include <kernel_internal.h>
26 #include <sys/check.h>
27 
28 #ifdef CONFIG_POLL
handle_poll_events(struct k_msgq * msgq,uint32_t state)29 static inline void handle_poll_events(struct k_msgq *msgq, uint32_t state)
30 {
31 	z_handle_obj_poll_events(&msgq->poll_events, state);
32 }
33 #endif /* CONFIG_POLL */
34 
k_msgq_init(struct k_msgq * msgq,char * buffer,size_t msg_size,uint32_t max_msgs)35 void k_msgq_init(struct k_msgq *msgq, char *buffer, size_t msg_size,
36 		 uint32_t max_msgs)
37 {
38 	msgq->msg_size = msg_size;
39 	msgq->max_msgs = max_msgs;
40 	msgq->buffer_start = buffer;
41 	msgq->buffer_end = buffer + (max_msgs * msg_size);
42 	msgq->read_ptr = buffer;
43 	msgq->write_ptr = buffer;
44 	msgq->used_msgs = 0;
45 	msgq->flags = 0;
46 	z_waitq_init(&msgq->wait_q);
47 	msgq->lock = (struct k_spinlock) {};
48 #ifdef CONFIG_POLL
49 	sys_dlist_init(&msgq->poll_events);
50 #endif	/* CONFIG_POLL */
51 
52 	SYS_PORT_TRACING_OBJ_INIT(k_msgq, msgq);
53 
54 	z_object_init(msgq);
55 }
56 
z_impl_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)57 int z_impl_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
58 			    uint32_t max_msgs)
59 {
60 	void *buffer;
61 	int ret;
62 	size_t total_size;
63 
64 	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, alloc_init, msgq);
65 
66 	if (size_mul_overflow(msg_size, max_msgs, &total_size)) {
67 		ret = -EINVAL;
68 	} else {
69 		buffer = z_thread_malloc(total_size);
70 		if (buffer != NULL) {
71 			k_msgq_init(msgq, buffer, msg_size, max_msgs);
72 			msgq->flags = K_MSGQ_FLAG_ALLOC;
73 			ret = 0;
74 		} else {
75 			ret = -ENOMEM;
76 		}
77 	}
78 
79 	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, alloc_init, msgq, ret);
80 
81 	return ret;
82 }
83 
84 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_alloc_init(struct k_msgq * msgq,size_t msg_size,uint32_t max_msgs)85 int z_vrfy_k_msgq_alloc_init(struct k_msgq *msgq, size_t msg_size,
86 			    uint32_t max_msgs)
87 {
88 	Z_OOPS(Z_SYSCALL_OBJ_NEVER_INIT(msgq, K_OBJ_MSGQ));
89 
90 	return z_impl_k_msgq_alloc_init(msgq, msg_size, max_msgs);
91 }
92 #include <syscalls/k_msgq_alloc_init_mrsh.c>
93 #endif
94 
k_msgq_cleanup(struct k_msgq * msgq)95 int k_msgq_cleanup(struct k_msgq *msgq)
96 {
97 	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, cleanup, msgq);
98 
99 	CHECKIF(z_waitq_head(&msgq->wait_q) != NULL) {
100 		SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, -EBUSY);
101 
102 		return -EBUSY;
103 	}
104 
105 	if ((msgq->flags & K_MSGQ_FLAG_ALLOC) != 0U) {
106 		k_free(msgq->buffer_start);
107 		msgq->flags &= ~K_MSGQ_FLAG_ALLOC;
108 	}
109 
110 	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, cleanup, msgq, 0);
111 
112 	return 0;
113 }
114 
115 
/**
 * @brief Send one message to a message queue.
 *
 * All queue state is examined and modified while holding msgq->lock.
 *
 * - Queue has room and a thread is pending on wait_q: the message is copied
 *   straight into that thread's swap_data buffer and the thread is made
 *   ready with a return value of 0.
 * - Queue has room and nobody is pending: the message is copied to the
 *   write cursor, which wraps to buffer_start on reaching buffer_end.
 * - Queue is full and @a timeout is K_NO_WAIT: fails immediately.
 * - Queue is full otherwise: the caller's @a data pointer is stored in its
 *   own swap_data and the caller pends on wait_q.
 *
 * @param msgq    Target queue.
 * @param data    Message to send; msgq->msg_size bytes are read.
 * @param timeout How long to wait for room; must be K_NO_WAIT in an ISR
 *                (asserted).
 *
 * @retval 0       Message delivered or queued.
 * @retval -ENOMSG Queue full and @a timeout was K_NO_WAIT.
 * @return Otherwise the value returned by z_pend_curr().
 */
int z_impl_k_msgq_put(struct k_msgq *msgq, const void *data, k_timeout_t timeout)
{
	__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");

	int status;
	k_spinlock_key_t key = k_spin_lock(&msgq->lock);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, put, msgq, timeout);

	if (msgq->used_msgs >= msgq->max_msgs) {
		/* queue is full */
		if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, put, msgq, timeout);

			/*
			 * Park the message pointer where a later get can find
			 * it, then pend; z_pend_curr() is given the lock and
			 * key, and no unlock follows on this path.
			 */
			_current->base.swap_data = (void *) data;

			status = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, status);
			return status;
		}

		/* caller does not want to wait for space */
		status = -ENOMSG;
	} else {
		struct k_thread *receiver = z_unpend_first_thread(&msgq->wait_q);

		if (receiver != NULL) {
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, 0);

			/* hand the message directly to the pending thread */
			(void)memcpy(receiver->base.swap_data, data,
			       msgq->msg_size);

			/* and let it run with a success return value */
			arch_thread_return_value_set(receiver, 0);
			z_ready_thread(receiver);
			z_reschedule(&msgq->lock, key);
			return 0;
		}

		/* nobody is waiting: store the message in the ring */
		(void)memcpy(msgq->write_ptr, data, msgq->msg_size);
		msgq->write_ptr += msgq->msg_size;
		if (msgq->write_ptr == msgq->buffer_end) {
			msgq->write_ptr = msgq->buffer_start;
		}
		msgq->used_msgs++;
#ifdef CONFIG_POLL
		handle_poll_events(msgq, K_POLL_STATE_MSGQ_DATA_AVAILABLE);
#endif /* CONFIG_POLL */
		status = 0;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, put, msgq, timeout, status);

	k_spin_unlock(&msgq->lock, key);

	return status;
}
175 
176 #ifdef CONFIG_USERSPACE
/*
 * Syscall verifier for k_msgq_put().
 *
 * Two checks are passed to Z_OOPS() before forwarding: that msgq is a
 * K_OBJ_MSGQ object, and that msgq->msg_size bytes at data pass
 * Z_SYSCALL_MEMORY_READ().
 */
static inline int z_vrfy_k_msgq_put(struct k_msgq *msgq, const void *data,
				    k_timeout_t timeout)
{
	int status;

	Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	Z_OOPS(Z_SYSCALL_MEMORY_READ(data, msgq->msg_size));

	status = z_impl_k_msgq_put(msgq, data, timeout);

	return status;
}
185 #include <syscalls/k_msgq_put_mrsh.c>
186 #endif
187 
z_impl_k_msgq_get_attrs(struct k_msgq * msgq,struct k_msgq_attrs * attrs)188 void z_impl_k_msgq_get_attrs(struct k_msgq *msgq, struct k_msgq_attrs *attrs)
189 {
190 	attrs->msg_size = msgq->msg_size;
191 	attrs->max_msgs = msgq->max_msgs;
192 	attrs->used_msgs = msgq->used_msgs;
193 }
194 
195 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_get_attrs(struct k_msgq * msgq,struct k_msgq_attrs * attrs)196 static inline void z_vrfy_k_msgq_get_attrs(struct k_msgq *msgq,
197 					   struct k_msgq_attrs *attrs)
198 {
199 	Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
200 	Z_OOPS(Z_SYSCALL_MEMORY_WRITE(attrs, sizeof(struct k_msgq_attrs)));
201 	z_impl_k_msgq_get_attrs(msgq, attrs);
202 }
203 #include <syscalls/k_msgq_get_attrs_mrsh.c>
204 #endif
205 
/**
 * @brief Receive one message from a message queue.
 *
 * All queue state is examined and modified while holding msgq->lock.
 *
 * - Queue holds a message: the message at the read cursor is copied to
 *   @a data and the cursor advances, wrapping to buffer_start on reaching
 *   buffer_end. If a thread is pending on wait_q, the message its swap_data
 *   points to is then appended to the ring and the thread is made ready
 *   with a return value of 0.
 * - Queue is empty and @a timeout is K_NO_WAIT: fails immediately.
 * - Queue is empty otherwise: @a data is stored in the caller's swap_data
 *   and the caller pends on wait_q.
 *
 * @param msgq    Source queue.
 * @param data    Destination buffer; msgq->msg_size bytes are written.
 * @param timeout How long to wait for a message; must be K_NO_WAIT in an
 *                ISR (asserted).
 *
 * @retval 0       Message received.
 * @retval -ENOMSG Queue empty and @a timeout was K_NO_WAIT.
 * @return Otherwise the value returned by z_pend_curr().
 */
int z_impl_k_msgq_get(struct k_msgq *msgq, void *data, k_timeout_t timeout)
{
	__ASSERT(!arch_is_in_isr() || K_TIMEOUT_EQ(timeout, K_NO_WAIT), "");

	int status;
	k_spinlock_key_t key = k_spin_lock(&msgq->lock);

	SYS_PORT_TRACING_OBJ_FUNC_ENTER(k_msgq, get, msgq, timeout);

	if (msgq->used_msgs == 0U) {
		/* queue is empty */
		if (!K_TIMEOUT_EQ(timeout, K_NO_WAIT)) {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);

			/*
			 * Publish the destination buffer so a later put can
			 * copy into it, then pend; z_pend_curr() is given the
			 * lock and key, and no unlock follows on this path.
			 */
			_current->base.swap_data = data;

			status = z_pend_curr(&msgq->lock, key, &msgq->wait_q, timeout);
			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, status);
			return status;
		}

		/* caller does not want to wait for a message */
		status = -ENOMSG;
	} else {
		struct k_thread *sender;

		/* consume the oldest message in the ring */
		(void)memcpy(data, msgq->read_ptr, msgq->msg_size);
		msgq->read_ptr += msgq->msg_size;
		if (msgq->read_ptr == msgq->buffer_end) {
			msgq->read_ptr = msgq->buffer_start;
		}
		msgq->used_msgs--;

		/* a slot just opened up: service the first pending thread, if any */
		sender = z_unpend_first_thread(&msgq->wait_q);
		if (sender != NULL) {
			SYS_PORT_TRACING_OBJ_FUNC_BLOCKING(k_msgq, get, msgq, timeout);

			/* append the pending thread's message to the ring */
			(void)memcpy(msgq->write_ptr, sender->base.swap_data,
			       msgq->msg_size);
			msgq->write_ptr += msgq->msg_size;
			if (msgq->write_ptr == msgq->buffer_end) {
				msgq->write_ptr = msgq->buffer_start;
			}
			msgq->used_msgs++;

			/* and let it run with a success return value */
			arch_thread_return_value_set(sender, 0);
			z_ready_thread(sender);
			z_reschedule(&msgq->lock, key);

			SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, 0);

			return 0;
		}

		status = 0;
	}

	SYS_PORT_TRACING_OBJ_FUNC_EXIT(k_msgq, get, msgq, timeout, status);

	k_spin_unlock(&msgq->lock, key);

	return status;
}
271 
272 #ifdef CONFIG_USERSPACE
/*
 * Syscall verifier for k_msgq_get().
 *
 * Two checks are passed to Z_OOPS() before forwarding: that msgq is a
 * K_OBJ_MSGQ object, and that msgq->msg_size bytes at data pass
 * Z_SYSCALL_MEMORY_WRITE().
 */
static inline int z_vrfy_k_msgq_get(struct k_msgq *msgq, void *data,
				    k_timeout_t timeout)
{
	int status;

	Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
	Z_OOPS(Z_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));

	status = z_impl_k_msgq_get(msgq, data, timeout);

	return status;
}
281 #include <syscalls/k_msgq_get_mrsh.c>
282 #endif
283 
z_impl_k_msgq_peek(struct k_msgq * msgq,void * data)284 int z_impl_k_msgq_peek(struct k_msgq *msgq, void *data)
285 {
286 	k_spinlock_key_t key;
287 	int result;
288 
289 	key = k_spin_lock(&msgq->lock);
290 
291 	if (msgq->used_msgs > 0U) {
292 		/* take first available message from queue */
293 		(void)memcpy(data, msgq->read_ptr, msgq->msg_size);
294 		result = 0;
295 	} else {
296 		/* don't wait for a message to become available */
297 		result = -ENOMSG;
298 	}
299 
300 	SYS_PORT_TRACING_OBJ_FUNC(k_msgq, peek, msgq, result);
301 
302 	k_spin_unlock(&msgq->lock, key);
303 
304 	return result;
305 }
306 
307 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_peek(struct k_msgq * msgq,void * data)308 static inline int z_vrfy_k_msgq_peek(struct k_msgq *msgq, void *data)
309 {
310 	Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
311 	Z_OOPS(Z_SYSCALL_MEMORY_WRITE(data, msgq->msg_size));
312 
313 	return z_impl_k_msgq_peek(msgq, data);
314 }
315 #include <syscalls/k_msgq_peek_mrsh.c>
316 #endif
317 
z_impl_k_msgq_purge(struct k_msgq * msgq)318 void z_impl_k_msgq_purge(struct k_msgq *msgq)
319 {
320 	k_spinlock_key_t key;
321 	struct k_thread *pending_thread;
322 
323 	key = k_spin_lock(&msgq->lock);
324 
325 	SYS_PORT_TRACING_OBJ_FUNC(k_msgq, purge, msgq);
326 
327 	/* wake up any threads that are waiting to write */
328 	while ((pending_thread = z_unpend_first_thread(&msgq->wait_q)) != NULL) {
329 		arch_thread_return_value_set(pending_thread, -ENOMSG);
330 		z_ready_thread(pending_thread);
331 	}
332 
333 	msgq->used_msgs = 0;
334 	msgq->read_ptr = msgq->write_ptr;
335 
336 	z_reschedule(&msgq->lock, key);
337 }
338 
339 #ifdef CONFIG_USERSPACE
z_vrfy_k_msgq_purge(struct k_msgq * msgq)340 static inline void z_vrfy_k_msgq_purge(struct k_msgq *msgq)
341 {
342 	Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
343 	z_impl_k_msgq_purge(msgq);
344 }
345 #include <syscalls/k_msgq_purge_mrsh.c>
346 
z_vrfy_k_msgq_num_free_get(struct k_msgq * msgq)347 static inline uint32_t z_vrfy_k_msgq_num_free_get(struct k_msgq *msgq)
348 {
349 	Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
350 	return z_impl_k_msgq_num_free_get(msgq);
351 }
352 #include <syscalls/k_msgq_num_free_get_mrsh.c>
353 
z_vrfy_k_msgq_num_used_get(struct k_msgq * msgq)354 static inline uint32_t z_vrfy_k_msgq_num_used_get(struct k_msgq *msgq)
355 {
356 	Z_OOPS(Z_SYSCALL_OBJ(msgq, K_OBJ_MSGQ));
357 	return z_impl_k_msgq_num_used_get(msgq);
358 }
359 #include <syscalls/k_msgq_num_used_get_mrsh.c>
360 
361 #endif
362