1 /*
2  * Copyright (c) 2018 Intel Corporation
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 #include <zephyr/kernel.h>
7 #include <errno.h>
8 #include <string.h>
9 #include <zephyr/sys/atomic.h>
10 #include <zephyr/posix/time.h>
11 #include <zephyr/posix/mqueue.h>
12 
13 typedef struct mqueue_object {
14 	sys_snode_t snode;
15 	char *mem_buffer;
16 	char *mem_obj;
17 	struct k_msgq queue;
18 	atomic_t ref_count;
19 	char *name;
20 } mqueue_object;
21 
22 typedef struct mqueue_desc {
23 	char *mem_desc;
24 	mqueue_object *mqueue;
25 	uint32_t  flags;
26 } mqueue_desc;
27 
28 K_SEM_DEFINE(mq_sem, 1, 1);
29 
30 /* Initialize the list */
31 sys_slist_t mq_list = SYS_SLIST_STATIC_INIT(&mq_list);
32 
33 int64_t timespec_to_timeoutms(const struct timespec *abstime);
34 static mqueue_object *find_in_list(const char *name);
35 static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
36 			  k_timeout_t timeout);
37 static int receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
38 			   k_timeout_t timeout);
39 static void remove_mq(mqueue_object *msg_queue);
40 
41 #if defined(__sparc__)
42 /*
43  * mode_t is defined as "unsigned short" on SPARC newlib. This type is promoted
44  * to "int" when passed through '...' so we should pass the promoted type to
45  * va_arg().
46  */
47 #define PROMOTED_MODE_T int
48 #else
49 #define PROMOTED_MODE_T mode_t
50 #endif
51 
52 /**
53  * @brief Open a message queue.
54  *
55  * Number of message queue and descriptor to message queue are limited by
56  * heap size. increase the size through CONFIG_HEAP_MEM_POOL_SIZE.
57  *
58  * See IEEE 1003.1
59  */
mq_open(const char * name,int oflags,...)60 mqd_t mq_open(const char *name, int oflags, ...)
61 {
62 	va_list va;
63 	mode_t mode;
64 	struct mq_attr *attrs = NULL;
65 	long msg_size = 0U, max_msgs = 0U;
66 	mqueue_object *msg_queue;
67 	mqueue_desc *msg_queue_desc = NULL, *mqd = (mqueue_desc *)(-1);
68 	char *mq_desc_ptr, *mq_obj_ptr, *mq_buf_ptr, *mq_name_ptr;
69 
70 	va_start(va, oflags);
71 	if ((oflags & O_CREAT) != 0) {
72 		mode = va_arg(va, PROMOTED_MODE_T);
73 		attrs = va_arg(va, struct mq_attr*);
74 	}
75 	va_end(va);
76 
77 	if (attrs != NULL) {
78 		msg_size = attrs->mq_msgsize;
79 		max_msgs = attrs->mq_maxmsg;
80 	}
81 
82 	if ((name == NULL) || ((oflags & O_CREAT) != 0 && (msg_size <= 0 ||
83 						      max_msgs <= 0))) {
84 		errno = EINVAL;
85 		return (mqd_t)mqd;
86 	}
87 
88 	if ((strlen(name) + 1)  > CONFIG_MQUEUE_NAMELEN_MAX) {
89 		errno = ENAMETOOLONG;
90 		return (mqd_t)mqd;
91 	}
92 
93 	/* Check if queue already exists */
94 	k_sem_take(&mq_sem, K_FOREVER);
95 	msg_queue = find_in_list(name);
96 	k_sem_give(&mq_sem);
97 
98 	if ((msg_queue != NULL) && (oflags & O_CREAT) != 0 &&
99 	    (oflags & O_EXCL) != 0) {
100 		/* Message queue has already been opened and O_EXCL is set */
101 		errno = EEXIST;
102 		return (mqd_t)mqd;
103 	}
104 
105 	if ((msg_queue == NULL) && (oflags & O_CREAT) == 0) {
106 		errno = ENOENT;
107 		return (mqd_t)mqd;
108 	}
109 
110 	mq_desc_ptr = k_malloc(sizeof(struct mqueue_desc));
111 	if (mq_desc_ptr != NULL) {
112 		(void)memset(mq_desc_ptr, 0, sizeof(struct mqueue_desc));
113 		msg_queue_desc = (struct mqueue_desc *)mq_desc_ptr;
114 		msg_queue_desc->mem_desc = mq_desc_ptr;
115 	} else {
116 		goto free_mq_desc;
117 	}
118 
119 
120 	/* Allocate mqueue object for new message queue */
121 	if (msg_queue == NULL) {
122 
123 		/* Check for message quantity and size in message queue */
124 		if (attrs->mq_msgsize > CONFIG_MSG_SIZE_MAX &&
125 		    attrs->mq_maxmsg > CONFIG_MSG_COUNT_MAX) {
126 			goto free_mq_desc;
127 		}
128 
129 		mq_obj_ptr = k_malloc(sizeof(mqueue_object));
130 		if (mq_obj_ptr != NULL) {
131 			(void)memset(mq_obj_ptr, 0, sizeof(mqueue_object));
132 			msg_queue = (mqueue_object *)mq_obj_ptr;
133 			msg_queue->mem_obj = mq_obj_ptr;
134 
135 		} else {
136 			goto free_mq_object;
137 		}
138 
139 		mq_name_ptr = k_malloc(strlen(name) + 1);
140 		if (mq_name_ptr != NULL) {
141 			(void)memset(mq_name_ptr, 0, strlen(name) + 1);
142 			msg_queue->name = mq_name_ptr;
143 
144 		} else {
145 			goto free_mq_name;
146 		}
147 
148 		strcpy(msg_queue->name, name);
149 
150 		mq_buf_ptr = k_malloc(msg_size * max_msgs * sizeof(uint8_t));
151 		if (mq_buf_ptr != NULL) {
152 			(void)memset(mq_buf_ptr, 0,
153 				     msg_size * max_msgs * sizeof(uint8_t));
154 			msg_queue->mem_buffer = mq_buf_ptr;
155 		} else {
156 			goto free_mq_buffer;
157 		}
158 
159 		(void)atomic_set(&msg_queue->ref_count, 1);
160 		/* initialize zephyr message queue */
161 		k_msgq_init(&msg_queue->queue, msg_queue->mem_buffer, msg_size,
162 			    max_msgs);
163 		k_sem_take(&mq_sem, K_FOREVER);
164 		sys_slist_append(&mq_list, (sys_snode_t *)&(msg_queue->snode));
165 		k_sem_give(&mq_sem);
166 
167 	} else {
168 		atomic_inc(&msg_queue->ref_count);
169 	}
170 
171 	msg_queue_desc->mqueue = msg_queue;
172 	msg_queue_desc->flags = (oflags & O_NONBLOCK) != 0 ? O_NONBLOCK : 0;
173 	return (mqd_t)msg_queue_desc;
174 
175 free_mq_buffer:
176 	k_free(mq_name_ptr);
177 free_mq_name:
178 	k_free(mq_obj_ptr);
179 free_mq_object:
180 	k_free(mq_desc_ptr);
181 free_mq_desc:
182 	errno = ENOSPC;
183 	return (mqd_t)mqd;
184 }
185 
186 /**
187  * @brief Close a message queue descriptor.
188  *
189  * See IEEE 1003.1
190  */
mq_close(mqd_t mqdes)191 int mq_close(mqd_t mqdes)
192 {
193 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
194 
195 	if (mqd == NULL) {
196 		errno = EBADF;
197 		return -1;
198 	}
199 
200 	atomic_dec(&mqd->mqueue->ref_count);
201 
202 	/* remove mq if marked for unlink */
203 	if (mqd->mqueue->name == NULL) {
204 		remove_mq(mqd->mqueue);
205 	}
206 
207 	k_free(mqd->mem_desc);
208 	return 0;
209 }
210 
211 /**
212  * @brief Remove a message queue.
213  *
214  * See IEEE 1003.1
215  */
mq_unlink(const char * name)216 int mq_unlink(const char *name)
217 {
218 	mqueue_object *msg_queue;
219 
220 	k_sem_take(&mq_sem, K_FOREVER);
221 	msg_queue = find_in_list(name);
222 
223 	if (msg_queue == NULL) {
224 		k_sem_give(&mq_sem);
225 		errno = EBADF;
226 		return -1;
227 	}
228 
229 	k_free(msg_queue->name);
230 	msg_queue->name = NULL;
231 	k_sem_give(&mq_sem);
232 	remove_mq(msg_queue);
233 	return 0;
234 }
235 
236 /**
237  * @brief Send a message to a message queue.
238  *
239  * All messages in message queue are of equal priority.
240  *
241  * See IEEE 1003.1
242  */
mq_send(mqd_t mqdes,const char * msg_ptr,size_t msg_len,unsigned int msg_prio)243 int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
244 	    unsigned int msg_prio)
245 {
246 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
247 
248 	return send_message(mqd, msg_ptr, msg_len, K_FOREVER);
249 }
250 
251 /**
252  * @brief Send message to a message queue within abstime time.
253  *
254  * All messages in message queue are of equal priority.
255  *
256  * See IEEE 1003.1
257  */
mq_timedsend(mqd_t mqdes,const char * msg_ptr,size_t msg_len,unsigned int msg_prio,const struct timespec * abstime)258 int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
259 		 unsigned int msg_prio, const struct timespec *abstime)
260 {
261 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
262 	int32_t timeout = (int32_t) timespec_to_timeoutms(abstime);
263 
264 	return send_message(mqd, msg_ptr, msg_len, K_MSEC(timeout));
265 }
266 
267 /**
268  * @brief Receive a message from a message queue.
269  *
270  * All messages in message queue are of equal priority.
271  *
272  * See IEEE 1003.1
273  */
mq_receive(mqd_t mqdes,char * msg_ptr,size_t msg_len,unsigned int * msg_prio)274 int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
275 		   unsigned int *msg_prio)
276 {
277 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
278 
279 	return receive_message(mqd, msg_ptr, msg_len, K_FOREVER);
280 }
281 
282 /**
283  * @brief Receive message from a message queue within abstime time.
284  *
285  * All messages in message queue are of equal priority.
286  *
287  * See IEEE 1003.1
288  */
mq_timedreceive(mqd_t mqdes,char * msg_ptr,size_t msg_len,unsigned int * msg_prio,const struct timespec * abstime)289 int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
290 			unsigned int *msg_prio, const struct timespec *abstime)
291 {
292 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
293 	int32_t timeout = (int32_t) timespec_to_timeoutms(abstime);
294 
295 	return receive_message(mqd, msg_ptr, msg_len, K_MSEC(timeout));
296 }
297 
298 /**
299  * @brief Get message queue attributes.
300  *
301  * See IEEE 1003.1
302  */
mq_getattr(mqd_t mqdes,struct mq_attr * mqstat)303 int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
304 {
305 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
306 	struct k_msgq_attrs attrs;
307 
308 	if (mqd == NULL) {
309 		errno = EBADF;
310 		return -1;
311 	}
312 
313 	k_sem_take(&mq_sem, K_FOREVER);
314 	k_msgq_get_attrs(&mqd->mqueue->queue, &attrs);
315 	mqstat->mq_flags = mqd->flags;
316 	mqstat->mq_maxmsg = attrs.max_msgs;
317 	mqstat->mq_msgsize = attrs.msg_size;
318 	mqstat->mq_curmsgs = attrs.used_msgs;
319 	k_sem_give(&mq_sem);
320 	return 0;
321 }
322 
323 /**
324  * @brief Set message queue attributes.
325  *
326  * See IEEE 1003.1
327  */
mq_setattr(mqd_t mqdes,const struct mq_attr * mqstat,struct mq_attr * omqstat)328 int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
329 	       struct mq_attr *omqstat)
330 {
331 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
332 
333 	if (mqd == NULL) {
334 		errno = EBADF;
335 		return -1;
336 	}
337 
338 	if (mqstat->mq_flags != 0 && mqstat->mq_flags != O_NONBLOCK) {
339 		errno = EINVAL;
340 		return -1;
341 	}
342 
343 	if (omqstat != NULL) {
344 		mq_getattr(mqdes, omqstat);
345 	}
346 
347 	k_sem_take(&mq_sem, K_FOREVER);
348 	mqd->flags = mqstat->mq_flags;
349 	k_sem_give(&mq_sem);
350 
351 	return 0;
352 }
353 
354 /* Internal functions */
find_in_list(const char * name)355 static mqueue_object *find_in_list(const char *name)
356 {
357 	sys_snode_t *mq;
358 	mqueue_object *msg_queue;
359 
360 	mq = mq_list.head;
361 
362 	while (mq != NULL) {
363 		msg_queue = (mqueue_object *)mq;
364 		if (strcmp(msg_queue->name, name) == 0) {
365 			return msg_queue;
366 		}
367 
368 		mq = mq->next;
369 	}
370 
371 	return NULL;
372 }
373 
send_message(mqueue_desc * mqd,const char * msg_ptr,size_t msg_len,k_timeout_t timeout)374 static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
375 			  k_timeout_t timeout)
376 {
377 	int32_t ret = -1;
378 
379 	if (mqd == NULL) {
380 		errno = EBADF;
381 		return ret;
382 	}
383 
384 	if ((mqd->flags & O_NONBLOCK) != 0U) {
385 		timeout = K_NO_WAIT;
386 	}
387 
388 	if (msg_len >  mqd->mqueue->queue.msg_size) {
389 		errno = EMSGSIZE;
390 		return ret;
391 	}
392 
393 	if (k_msgq_put(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
394 		errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
395 		return ret;
396 	}
397 
398 	return 0;
399 }
400 
receive_message(mqueue_desc * mqd,char * msg_ptr,size_t msg_len,k_timeout_t timeout)401 static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
402 			     k_timeout_t timeout)
403 {
404 	int ret = -1;
405 
406 	if (mqd == NULL) {
407 		errno = EBADF;
408 		return ret;
409 	}
410 
411 	if (msg_len < mqd->mqueue->queue.msg_size) {
412 		errno = EMSGSIZE;
413 		return ret;
414 	}
415 
416 	if ((mqd->flags & O_NONBLOCK) != 0U) {
417 		timeout = K_NO_WAIT;
418 	}
419 
420 	if (k_msgq_get(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
421 		errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
422 	} else {
423 		ret = mqd->mqueue->queue.msg_size;
424 	}
425 
426 	return ret;
427 }
428 
remove_mq(mqueue_object * msg_queue)429 static void remove_mq(mqueue_object *msg_queue)
430 {
431 	if (atomic_cas(&msg_queue->ref_count, 0, 0)) {
432 		k_sem_take(&mq_sem, K_FOREVER);
433 		sys_slist_find_and_remove(&mq_list, (sys_snode_t *) msg_queue);
434 		k_sem_give(&mq_sem);
435 
436 		/* Free mq buffer and pbject */
437 		k_free(msg_queue->mem_buffer);
438 		k_free(msg_queue->mem_obj);
439 	}
440 }
441