1 /*
2 * Copyright (c) 2018 Intel Corporation
3 * Copyright (c) 2024 BayLibre, SAS
4 *
5 * SPDX-License-Identifier: Apache-2.0
6 */
7
8 #include <fcntl.h>
9 #include <mqueue.h>
10 #include <pthread.h>
11
12 #include <zephyr/sys/util.h>
13 #include <zephyr/ztest.h>
14
15 #define N_THR 2
16 #define MESSAGE_SIZE 16
17 #define MESG_COUNT_PERMQ 4
18
19 static char queue[16] = "server";
20
21 static char send_data[MESSAGE_SIZE] = "timed data send";
22
23 /*
24 * For platforms that select CONFIG_KERNEL_COHERENCE, the receive buffer can
25 * not be on the stack as the k_msgq that underlies the mq_timedsend() will
26 * copy directly to the receiver's buffer when there is already a waiting
27 * receiver.
28 */
29
30 static char rec_data[MESSAGE_SIZE];
31
sender_thread(void * p1)32 static void *sender_thread(void *p1)
33 {
34 mqd_t mqd;
35 struct timespec curtime;
36
37 mqd = mq_open(queue, O_WRONLY);
38 clock_gettime(CLOCK_MONOTONIC, &curtime);
39 curtime.tv_sec += 1;
40 zassert_false(mq_timedsend(mqd, send_data, MESSAGE_SIZE, 0, &curtime),
41 "Not able to send message in timer");
42 usleep(USEC_PER_MSEC);
43 zassert_false(mq_close(mqd),
44 "unable to close message queue descriptor.");
45 pthread_exit(p1);
46 return NULL;
47 }
48
49
receiver_thread(void * p1)50 static void *receiver_thread(void *p1)
51 {
52 mqd_t mqd;
53 struct timespec curtime;
54
55 mqd = mq_open(queue, O_RDONLY);
56 clock_gettime(CLOCK_MONOTONIC, &curtime);
57 curtime.tv_sec += 1;
58 mq_timedreceive(mqd, rec_data, MESSAGE_SIZE, 0, &curtime);
59 zassert_false(strcmp(rec_data, send_data), "Error in data reception. exp: %s act: %s",
60 send_data, rec_data);
61 usleep(USEC_PER_MSEC);
62 zassert_false(mq_close(mqd),
63 "unable to close message queue descriptor.");
64 pthread_exit(p1);
65 return NULL;
66 }
67
ZTEST(mqueue,test_mqueue)68 ZTEST(mqueue, test_mqueue)
69 {
70 mqd_t mqd;
71 struct mq_attr attrs;
72 int32_t mode = 0777;
73 int flags = O_RDWR | O_CREAT;
74 void *retval;
75 pthread_t newthread[N_THR];
76
77 attrs.mq_msgsize = MESSAGE_SIZE;
78 attrs.mq_maxmsg = MESG_COUNT_PERMQ;
79
80 mqd = mq_open(queue, flags, mode, &attrs);
81
82 for (int i = 0; i < N_THR; i++) {
83 /* Creating threads */
84 zassert_ok(pthread_create(&newthread[i], NULL,
85 (i % 2 == 0) ? receiver_thread : sender_thread, NULL));
86 }
87
88 usleep(USEC_PER_MSEC * 10U);
89
90 for (int i = 0; i < N_THR; i++) {
91 pthread_join(newthread[i], &retval);
92 }
93
94 zassert_false(mq_close(mqd),
95 "unable to close message queue descriptor.");
96 zassert_false(mq_unlink(queue), "Not able to unlink Queue");
97 }
98
99 static bool notification_executed;
100
notify_function_basic(union sigval val)101 void notify_function_basic(union sigval val)
102 {
103 mqd_t mqd;
104 bool *executed = (bool *)val.sival_ptr;
105
106 mqd = mq_open(queue, O_RDONLY);
107
108 mq_receive(mqd, rec_data, MESSAGE_SIZE, 0);
109 zassert_ok(strcmp(rec_data, send_data),
110 "Error in data reception. exp: %s act: %s", send_data, rec_data);
111
112 zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
113
114 *executed = true;
115 }
116
ZTEST(mqueue,test_mqueue_notify_basic)117 ZTEST(mqueue, test_mqueue_notify_basic)
118 {
119 mqd_t mqd;
120 struct mq_attr attrs = {
121 .mq_msgsize = MESSAGE_SIZE,
122 .mq_maxmsg = MESG_COUNT_PERMQ,
123 };
124 struct sigevent not = {
125 .sigev_notify = SIGEV_NONE,
126 .sigev_value.sival_ptr = (void *)¬ification_executed,
127 .sigev_notify_function = notify_function_basic,
128 };
129 int32_t mode = 0777;
130 int flags = O_RDWR | O_CREAT;
131
132 notification_executed = false;
133 memset(rec_data, 0, MESSAGE_SIZE);
134
135 mqd = mq_open(queue, flags, mode, &attrs);
136
137 zassert_ok(mq_notify(mqd, ¬), "Unable to set notification.");
138
139 zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message");
140
141 zassert_true(notification_executed, "Notification not triggered.");
142
143 zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
144 zassert_ok(mq_unlink(queue), "Unable to unlink queue");
145 }
146
notify_function_thread(union sigval val)147 void notify_function_thread(union sigval val)
148 {
149 mqd_t mqd;
150 pthread_t sender = (pthread_t)val.sival_int;
151
152 zassert_not_equal(sender, pthread_self(),
153 "Notification function should be executed from different thread.");
154
155 mqd = mq_open(queue, O_RDONLY);
156
157 mq_receive(mqd, rec_data, MESSAGE_SIZE, 0);
158 zassert_ok(strcmp(rec_data, send_data),
159 "Error in data reception. exp: %s act: %s", send_data, rec_data);
160
161 zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
162
163 notification_executed = true;
164 }
165
ZTEST(mqueue,test_mqueue_notify_thread)166 ZTEST(mqueue, test_mqueue_notify_thread)
167 {
168 mqd_t mqd;
169 struct mq_attr attrs = {
170 .mq_msgsize = MESSAGE_SIZE,
171 .mq_maxmsg = MESG_COUNT_PERMQ,
172 };
173 struct sigevent not = {
174 .sigev_notify = SIGEV_THREAD,
175 .sigev_value.sival_int = (int)pthread_self(),
176 .sigev_notify_function = notify_function_thread,
177 };
178 int32_t mode = 0777;
179 int flags = O_RDWR | O_CREAT;
180
181 notification_executed = false;
182 memset(rec_data, 0, MESSAGE_SIZE);
183
184 mqd = mq_open(queue, flags, mode, &attrs);
185
186 zassert_ok(mq_notify(mqd, ¬), "Unable to set notification.");
187
188 zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message");
189
190 usleep(USEC_PER_MSEC * 100U);
191
192 zassert_true(notification_executed, "Notification not triggered.");
193
194 zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
195 zassert_ok(mq_unlink(queue), "Unable to unlink queue");
196 }
197
ZTEST(mqueue,test_mqueue_notify_non_empty_queue)198 ZTEST(mqueue, test_mqueue_notify_non_empty_queue)
199 {
200 mqd_t mqd;
201 struct mq_attr attrs = {
202 .mq_msgsize = MESSAGE_SIZE,
203 .mq_maxmsg = MESG_COUNT_PERMQ,
204 };
205 struct sigevent not = {
206 .sigev_notify = SIGEV_NONE,
207 .sigev_value.sival_ptr = (void *)¬ification_executed,
208 .sigev_notify_function = notify_function_basic,
209 };
210 int32_t mode = 0777;
211 int flags = O_RDWR | O_CREAT;
212
213 notification_executed = false;
214 memset(rec_data, 0, MESSAGE_SIZE);
215
216 mqd = mq_open(queue, flags, mode, &attrs);
217
218 zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message");
219
220 zassert_ok(mq_notify(mqd, ¬), "Unable to set notification.");
221
222 zassert_false(notification_executed, "Notification shouldn't be processed.");
223
224 mq_receive(mqd, rec_data, MESSAGE_SIZE, 0);
225 zassert_false(strcmp(rec_data, send_data),
226 "Error in data reception. exp: %s act: %s", send_data, rec_data);
227
228 memset(rec_data, 0, MESSAGE_SIZE);
229
230 zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message");
231
232 zassert_true(notification_executed, "Notification not triggered.");
233
234 zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
235 zassert_ok(mq_unlink(queue), "Unable to unlink queue");
236 }
237
ZTEST(mqueue,test_mqueue_notify_errors)238 ZTEST(mqueue, test_mqueue_notify_errors)
239 {
240 mqd_t mqd;
241 struct mq_attr attrs = {
242 .mq_msgsize = MESSAGE_SIZE,
243 .mq_maxmsg = MESG_COUNT_PERMQ,
244 };
245 struct sigevent not = {
246 .sigev_notify = SIGEV_SIGNAL,
247 .sigev_value.sival_ptr = (void *)¬ification_executed,
248 .sigev_notify_function = notify_function_basic,
249 };
250 int32_t mode = 0777;
251 int flags = O_RDWR | O_CREAT;
252
253 zassert_not_ok(mq_notify(NULL, NULL), "Should return -1 and set errno to EBADF.");
254 zassert_equal(errno, EBADF);
255
256 mqd = mq_open(queue, flags, mode, &attrs);
257
258 zassert_not_ok(mq_notify(mqd, NULL), "Should return -1 and set errno to EINVAL.");
259 zassert_equal(errno, EINVAL);
260
261 zassert_not_ok(mq_notify(mqd, ¬), "SIGEV_SIGNAL not supported should return -1.");
262 zassert_equal(errno, ENOSYS);
263
264 not.sigev_notify = SIGEV_NONE;
265
266 zassert_ok(mq_notify(mqd, ¬),
267 "Unexpected error while asigning notification to the queue.");
268
269 zassert_not_ok(mq_notify(mqd, ¬),
270 "Can't assign notification when there is another assigned.");
271 zassert_equal(errno, EBUSY);
272
273 zassert_ok(mq_notify(mqd, NULL), "Unable to remove notification from the message queue.");
274
275 zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
276 zassert_ok(mq_unlink(queue), "Unable to unlink queue");
277 }
278
before(void * arg)279 static void before(void *arg)
280 {
281 ARG_UNUSED(arg);
282
283 if (!IS_ENABLED(CONFIG_DYNAMIC_THREAD)) {
284 /* skip redundant testing if there is no thread pool / heap allocation */
285 ztest_test_skip();
286 }
287 }
288
289 ZTEST_SUITE(mqueue, NULL, NULL, before, NULL, NULL);
290