1 /*
2 * Copyright (c) 2018 Intel Corporation
3 * Copyright (c) 2024 BayLibre, SAS
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 */
7 #include <zephyr/kernel.h>
8 #include <errno.h>
9 #include <string.h>
10 #include <zephyr/sys/atomic.h>
11 #include <zephyr/posix/mqueue.h>
12 #include <zephyr/posix/pthread.h>
13
/*
 * Bitwise OR of the three sigev_notify kinds. mq_notify() ANDs a queue's
 * stored sigev_notify with this mask to decide whether a notification is
 * currently registered (non-zero) or not (zero).
 * NOTE(review): this only works if none of the SIGEV_* constants is 0 --
 * confirm against the signal header in use.
 */
#define SIGEV_MASK (SIGEV_NONE | SIGEV_SIGNAL | SIGEV_THREAD)
15
/* One named message queue, shared by every descriptor opened on it. */
typedef struct mqueue_object {
	/*
	 * Linkage in mq_list. Must remain the first member: the list code in
	 * this file casts between sys_snode_t * and mqueue_object *.
	 */
	sys_snode_t snode;
	/* Heap buffer handed to k_msgq_init() as message storage. */
	char *mem_buffer;
	/* Address of this object's own heap allocation, used when freeing it. */
	char *mem_obj;
	/* Underlying kernel message queue. */
	struct k_msgq queue;
	/* Set to 1 on creation, incremented per extra open, decremented on close. */
	atomic_t ref_count;
	/* Heap copy of the queue name; set to NULL once the queue is unlinked. */
	char *name;
	/* Registered notification; all-zero when nothing is registered. */
	struct sigevent not;
} mqueue_object;
25
/* Per-open descriptor; its address is what callers receive as mqd_t. */
typedef struct mqueue_desc {
	/* Address of this descriptor's own heap allocation, freed by mq_close(). */
	char *mem_desc;
	/* Queue this descriptor refers to. */
	mqueue_object *mqueue;
	/* Either 0 or O_NONBLOCK, as set by mq_open() / mq_setattr(). */
	uint32_t flags;
} mqueue_desc;
31
/*
 * Semaphore with initial count 1 and limit 1, used as a lock around
 * accesses to mq_list and around updates of descriptor flags and queue
 * notification state.
 */
K_SEM_DEFINE(mq_sem, 1, 1);

/* List of every existing message queue object, statically initialized empty. */
sys_slist_t mq_list = SYS_SLIST_STATIC_INIT(&mq_list);
36
/*
 * Not defined in this file. Its result is used as a millisecond timeout by
 * mq_timedsend() / mq_timedreceive().
 * NOTE(review): presumably converts an absolute deadline into a relative
 * timeout -- confirm against its definition.
 */
int64_t timespec_to_timeoutms(const struct timespec *abstime);

/* Helpers private to this file, defined after the public API. */
static mqueue_object *find_in_list(const char *name);
static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
			    k_timeout_t timeout);
static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
			       k_timeout_t timeout);
static void remove_notification(mqueue_object *msg_queue);
static void remove_mq(mqueue_object *msg_queue);
static void *mq_notify_thread(void *arg);
46
47 /**
48 * @brief Open a message queue.
49 *
50 * Number of message queue and descriptor to message queue are limited by
51 * heap size. increase the size through CONFIG_HEAP_MEM_POOL_SIZE.
52 *
53 * See IEEE 1003.1
54 */
mq_open(const char * name,int oflags,...)55 mqd_t mq_open(const char *name, int oflags, ...)
56 {
57 va_list va;
58 mode_t mode;
59 struct mq_attr *attrs = NULL;
60 long msg_size = 0U, max_msgs = 0U;
61 mqueue_object *msg_queue;
62 mqueue_desc *msg_queue_desc = NULL, *mqd = (mqueue_desc *)(-1);
63 char *mq_desc_ptr, *mq_obj_ptr, *mq_buf_ptr, *mq_name_ptr;
64
65 va_start(va, oflags);
66 if ((oflags & O_CREAT) != 0) {
67 BUILD_ASSERT(sizeof(mode_t) <= sizeof(int));
68 mode = va_arg(va, unsigned int);
69 attrs = va_arg(va, struct mq_attr*);
70 }
71 va_end(va);
72
73 if (attrs != NULL) {
74 msg_size = attrs->mq_msgsize;
75 max_msgs = attrs->mq_maxmsg;
76 }
77
78 if ((name == NULL) || ((oflags & O_CREAT) != 0 && (msg_size <= 0 ||
79 max_msgs <= 0))) {
80 errno = EINVAL;
81 return (mqd_t)mqd;
82 }
83
84 if ((strlen(name) + 1) > CONFIG_MQUEUE_NAMELEN_MAX) {
85 errno = ENAMETOOLONG;
86 return (mqd_t)mqd;
87 }
88
89 /* Check if queue already exists */
90 k_sem_take(&mq_sem, K_FOREVER);
91 msg_queue = find_in_list(name);
92 k_sem_give(&mq_sem);
93
94 if ((msg_queue != NULL) && (oflags & O_CREAT) != 0 &&
95 (oflags & O_EXCL) != 0) {
96 /* Message queue has already been opened and O_EXCL is set */
97 errno = EEXIST;
98 return (mqd_t)mqd;
99 }
100
101 if ((msg_queue == NULL) && (oflags & O_CREAT) == 0) {
102 errno = ENOENT;
103 return (mqd_t)mqd;
104 }
105
106 mq_desc_ptr = k_malloc(sizeof(struct mqueue_desc));
107 if (mq_desc_ptr != NULL) {
108 (void)memset(mq_desc_ptr, 0, sizeof(struct mqueue_desc));
109 msg_queue_desc = (struct mqueue_desc *)mq_desc_ptr;
110 msg_queue_desc->mem_desc = mq_desc_ptr;
111 } else {
112 goto free_mq_desc;
113 }
114
115
116 /* Allocate mqueue object for new message queue */
117 if (msg_queue == NULL) {
118
119 /* Check for message quantity and size in message queue */
120 if (attrs->mq_msgsize > CONFIG_MSG_SIZE_MAX &&
121 attrs->mq_maxmsg > CONFIG_POSIX_MQ_OPEN_MAX) {
122 goto free_mq_desc;
123 }
124
125 mq_obj_ptr = k_malloc(sizeof(mqueue_object));
126 if (mq_obj_ptr != NULL) {
127 (void)memset(mq_obj_ptr, 0, sizeof(mqueue_object));
128 msg_queue = (mqueue_object *)mq_obj_ptr;
129 msg_queue->mem_obj = mq_obj_ptr;
130
131 } else {
132 goto free_mq_object;
133 }
134
135 mq_name_ptr = k_malloc(strlen(name) + 1);
136 if (mq_name_ptr != NULL) {
137 (void)memset(mq_name_ptr, 0, strlen(name) + 1);
138 msg_queue->name = mq_name_ptr;
139
140 } else {
141 goto free_mq_name;
142 }
143
144 strcpy(msg_queue->name, name);
145
146 mq_buf_ptr = k_malloc(msg_size * max_msgs * sizeof(uint8_t));
147 if (mq_buf_ptr != NULL) {
148 (void)memset(mq_buf_ptr, 0,
149 msg_size * max_msgs * sizeof(uint8_t));
150 msg_queue->mem_buffer = mq_buf_ptr;
151 } else {
152 goto free_mq_buffer;
153 }
154
155 (void)atomic_set(&msg_queue->ref_count, 1);
156 /* initialize zephyr message queue */
157 k_msgq_init(&msg_queue->queue, msg_queue->mem_buffer, msg_size,
158 max_msgs);
159 k_sem_take(&mq_sem, K_FOREVER);
160 sys_slist_append(&mq_list, (sys_snode_t *)&(msg_queue->snode));
161 k_sem_give(&mq_sem);
162
163 } else {
164 atomic_inc(&msg_queue->ref_count);
165 }
166
167 msg_queue_desc->mqueue = msg_queue;
168 msg_queue_desc->flags = (oflags & O_NONBLOCK) != 0 ? O_NONBLOCK : 0;
169 return (mqd_t)msg_queue_desc;
170
171 free_mq_buffer:
172 k_free(mq_name_ptr);
173 free_mq_name:
174 k_free(mq_obj_ptr);
175 free_mq_object:
176 k_free(mq_desc_ptr);
177 free_mq_desc:
178 errno = ENOSPC;
179 return (mqd_t)mqd;
180 }
181
182 /**
183 * @brief Close a message queue descriptor.
184 *
185 * See IEEE 1003.1
186 */
mq_close(mqd_t mqdes)187 int mq_close(mqd_t mqdes)
188 {
189 mqueue_desc *mqd = (mqueue_desc *)mqdes;
190
191 if (mqd == NULL) {
192 errno = EBADF;
193 return -1;
194 }
195
196 atomic_dec(&mqd->mqueue->ref_count);
197
198 /* remove mq if marked for unlink */
199 if (mqd->mqueue->name == NULL) {
200 remove_mq(mqd->mqueue);
201 }
202
203 k_free(mqd->mem_desc);
204 return 0;
205 }
206
207 /**
208 * @brief Remove a message queue.
209 *
210 * See IEEE 1003.1
211 */
mq_unlink(const char * name)212 int mq_unlink(const char *name)
213 {
214 mqueue_object *msg_queue;
215
216 k_sem_take(&mq_sem, K_FOREVER);
217 msg_queue = find_in_list(name);
218
219 if (msg_queue == NULL) {
220 k_sem_give(&mq_sem);
221 errno = EBADF;
222 return -1;
223 }
224
225 k_free(msg_queue->name);
226 msg_queue->name = NULL;
227 k_sem_give(&mq_sem);
228 remove_mq(msg_queue);
229 return 0;
230 }
231
232 /**
233 * @brief Send a message to a message queue.
234 *
235 * All messages in message queue are of equal priority.
236 *
237 * See IEEE 1003.1
238 */
mq_send(mqd_t mqdes,const char * msg_ptr,size_t msg_len,unsigned int msg_prio)239 int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
240 unsigned int msg_prio)
241 {
242 mqueue_desc *mqd = (mqueue_desc *)mqdes;
243
244 return send_message(mqd, msg_ptr, msg_len, K_FOREVER);
245 }
246
247 /**
248 * @brief Send message to a message queue within abstime time.
249 *
250 * All messages in message queue are of equal priority.
251 *
252 * See IEEE 1003.1
253 */
mq_timedsend(mqd_t mqdes,const char * msg_ptr,size_t msg_len,unsigned int msg_prio,const struct timespec * abstime)254 int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
255 unsigned int msg_prio, const struct timespec *abstime)
256 {
257 mqueue_desc *mqd = (mqueue_desc *)mqdes;
258 int32_t timeout = (int32_t) timespec_to_timeoutms(abstime);
259
260 return send_message(mqd, msg_ptr, msg_len, K_MSEC(timeout));
261 }
262
263 /**
264 * @brief Receive a message from a message queue.
265 *
266 * All messages in message queue are of equal priority.
267 *
268 * See IEEE 1003.1
269 */
mq_receive(mqd_t mqdes,char * msg_ptr,size_t msg_len,unsigned int * msg_prio)270 int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
271 unsigned int *msg_prio)
272 {
273 mqueue_desc *mqd = (mqueue_desc *)mqdes;
274
275 return receive_message(mqd, msg_ptr, msg_len, K_FOREVER);
276 }
277
278 /**
279 * @brief Receive message from a message queue within abstime time.
280 *
281 * All messages in message queue are of equal priority.
282 *
283 * See IEEE 1003.1
284 */
mq_timedreceive(mqd_t mqdes,char * msg_ptr,size_t msg_len,unsigned int * msg_prio,const struct timespec * abstime)285 int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
286 unsigned int *msg_prio, const struct timespec *abstime)
287 {
288 mqueue_desc *mqd = (mqueue_desc *)mqdes;
289 int32_t timeout = (int32_t) timespec_to_timeoutms(abstime);
290
291 return receive_message(mqd, msg_ptr, msg_len, K_MSEC(timeout));
292 }
293
294 /**
295 * @brief Get message queue attributes.
296 *
297 * See IEEE 1003.1
298 */
mq_getattr(mqd_t mqdes,struct mq_attr * mqstat)299 int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
300 {
301 mqueue_desc *mqd = (mqueue_desc *)mqdes;
302 struct k_msgq_attrs attrs;
303
304 if (mqd == NULL) {
305 errno = EBADF;
306 return -1;
307 }
308
309 k_sem_take(&mq_sem, K_FOREVER);
310 k_msgq_get_attrs(&mqd->mqueue->queue, &attrs);
311 mqstat->mq_flags = mqd->flags;
312 mqstat->mq_maxmsg = attrs.max_msgs;
313 mqstat->mq_msgsize = attrs.msg_size;
314 mqstat->mq_curmsgs = attrs.used_msgs;
315 k_sem_give(&mq_sem);
316 return 0;
317 }
318
319 /**
320 * @brief Set message queue attributes.
321 *
322 * See IEEE 1003.1
323 */
mq_setattr(mqd_t mqdes,const struct mq_attr * mqstat,struct mq_attr * omqstat)324 int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
325 struct mq_attr *omqstat)
326 {
327 mqueue_desc *mqd = (mqueue_desc *)mqdes;
328
329 if (mqd == NULL) {
330 errno = EBADF;
331 return -1;
332 }
333
334 if (mqstat->mq_flags != 0 && mqstat->mq_flags != O_NONBLOCK) {
335 errno = EINVAL;
336 return -1;
337 }
338
339 if (omqstat != NULL) {
340 mq_getattr(mqdes, omqstat);
341 }
342
343 k_sem_take(&mq_sem, K_FOREVER);
344 mqd->flags = mqstat->mq_flags;
345 k_sem_give(&mq_sem);
346
347 return 0;
348 }
349
350 /**
351 * @brief Notify process that a message is available.
352 *
353 * See IEEE 1003.1
354 */
mq_notify(mqd_t mqdes,const struct sigevent * notification)355 int mq_notify(mqd_t mqdes, const struct sigevent *notification)
356 {
357 mqueue_desc *mqd = (mqueue_desc *)mqdes;
358
359 if (mqd == NULL) {
360 errno = EBADF;
361 return -1;
362 }
363
364 mqueue_object *msg_queue = mqd->mqueue;
365
366 if (notification == NULL) {
367 if ((msg_queue->not.sigev_notify & SIGEV_MASK) == 0) {
368 errno = EINVAL;
369 return -1;
370 }
371 remove_notification(msg_queue);
372 return 0;
373 }
374
375 if ((msg_queue->not.sigev_notify & SIGEV_MASK) != 0) {
376 errno = EBUSY;
377 return -1;
378 }
379 if (notification->sigev_notify == SIGEV_SIGNAL) {
380 errno = ENOSYS;
381 return -1;
382 }
383 if (notification->sigev_notify_attributes != NULL) {
384 int ret = pthread_attr_setdetachstate(notification->sigev_notify_attributes,
385 PTHREAD_CREATE_DETACHED);
386 if (ret != 0) {
387 errno = ret;
388 return -1;
389 }
390 }
391
392 k_sem_take(&mq_sem, K_FOREVER);
393 memcpy(&msg_queue->not, notification, sizeof(struct sigevent));
394 k_sem_give(&mq_sem);
395
396 return 0;
397 }
398
mq_notify_thread(void * arg)399 static void *mq_notify_thread(void *arg)
400 {
401 mqueue_object *mqueue = (mqueue_object *)arg;
402 struct sigevent *sevp = &mqueue->not;
403
404 pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
405
406 if (sevp->sigev_notify_attributes == NULL) {
407 pthread_detach(pthread_self());
408 }
409
410 sevp->sigev_notify_function(sevp->sigev_value);
411
412 remove_notification(mqueue);
413
414 return NULL;
415 }
416
417 /* Internal functions */
find_in_list(const char * name)418 static mqueue_object *find_in_list(const char *name)
419 {
420 sys_snode_t *mq;
421 mqueue_object *msg_queue;
422
423 mq = mq_list.head;
424
425 while (mq != NULL) {
426 msg_queue = (mqueue_object *)mq;
427 if (strcmp(msg_queue->name, name) == 0) {
428 return msg_queue;
429 }
430
431 mq = mq->next;
432 }
433
434 return NULL;
435 }
436
/*
 * Common implementation of mq_send() and mq_timedsend().
 *
 * Puts one message on the descriptor's queue, waiting up to timeout (or not
 * at all when the descriptor has O_NONBLOCK). If the number of queued
 * messages grew, the queue's registered notification is delivered: for
 * SIGEV_NONE the stored function is called directly, for SIGEV_THREAD a
 * thread running mq_notify_thread() is created.
 *
 * Returns 0 on success. Returns -1 with errno set to EBADF (NULL or
 * (mqd_t)-1 descriptor), EMSGSIZE (msg_len above the queue's message size),
 * EAGAIN (no wait requested and the put failed) or ETIMEDOUT.
 *
 * NOTE(review): msg_len is only checked against the upper bound and the put
 * is not told the length; if k_msgq_put() always copies the queue's full
 * message size, a shorter caller buffer is over-read -- confirm.
 */
static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
			    k_timeout_t timeout)
{
	struct k_msgq *msgq;
	uint32_t used_before;

	/* mq_open() reports failure as (mqd_t)-1, so treat that as invalid too. */
	if (mqd == NULL || mqd == (mqueue_desc *)(-1)) {
		errno = EBADF;
		return -1;
	}

	msgq = &mqd->mqueue->queue;

	if ((mqd->flags & O_NONBLOCK) != 0U) {
		timeout = K_NO_WAIT;
	}

	if (msg_len > msgq->msg_size) {
		errno = EMSGSIZE;
		return -1;
	}

	used_before = k_msgq_num_used_get(msgq);

	if (k_msgq_put(msgq, (void *)msg_ptr, timeout) != 0) {
		errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
		return -1;
	}

	/*
	 * Compare rather than subtract: both counts are unsigned, so a
	 * difference test would also be true when a concurrent receiver made
	 * the count shrink.
	 */
	if (k_msgq_num_used_get(msgq) > used_before) {
		struct sigevent *sevp = &mqd->mqueue->not;

		if (sevp->sigev_notify == SIGEV_NONE) {
			/* A registration without a function has nothing to call. */
			if (sevp->sigev_notify_function != NULL) {
				sevp->sigev_notify_function(sevp->sigev_value);
			}
		} else if (sevp->sigev_notify == SIGEV_THREAD) {
			pthread_t th;

			/*
			 * The message is already queued, so a failure to start
			 * the notification thread does not fail the send.
			 */
			(void)pthread_create(&th,
					     sevp->sigev_notify_attributes,
					     mq_notify_thread,
					     mqd->mqueue);
		}
	}

	return 0;
}
480
/*
 * Common implementation of mq_receive() and mq_timedreceive().
 *
 * Takes one message from the descriptor's queue into msg_ptr, waiting up to
 * timeout (or not at all when the descriptor has O_NONBLOCK).
 *
 * Returns the queue's message size on success -- the configured size, not
 * the length originally passed by the sender. Returns -1 with errno set to
 * EBADF (NULL or (mqd_t)-1 descriptor), EMSGSIZE (msg_len below the queue's
 * message size), EAGAIN (no wait requested and the get failed) or
 * ETIMEDOUT.
 */
static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
			       k_timeout_t timeout)
{
	int ret = -1;

	/* mq_open() reports failure as (mqd_t)-1, so treat that as invalid too. */
	if (mqd == NULL || mqd == (mqueue_desc *)(-1)) {
		errno = EBADF;
		return ret;
	}

	/* The caller's buffer must be able to hold a full-size message. */
	if (msg_len < mqd->mqueue->queue.msg_size) {
		errno = EMSGSIZE;
		return ret;
	}

	if ((mqd->flags & O_NONBLOCK) != 0U) {
		timeout = K_NO_WAIT;
	}

	if (k_msgq_get(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
		errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
	} else {
		ret = mqd->mqueue->queue.msg_size;
	}

	return ret;
}
508
/*
 * Destroy a queue once nothing references it.
 *
 * Does nothing while ref_count is non-zero. Otherwise the queue is taken
 * out of mq_list under mq_sem and its message buffer and object memory are
 * freed. The name is not freed here.
 */
static void remove_mq(mqueue_object *msg_queue)
{
	if (atomic_get(&msg_queue->ref_count) != 0) {
		/* Still referenced by at least one descriptor. */
		return;
	}

	k_sem_take(&mq_sem, K_FOREVER);
	sys_slist_find_and_remove(&mq_list, (sys_snode_t *)msg_queue);
	k_sem_give(&mq_sem);

	/* Free the message buffer first, then the object that points to it. */
	k_free(msg_queue->mem_buffer);
	k_free(msg_queue->mem_obj);
}
521
/*
 * Clear a queue's registered notification by zero-filling its sigevent
 * while holding mq_sem.
 */
static void remove_notification(mqueue_object *msg_queue)
{
	struct sigevent *registered = &msg_queue->not;

	k_sem_take(&mq_sem, K_FOREVER);
	(void)memset(registered, 0, sizeof(*registered));
	k_sem_give(&mq_sem);
}
528