1 /*
2  * Copyright (c) 2018 Intel Corporation
3  * Copyright (c) 2024 BayLibre, SAS
4  *
5  * SPDX-License-Identifier: Apache-2.0
6  */
7 #include <zephyr/kernel.h>
8 #include <errno.h>
9 #include <string.h>
10 #include <zephyr/sys/atomic.h>
11 #include <zephyr/posix/mqueue.h>
12 #include <zephyr/posix/pthread.h>
13 
/*
 * Bitwise OR of the three sigevent notification types. mq_notify() ANDs a
 * queue's stored sigev_notify with this mask to decide whether a
 * notification is currently registered (non-zero) or not (zero).
 */
#define SIGEV_MASK (SIGEV_NONE | SIGEV_SIGNAL | SIGEV_THREAD)
15 
/*
 * A named message queue. One object exists per queue name and is shared by
 * every descriptor opened on it.
 */
typedef struct mqueue_object {
	/*
	 * Node linking this object into mq_list. It must remain the first
	 * member: list nodes are cast directly to mqueue_object pointers.
	 */
	sys_snode_t snode;
	/* Heap buffer handed to k_msgq_init() as message storage. */
	char *mem_buffer;
	/* Address of this object's own heap allocation; freed in remove_mq(). */
	char *mem_obj;
	/* Kernel message queue that carries the messages. */
	struct k_msgq queue;
	/* Set to 1 on creation, incremented per extra open, decremented on close. */
	atomic_t ref_count;
	/* Heap copy of the queue name; set to NULL once mq_unlink() has run. */
	char *name;
	/* Notification registered through mq_notify(); zeroed when none is set. */
	struct sigevent not;
} mqueue_object;
25 
/*
 * Per-open descriptor. mq_open() returns a pointer to one of these cast to
 * mqd_t, and the other mq_*() calls cast it back.
 */
typedef struct mqueue_desc {
	/* Address of this descriptor's own heap allocation; freed in mq_close(). */
	char *mem_desc;
	/* Queue object this descriptor refers to. */
	mqueue_object *mqueue;
	/* Either 0 or O_NONBLOCK, as set by mq_open() / mq_setattr(). */
	uint32_t  flags;
} mqueue_desc;
31 
/*
 * Semaphore with initial count 1 and limit 1, taken around accesses to
 * mq_list and around updates of descriptor flags and notification settings.
 */
K_SEM_DEFINE(mq_sem, 1, 1);

/* List of all message queue objects known to this file; statically initialized. */
sys_slist_t mq_list = SYS_SLIST_STATIC_INIT(&mq_list);
36 
/*
 * Not defined in the visible part of this file. mq_timedsend() and
 * mq_timedreceive() pass its result to K_MSEC(), i.e. treat it as a
 * timeout in milliseconds.
 */
int64_t timespec_to_timeoutms(const struct timespec *abstime);

/* Forward declarations of the file-local helpers defined further below. */
static mqueue_object *find_in_list(const char *name);
static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
			  k_timeout_t timeout);
static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
			   k_timeout_t timeout);
static void remove_notification(mqueue_object *msg_queue);
static void remove_mq(mqueue_object *msg_queue);
static void *mq_notify_thread(void *arg);
46 
47 /**
48  * @brief Open a message queue.
49  *
50  * Number of message queue and descriptor to message queue are limited by
51  * heap size. increase the size through CONFIG_HEAP_MEM_POOL_SIZE.
52  *
53  * See IEEE 1003.1
54  */
mq_open(const char * name,int oflags,...)55 mqd_t mq_open(const char *name, int oflags, ...)
56 {
57 	va_list va;
58 	mode_t mode;
59 	struct mq_attr *attrs = NULL;
60 	long msg_size = 0U, max_msgs = 0U;
61 	mqueue_object *msg_queue;
62 	mqueue_desc *msg_queue_desc = NULL, *mqd = (mqueue_desc *)(-1);
63 	char *mq_desc_ptr, *mq_obj_ptr, *mq_buf_ptr, *mq_name_ptr;
64 
65 	va_start(va, oflags);
66 	if ((oflags & O_CREAT) != 0) {
67 		BUILD_ASSERT(sizeof(mode_t) <= sizeof(int));
68 		mode = va_arg(va, unsigned int);
69 		attrs = va_arg(va, struct mq_attr*);
70 	}
71 	va_end(va);
72 
73 	if (attrs != NULL) {
74 		msg_size = attrs->mq_msgsize;
75 		max_msgs = attrs->mq_maxmsg;
76 	}
77 
78 	if ((name == NULL) || ((oflags & O_CREAT) != 0 && (msg_size <= 0 ||
79 						      max_msgs <= 0))) {
80 		errno = EINVAL;
81 		return (mqd_t)mqd;
82 	}
83 
84 	if ((strlen(name) + 1)  > CONFIG_MQUEUE_NAMELEN_MAX) {
85 		errno = ENAMETOOLONG;
86 		return (mqd_t)mqd;
87 	}
88 
89 	/* Check if queue already exists */
90 	k_sem_take(&mq_sem, K_FOREVER);
91 	msg_queue = find_in_list(name);
92 	k_sem_give(&mq_sem);
93 
94 	if ((msg_queue != NULL) && (oflags & O_CREAT) != 0 &&
95 	    (oflags & O_EXCL) != 0) {
96 		/* Message queue has already been opened and O_EXCL is set */
97 		errno = EEXIST;
98 		return (mqd_t)mqd;
99 	}
100 
101 	if ((msg_queue == NULL) && (oflags & O_CREAT) == 0) {
102 		errno = ENOENT;
103 		return (mqd_t)mqd;
104 	}
105 
106 	mq_desc_ptr = k_malloc(sizeof(struct mqueue_desc));
107 	if (mq_desc_ptr != NULL) {
108 		(void)memset(mq_desc_ptr, 0, sizeof(struct mqueue_desc));
109 		msg_queue_desc = (struct mqueue_desc *)mq_desc_ptr;
110 		msg_queue_desc->mem_desc = mq_desc_ptr;
111 	} else {
112 		goto free_mq_desc;
113 	}
114 
115 
116 	/* Allocate mqueue object for new message queue */
117 	if (msg_queue == NULL) {
118 
119 		/* Check for message quantity and size in message queue */
120 		if (attrs->mq_msgsize > CONFIG_MSG_SIZE_MAX &&
121 		    attrs->mq_maxmsg > CONFIG_POSIX_MQ_OPEN_MAX) {
122 			goto free_mq_desc;
123 		}
124 
125 		mq_obj_ptr = k_malloc(sizeof(mqueue_object));
126 		if (mq_obj_ptr != NULL) {
127 			(void)memset(mq_obj_ptr, 0, sizeof(mqueue_object));
128 			msg_queue = (mqueue_object *)mq_obj_ptr;
129 			msg_queue->mem_obj = mq_obj_ptr;
130 
131 		} else {
132 			goto free_mq_object;
133 		}
134 
135 		mq_name_ptr = k_malloc(strlen(name) + 1);
136 		if (mq_name_ptr != NULL) {
137 			(void)memset(mq_name_ptr, 0, strlen(name) + 1);
138 			msg_queue->name = mq_name_ptr;
139 
140 		} else {
141 			goto free_mq_name;
142 		}
143 
144 		strcpy(msg_queue->name, name);
145 
146 		mq_buf_ptr = k_malloc(msg_size * max_msgs * sizeof(uint8_t));
147 		if (mq_buf_ptr != NULL) {
148 			(void)memset(mq_buf_ptr, 0,
149 				     msg_size * max_msgs * sizeof(uint8_t));
150 			msg_queue->mem_buffer = mq_buf_ptr;
151 		} else {
152 			goto free_mq_buffer;
153 		}
154 
155 		(void)atomic_set(&msg_queue->ref_count, 1);
156 		/* initialize zephyr message queue */
157 		k_msgq_init(&msg_queue->queue, msg_queue->mem_buffer, msg_size,
158 			    max_msgs);
159 		k_sem_take(&mq_sem, K_FOREVER);
160 		sys_slist_append(&mq_list, (sys_snode_t *)&(msg_queue->snode));
161 		k_sem_give(&mq_sem);
162 
163 	} else {
164 		atomic_inc(&msg_queue->ref_count);
165 	}
166 
167 	msg_queue_desc->mqueue = msg_queue;
168 	msg_queue_desc->flags = (oflags & O_NONBLOCK) != 0 ? O_NONBLOCK : 0;
169 	return (mqd_t)msg_queue_desc;
170 
171 free_mq_buffer:
172 	k_free(mq_name_ptr);
173 free_mq_name:
174 	k_free(mq_obj_ptr);
175 free_mq_object:
176 	k_free(mq_desc_ptr);
177 free_mq_desc:
178 	errno = ENOSPC;
179 	return (mqd_t)mqd;
180 }
181 
182 /**
183  * @brief Close a message queue descriptor.
184  *
185  * See IEEE 1003.1
186  */
mq_close(mqd_t mqdes)187 int mq_close(mqd_t mqdes)
188 {
189 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
190 
191 	if (mqd == NULL) {
192 		errno = EBADF;
193 		return -1;
194 	}
195 
196 	atomic_dec(&mqd->mqueue->ref_count);
197 
198 	/* remove mq if marked for unlink */
199 	if (mqd->mqueue->name == NULL) {
200 		remove_mq(mqd->mqueue);
201 	}
202 
203 	k_free(mqd->mem_desc);
204 	return 0;
205 }
206 
207 /**
208  * @brief Remove a message queue.
209  *
210  * See IEEE 1003.1
211  */
mq_unlink(const char * name)212 int mq_unlink(const char *name)
213 {
214 	mqueue_object *msg_queue;
215 
216 	k_sem_take(&mq_sem, K_FOREVER);
217 	msg_queue = find_in_list(name);
218 
219 	if (msg_queue == NULL) {
220 		k_sem_give(&mq_sem);
221 		errno = EBADF;
222 		return -1;
223 	}
224 
225 	k_free(msg_queue->name);
226 	msg_queue->name = NULL;
227 	k_sem_give(&mq_sem);
228 	remove_mq(msg_queue);
229 	return 0;
230 }
231 
232 /**
233  * @brief Send a message to a message queue.
234  *
235  * All messages in message queue are of equal priority.
236  *
237  * See IEEE 1003.1
238  */
mq_send(mqd_t mqdes,const char * msg_ptr,size_t msg_len,unsigned int msg_prio)239 int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
240 	    unsigned int msg_prio)
241 {
242 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
243 
244 	return send_message(mqd, msg_ptr, msg_len, K_FOREVER);
245 }
246 
247 /**
248  * @brief Send message to a message queue within abstime time.
249  *
250  * All messages in message queue are of equal priority.
251  *
252  * See IEEE 1003.1
253  */
mq_timedsend(mqd_t mqdes,const char * msg_ptr,size_t msg_len,unsigned int msg_prio,const struct timespec * abstime)254 int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
255 		 unsigned int msg_prio, const struct timespec *abstime)
256 {
257 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
258 	int32_t timeout = (int32_t) timespec_to_timeoutms(abstime);
259 
260 	return send_message(mqd, msg_ptr, msg_len, K_MSEC(timeout));
261 }
262 
263 /**
264  * @brief Receive a message from a message queue.
265  *
266  * All messages in message queue are of equal priority.
267  *
268  * See IEEE 1003.1
269  */
mq_receive(mqd_t mqdes,char * msg_ptr,size_t msg_len,unsigned int * msg_prio)270 int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
271 		   unsigned int *msg_prio)
272 {
273 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
274 
275 	return receive_message(mqd, msg_ptr, msg_len, K_FOREVER);
276 }
277 
278 /**
279  * @brief Receive message from a message queue within abstime time.
280  *
281  * All messages in message queue are of equal priority.
282  *
283  * See IEEE 1003.1
284  */
mq_timedreceive(mqd_t mqdes,char * msg_ptr,size_t msg_len,unsigned int * msg_prio,const struct timespec * abstime)285 int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
286 			unsigned int *msg_prio, const struct timespec *abstime)
287 {
288 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
289 	int32_t timeout = (int32_t) timespec_to_timeoutms(abstime);
290 
291 	return receive_message(mqd, msg_ptr, msg_len, K_MSEC(timeout));
292 }
293 
294 /**
295  * @brief Get message queue attributes.
296  *
297  * See IEEE 1003.1
298  */
mq_getattr(mqd_t mqdes,struct mq_attr * mqstat)299 int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
300 {
301 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
302 	struct k_msgq_attrs attrs;
303 
304 	if (mqd == NULL) {
305 		errno = EBADF;
306 		return -1;
307 	}
308 
309 	k_sem_take(&mq_sem, K_FOREVER);
310 	k_msgq_get_attrs(&mqd->mqueue->queue, &attrs);
311 	mqstat->mq_flags = mqd->flags;
312 	mqstat->mq_maxmsg = attrs.max_msgs;
313 	mqstat->mq_msgsize = attrs.msg_size;
314 	mqstat->mq_curmsgs = attrs.used_msgs;
315 	k_sem_give(&mq_sem);
316 	return 0;
317 }
318 
319 /**
320  * @brief Set message queue attributes.
321  *
322  * See IEEE 1003.1
323  */
mq_setattr(mqd_t mqdes,const struct mq_attr * mqstat,struct mq_attr * omqstat)324 int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
325 	       struct mq_attr *omqstat)
326 {
327 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
328 
329 	if (mqd == NULL) {
330 		errno = EBADF;
331 		return -1;
332 	}
333 
334 	if (mqstat->mq_flags != 0 && mqstat->mq_flags != O_NONBLOCK) {
335 		errno = EINVAL;
336 		return -1;
337 	}
338 
339 	if (omqstat != NULL) {
340 		mq_getattr(mqdes, omqstat);
341 	}
342 
343 	k_sem_take(&mq_sem, K_FOREVER);
344 	mqd->flags = mqstat->mq_flags;
345 	k_sem_give(&mq_sem);
346 
347 	return 0;
348 }
349 
350 /**
351  * @brief Notify process that a message is available.
352  *
353  * See IEEE 1003.1
354  */
mq_notify(mqd_t mqdes,const struct sigevent * notification)355 int mq_notify(mqd_t mqdes, const struct sigevent *notification)
356 {
357 	mqueue_desc *mqd = (mqueue_desc *)mqdes;
358 
359 	if (mqd == NULL) {
360 		errno = EBADF;
361 		return -1;
362 	}
363 
364 	mqueue_object *msg_queue = mqd->mqueue;
365 
366 	if (notification == NULL) {
367 		if ((msg_queue->not.sigev_notify & SIGEV_MASK) == 0) {
368 			errno = EINVAL;
369 			return -1;
370 		}
371 		remove_notification(msg_queue);
372 		return 0;
373 	}
374 
375 	if ((msg_queue->not.sigev_notify & SIGEV_MASK) != 0) {
376 		errno = EBUSY;
377 		return -1;
378 	}
379 	if (notification->sigev_notify == SIGEV_SIGNAL) {
380 		errno = ENOSYS;
381 		return -1;
382 	}
383 	if (notification->sigev_notify_attributes != NULL) {
384 		int ret = pthread_attr_setdetachstate(notification->sigev_notify_attributes,
385 						      PTHREAD_CREATE_DETACHED);
386 		if (ret != 0) {
387 			errno = ret;
388 			return -1;
389 		}
390 	}
391 
392 	k_sem_take(&mq_sem, K_FOREVER);
393 	memcpy(&msg_queue->not, notification, sizeof(struct sigevent));
394 	k_sem_give(&mq_sem);
395 
396 	return 0;
397 }
398 
mq_notify_thread(void * arg)399 static void *mq_notify_thread(void *arg)
400 {
401 	mqueue_object *mqueue = (mqueue_object *)arg;
402 	struct sigevent *sevp = &mqueue->not;
403 
404 	pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
405 
406 	if (sevp->sigev_notify_attributes == NULL) {
407 		pthread_detach(pthread_self());
408 	}
409 
410 	sevp->sigev_notify_function(sevp->sigev_value);
411 
412 	remove_notification(mqueue);
413 
414 	return NULL;
415 }
416 
417 /* Internal functions */
find_in_list(const char * name)418 static mqueue_object *find_in_list(const char *name)
419 {
420 	sys_snode_t *mq;
421 	mqueue_object *msg_queue;
422 
423 	mq = mq_list.head;
424 
425 	while (mq != NULL) {
426 		msg_queue = (mqueue_object *)mq;
427 		if (strcmp(msg_queue->name, name) == 0) {
428 			return msg_queue;
429 		}
430 
431 		mq = mq->next;
432 	}
433 
434 	return NULL;
435 }
436 
/*
 * Common implementation of mq_send() and mq_timedsend().
 *
 * mqd     - descriptor of the target queue
 * msg_ptr - message to enqueue
 * msg_len - caller-supplied length; must not exceed the queue's message size
 * timeout - how long to wait for space; replaced by K_NO_WAIT when the
 *           descriptor has O_NONBLOCK set
 *
 * Returns 0 on success. Returns -1 with errno set to EBADF (NULL
 * descriptor), EMSGSIZE (message too long), EAGAIN (put failed without
 * waiting) or ETIMEDOUT (put failed after waiting).
 *
 * After a successful put, if the number of used slots differs from the
 * value sampled before the put, the registered notification is triggered:
 * SIGEV_NONE calls the notification function directly in this context,
 * SIGEV_THREAD starts mq_notify_thread(). The result of pthread_create()
 * does not affect the return value.
 *
 * NOTE(review): msg_len is only checked against the upper bound; confirm
 * how k_msgq_put() treats messages shorter than the queue's message size.
 */
static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
			  k_timeout_t timeout)
{
	mqueue_object *mq;
	struct sigevent *event;
	uint32_t used_before;

	if (mqd == NULL) {
		errno = EBADF;
		return -1;
	}

	if ((mqd->flags & O_NONBLOCK) != 0U) {
		timeout = K_NO_WAIT;
	}

	mq = mqd->mqueue;

	if (msg_len > mq->queue.msg_size) {
		errno = EMSGSIZE;
		return -1;
	}

	used_before = k_msgq_num_used_get(&mq->queue);

	if (k_msgq_put(&mq->queue, (void *)msg_ptr, timeout) != 0) {
		errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
		return -1;
	}

	if (k_msgq_num_used_get(&mq->queue) - used_before > 0) {
		event = &mq->not;

		switch (event->sigev_notify) {
		case SIGEV_NONE:
			event->sigev_notify_function(event->sigev_value);
			break;
		case SIGEV_THREAD: {
			pthread_t th;

			(void)pthread_create(&th,
					     event->sigev_notify_attributes,
					     mq_notify_thread,
					     mq);
			break;
		}
		default:
			break;
		}
	}

	return 0;
}
480 
/*
 * Common implementation of mq_receive() and mq_timedreceive().
 *
 * mqd     - descriptor of the source queue
 * msg_ptr - destination buffer
 * msg_len - size of the destination buffer; must be at least the queue's
 *           message size
 * timeout - how long to wait for a message; replaced by K_NO_WAIT when the
 *           descriptor has O_NONBLOCK set
 *
 * Returns the queue's message size on success. Returns -1 with errno set
 * to EBADF (NULL descriptor), EMSGSIZE (buffer too small), EAGAIN (get
 * failed without waiting) or ETIMEDOUT (get failed after waiting).
 */
static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
			     k_timeout_t timeout)
{
	mqueue_object *mq;

	if (mqd == NULL) {
		errno = EBADF;
		return -1;
	}

	mq = mqd->mqueue;

	if (msg_len < mq->queue.msg_size) {
		errno = EMSGSIZE;
		return -1;
	}

	if ((mqd->flags & O_NONBLOCK) != 0U) {
		timeout = K_NO_WAIT;
	}

	if (k_msgq_get(&mq->queue, (void *)msg_ptr, timeout) != 0) {
		errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
		return -1;
	}

	/* Every message occupies a full slot, so the slot size is reported. */
	return (int)mq->queue.msg_size;
}
508 
/*
 * Release a message queue object once nothing references it any more.
 *
 * Does nothing unless ref_count is zero. Otherwise the object is taken
 * off mq_list under mq_sem, and its message buffer and the object itself
 * are freed. The name is not freed here; the visible callers only invoke
 * this after the name has been released and set to NULL.
 */
static void remove_mq(mqueue_object *msg_queue)
{
	/* Comparing 0 and swapping in 0 succeeds only when ref_count is 0. */
	if (!atomic_cas(&msg_queue->ref_count, 0, 0)) {
		return;
	}

	k_sem_take(&mq_sem, K_FOREVER);
	sys_slist_find_and_remove(&mq_list, &msg_queue->snode);
	k_sem_give(&mq_sem);

	/* Free mq buffer and object */
	k_free(msg_queue->mem_buffer);
	k_free(msg_queue->mem_obj);
}
521 
/*
 * Clear the notification registered on msg_queue by zeroing its sigevent
 * while holding mq_sem.
 */
static void remove_notification(mqueue_object *msg_queue)
{
	struct sigevent *event = &msg_queue->not;

	k_sem_take(&mq_sem, K_FOREVER);
	memset(event, 0, sizeof(struct sigevent));
	k_sem_give(&mq_sem);
}
528