1 /*
2 * Copyright (c) 2018 Intel Corporation
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6 #include <zephyr/kernel.h>
7 #include <errno.h>
8 #include <string.h>
9 #include <zephyr/sys/atomic.h>
10 #include <zephyr/posix/time.h>
11 #include <zephyr/posix/mqueue.h>
12
13 typedef struct mqueue_object {
14 sys_snode_t snode;
15 char *mem_buffer;
16 char *mem_obj;
17 struct k_msgq queue;
18 atomic_t ref_count;
19 char *name;
20 } mqueue_object;
21
22 typedef struct mqueue_desc {
23 char *mem_desc;
24 mqueue_object *mqueue;
25 uint32_t flags;
26 } mqueue_desc;
27
28 K_SEM_DEFINE(mq_sem, 1, 1);
29
30 /* Initialize the list */
31 sys_slist_t mq_list = SYS_SLIST_STATIC_INIT(&mq_list);
32
33 int64_t timespec_to_timeoutms(const struct timespec *abstime);
34 static mqueue_object *find_in_list(const char *name);
35 static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
36 k_timeout_t timeout);
37 static int receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
38 k_timeout_t timeout);
39 static void remove_mq(mqueue_object *msg_queue);
40
41 #if defined(__sparc__)
42 /*
43 * mode_t is defined as "unsigned short" on SPARC newlib. This type is promoted
44 * to "int" when passed through '...' so we should pass the promoted type to
45 * va_arg().
46 */
47 #define PROMOTED_MODE_T int
48 #else
49 #define PROMOTED_MODE_T mode_t
50 #endif
51
52 /**
53 * @brief Open a message queue.
54 *
55 * Number of message queue and descriptor to message queue are limited by
56 * heap size. increase the size through CONFIG_HEAP_MEM_POOL_SIZE.
57 *
58 * See IEEE 1003.1
59 */
mq_open(const char * name,int oflags,...)60 mqd_t mq_open(const char *name, int oflags, ...)
61 {
62 va_list va;
63 mode_t mode;
64 struct mq_attr *attrs = NULL;
65 long msg_size = 0U, max_msgs = 0U;
66 mqueue_object *msg_queue;
67 mqueue_desc *msg_queue_desc = NULL, *mqd = (mqueue_desc *)(-1);
68 char *mq_desc_ptr, *mq_obj_ptr, *mq_buf_ptr, *mq_name_ptr;
69
70 va_start(va, oflags);
71 if ((oflags & O_CREAT) != 0) {
72 mode = va_arg(va, PROMOTED_MODE_T);
73 attrs = va_arg(va, struct mq_attr*);
74 }
75 va_end(va);
76
77 if (attrs != NULL) {
78 msg_size = attrs->mq_msgsize;
79 max_msgs = attrs->mq_maxmsg;
80 }
81
82 if ((name == NULL) || ((oflags & O_CREAT) != 0 && (msg_size <= 0 ||
83 max_msgs <= 0))) {
84 errno = EINVAL;
85 return (mqd_t)mqd;
86 }
87
88 if ((strlen(name) + 1) > CONFIG_MQUEUE_NAMELEN_MAX) {
89 errno = ENAMETOOLONG;
90 return (mqd_t)mqd;
91 }
92
93 /* Check if queue already exists */
94 k_sem_take(&mq_sem, K_FOREVER);
95 msg_queue = find_in_list(name);
96 k_sem_give(&mq_sem);
97
98 if ((msg_queue != NULL) && (oflags & O_CREAT) != 0 &&
99 (oflags & O_EXCL) != 0) {
100 /* Message queue has already been opened and O_EXCL is set */
101 errno = EEXIST;
102 return (mqd_t)mqd;
103 }
104
105 if ((msg_queue == NULL) && (oflags & O_CREAT) == 0) {
106 errno = ENOENT;
107 return (mqd_t)mqd;
108 }
109
110 mq_desc_ptr = k_malloc(sizeof(struct mqueue_desc));
111 if (mq_desc_ptr != NULL) {
112 (void)memset(mq_desc_ptr, 0, sizeof(struct mqueue_desc));
113 msg_queue_desc = (struct mqueue_desc *)mq_desc_ptr;
114 msg_queue_desc->mem_desc = mq_desc_ptr;
115 } else {
116 goto free_mq_desc;
117 }
118
119
120 /* Allocate mqueue object for new message queue */
121 if (msg_queue == NULL) {
122
123 /* Check for message quantity and size in message queue */
124 if (attrs->mq_msgsize > CONFIG_MSG_SIZE_MAX &&
125 attrs->mq_maxmsg > CONFIG_MSG_COUNT_MAX) {
126 goto free_mq_desc;
127 }
128
129 mq_obj_ptr = k_malloc(sizeof(mqueue_object));
130 if (mq_obj_ptr != NULL) {
131 (void)memset(mq_obj_ptr, 0, sizeof(mqueue_object));
132 msg_queue = (mqueue_object *)mq_obj_ptr;
133 msg_queue->mem_obj = mq_obj_ptr;
134
135 } else {
136 goto free_mq_object;
137 }
138
139 mq_name_ptr = k_malloc(strlen(name) + 1);
140 if (mq_name_ptr != NULL) {
141 (void)memset(mq_name_ptr, 0, strlen(name) + 1);
142 msg_queue->name = mq_name_ptr;
143
144 } else {
145 goto free_mq_name;
146 }
147
148 strcpy(msg_queue->name, name);
149
150 mq_buf_ptr = k_malloc(msg_size * max_msgs * sizeof(uint8_t));
151 if (mq_buf_ptr != NULL) {
152 (void)memset(mq_buf_ptr, 0,
153 msg_size * max_msgs * sizeof(uint8_t));
154 msg_queue->mem_buffer = mq_buf_ptr;
155 } else {
156 goto free_mq_buffer;
157 }
158
159 (void)atomic_set(&msg_queue->ref_count, 1);
160 /* initialize zephyr message queue */
161 k_msgq_init(&msg_queue->queue, msg_queue->mem_buffer, msg_size,
162 max_msgs);
163 k_sem_take(&mq_sem, K_FOREVER);
164 sys_slist_append(&mq_list, (sys_snode_t *)&(msg_queue->snode));
165 k_sem_give(&mq_sem);
166
167 } else {
168 atomic_inc(&msg_queue->ref_count);
169 }
170
171 msg_queue_desc->mqueue = msg_queue;
172 msg_queue_desc->flags = (oflags & O_NONBLOCK) != 0 ? O_NONBLOCK : 0;
173 return (mqd_t)msg_queue_desc;
174
175 free_mq_buffer:
176 k_free(mq_name_ptr);
177 free_mq_name:
178 k_free(mq_obj_ptr);
179 free_mq_object:
180 k_free(mq_desc_ptr);
181 free_mq_desc:
182 errno = ENOSPC;
183 return (mqd_t)mqd;
184 }
185
186 /**
187 * @brief Close a message queue descriptor.
188 *
189 * See IEEE 1003.1
190 */
mq_close(mqd_t mqdes)191 int mq_close(mqd_t mqdes)
192 {
193 mqueue_desc *mqd = (mqueue_desc *)mqdes;
194
195 if (mqd == NULL) {
196 errno = EBADF;
197 return -1;
198 }
199
200 atomic_dec(&mqd->mqueue->ref_count);
201
202 /* remove mq if marked for unlink */
203 if (mqd->mqueue->name == NULL) {
204 remove_mq(mqd->mqueue);
205 }
206
207 k_free(mqd->mem_desc);
208 return 0;
209 }
210
211 /**
212 * @brief Remove a message queue.
213 *
214 * See IEEE 1003.1
215 */
mq_unlink(const char * name)216 int mq_unlink(const char *name)
217 {
218 mqueue_object *msg_queue;
219
220 k_sem_take(&mq_sem, K_FOREVER);
221 msg_queue = find_in_list(name);
222
223 if (msg_queue == NULL) {
224 k_sem_give(&mq_sem);
225 errno = EBADF;
226 return -1;
227 }
228
229 k_free(msg_queue->name);
230 msg_queue->name = NULL;
231 k_sem_give(&mq_sem);
232 remove_mq(msg_queue);
233 return 0;
234 }
235
236 /**
237 * @brief Send a message to a message queue.
238 *
239 * All messages in message queue are of equal priority.
240 *
241 * See IEEE 1003.1
242 */
mq_send(mqd_t mqdes,const char * msg_ptr,size_t msg_len,unsigned int msg_prio)243 int mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
244 unsigned int msg_prio)
245 {
246 mqueue_desc *mqd = (mqueue_desc *)mqdes;
247
248 return send_message(mqd, msg_ptr, msg_len, K_FOREVER);
249 }
250
251 /**
252 * @brief Send message to a message queue within abstime time.
253 *
254 * All messages in message queue are of equal priority.
255 *
256 * See IEEE 1003.1
257 */
mq_timedsend(mqd_t mqdes,const char * msg_ptr,size_t msg_len,unsigned int msg_prio,const struct timespec * abstime)258 int mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len,
259 unsigned int msg_prio, const struct timespec *abstime)
260 {
261 mqueue_desc *mqd = (mqueue_desc *)mqdes;
262 int32_t timeout = (int32_t) timespec_to_timeoutms(abstime);
263
264 return send_message(mqd, msg_ptr, msg_len, K_MSEC(timeout));
265 }
266
267 /**
268 * @brief Receive a message from a message queue.
269 *
270 * All messages in message queue are of equal priority.
271 *
272 * See IEEE 1003.1
273 */
mq_receive(mqd_t mqdes,char * msg_ptr,size_t msg_len,unsigned int * msg_prio)274 int mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
275 unsigned int *msg_prio)
276 {
277 mqueue_desc *mqd = (mqueue_desc *)mqdes;
278
279 return receive_message(mqd, msg_ptr, msg_len, K_FOREVER);
280 }
281
282 /**
283 * @brief Receive message from a message queue within abstime time.
284 *
285 * All messages in message queue are of equal priority.
286 *
287 * See IEEE 1003.1
288 */
mq_timedreceive(mqd_t mqdes,char * msg_ptr,size_t msg_len,unsigned int * msg_prio,const struct timespec * abstime)289 int mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len,
290 unsigned int *msg_prio, const struct timespec *abstime)
291 {
292 mqueue_desc *mqd = (mqueue_desc *)mqdes;
293 int32_t timeout = (int32_t) timespec_to_timeoutms(abstime);
294
295 return receive_message(mqd, msg_ptr, msg_len, K_MSEC(timeout));
296 }
297
298 /**
299 * @brief Get message queue attributes.
300 *
301 * See IEEE 1003.1
302 */
mq_getattr(mqd_t mqdes,struct mq_attr * mqstat)303 int mq_getattr(mqd_t mqdes, struct mq_attr *mqstat)
304 {
305 mqueue_desc *mqd = (mqueue_desc *)mqdes;
306 struct k_msgq_attrs attrs;
307
308 if (mqd == NULL) {
309 errno = EBADF;
310 return -1;
311 }
312
313 k_sem_take(&mq_sem, K_FOREVER);
314 k_msgq_get_attrs(&mqd->mqueue->queue, &attrs);
315 mqstat->mq_flags = mqd->flags;
316 mqstat->mq_maxmsg = attrs.max_msgs;
317 mqstat->mq_msgsize = attrs.msg_size;
318 mqstat->mq_curmsgs = attrs.used_msgs;
319 k_sem_give(&mq_sem);
320 return 0;
321 }
322
323 /**
324 * @brief Set message queue attributes.
325 *
326 * See IEEE 1003.1
327 */
mq_setattr(mqd_t mqdes,const struct mq_attr * mqstat,struct mq_attr * omqstat)328 int mq_setattr(mqd_t mqdes, const struct mq_attr *mqstat,
329 struct mq_attr *omqstat)
330 {
331 mqueue_desc *mqd = (mqueue_desc *)mqdes;
332
333 if (mqd == NULL) {
334 errno = EBADF;
335 return -1;
336 }
337
338 if (mqstat->mq_flags != 0 && mqstat->mq_flags != O_NONBLOCK) {
339 errno = EINVAL;
340 return -1;
341 }
342
343 if (omqstat != NULL) {
344 mq_getattr(mqdes, omqstat);
345 }
346
347 k_sem_take(&mq_sem, K_FOREVER);
348 mqd->flags = mqstat->mq_flags;
349 k_sem_give(&mq_sem);
350
351 return 0;
352 }
353
354 /* Internal functions */
find_in_list(const char * name)355 static mqueue_object *find_in_list(const char *name)
356 {
357 sys_snode_t *mq;
358 mqueue_object *msg_queue;
359
360 mq = mq_list.head;
361
362 while (mq != NULL) {
363 msg_queue = (mqueue_object *)mq;
364 if (strcmp(msg_queue->name, name) == 0) {
365 return msg_queue;
366 }
367
368 mq = mq->next;
369 }
370
371 return NULL;
372 }
373
send_message(mqueue_desc * mqd,const char * msg_ptr,size_t msg_len,k_timeout_t timeout)374 static int32_t send_message(mqueue_desc *mqd, const char *msg_ptr, size_t msg_len,
375 k_timeout_t timeout)
376 {
377 int32_t ret = -1;
378
379 if (mqd == NULL) {
380 errno = EBADF;
381 return ret;
382 }
383
384 if ((mqd->flags & O_NONBLOCK) != 0U) {
385 timeout = K_NO_WAIT;
386 }
387
388 if (msg_len > mqd->mqueue->queue.msg_size) {
389 errno = EMSGSIZE;
390 return ret;
391 }
392
393 if (k_msgq_put(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
394 errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
395 return ret;
396 }
397
398 return 0;
399 }
400
receive_message(mqueue_desc * mqd,char * msg_ptr,size_t msg_len,k_timeout_t timeout)401 static int32_t receive_message(mqueue_desc *mqd, char *msg_ptr, size_t msg_len,
402 k_timeout_t timeout)
403 {
404 int ret = -1;
405
406 if (mqd == NULL) {
407 errno = EBADF;
408 return ret;
409 }
410
411 if (msg_len < mqd->mqueue->queue.msg_size) {
412 errno = EMSGSIZE;
413 return ret;
414 }
415
416 if ((mqd->flags & O_NONBLOCK) != 0U) {
417 timeout = K_NO_WAIT;
418 }
419
420 if (k_msgq_get(&mqd->mqueue->queue, (void *)msg_ptr, timeout) != 0) {
421 errno = K_TIMEOUT_EQ(timeout, K_NO_WAIT) ? EAGAIN : ETIMEDOUT;
422 } else {
423 ret = mqd->mqueue->queue.msg_size;
424 }
425
426 return ret;
427 }
428
remove_mq(mqueue_object * msg_queue)429 static void remove_mq(mqueue_object *msg_queue)
430 {
431 if (atomic_cas(&msg_queue->ref_count, 0, 0)) {
432 k_sem_take(&mq_sem, K_FOREVER);
433 sys_slist_find_and_remove(&mq_list, (sys_snode_t *) msg_queue);
434 k_sem_give(&mq_sem);
435
436 /* Free mq buffer and pbject */
437 k_free(msg_queue->mem_buffer);
438 k_free(msg_queue->mem_obj);
439 }
440 }
441