1 /*
2  * Copyright (c) 2018 Intel Corporation
3  * Copyright (c) 2024 BayLibre, SAS
4  *
5  * SPDX-License-Identifier: Apache-2.0
6  */
7 
8 #include <fcntl.h>
9 #include <mqueue.h>
10 #include <pthread.h>
11 
12 #include <zephyr/sys/util.h>
13 #include <zephyr/ztest.h>
14 
/* Number of threads test_mqueue spawns, alternating receiver and sender. */
#define N_THR            2
/* Size in bytes of every message and of the send/receive buffers. */
#define MESSAGE_SIZE     16
/* Queue depth (mq_maxmsg) requested whenever a test creates the queue. */
#define MESG_COUNT_PERMQ 4

/* Name under which every test creates, opens and unlinks the queue. */
static const char queue[] = "server";

/*
 * Payload used by every send. The 15 characters plus the terminating NUL
 * fill MESSAGE_SIZE exactly, so the received buffer can be compared as a
 * C string.
 */
static const char send_data[MESSAGE_SIZE] = "timed data send";

/*
 * For platforms that select CONFIG_KERNEL_COHERENCE, the receive buffer
 * cannot be on the stack, as the k_msgq that underlies mq_timedsend() will
 * copy directly to the receiver's buffer when there is already a waiting
 * receiver.
 */

static char rec_data[MESSAGE_SIZE];
31 
sender_thread(void * p1)32 static void *sender_thread(void *p1)
33 {
34 	mqd_t mqd;
35 	struct timespec curtime;
36 
37 	mqd = mq_open(queue, O_WRONLY);
38 	clock_gettime(CLOCK_MONOTONIC, &curtime);
39 	curtime.tv_sec += 1;
40 	zassert_false(mq_timedsend(mqd, send_data, MESSAGE_SIZE, 0, &curtime),
41 		      "Not able to send message in timer");
42 	usleep(USEC_PER_MSEC);
43 	zassert_false(mq_close(mqd),
44 		      "unable to close message queue descriptor.");
45 	pthread_exit(p1);
46 	return NULL;
47 }
48 
49 
receiver_thread(void * p1)50 static void *receiver_thread(void *p1)
51 {
52 	mqd_t mqd;
53 	struct timespec curtime;
54 
55 	mqd = mq_open(queue, O_RDONLY);
56 	clock_gettime(CLOCK_MONOTONIC, &curtime);
57 	curtime.tv_sec += 1;
58 	mq_timedreceive(mqd, rec_data, MESSAGE_SIZE, 0, &curtime);
59 	zassert_false(strcmp(rec_data, send_data), "Error in data reception. exp: %s act: %s",
60 		      send_data, rec_data);
61 	usleep(USEC_PER_MSEC);
62 	zassert_false(mq_close(mqd),
63 		      "unable to close message queue descriptor.");
64 	pthread_exit(p1);
65 	return NULL;
66 }
67 
ZTEST(mqueue,test_mqueue)68 ZTEST(mqueue, test_mqueue)
69 {
70 	mqd_t mqd;
71 	struct mq_attr attrs;
72 	int32_t mode = 0777;
73 	int flags = O_RDWR | O_CREAT;
74 	void *retval;
75 	pthread_t newthread[N_THR];
76 
77 	attrs.mq_msgsize = MESSAGE_SIZE;
78 	attrs.mq_maxmsg = MESG_COUNT_PERMQ;
79 
80 	mqd = mq_open(queue, flags, mode, &attrs);
81 
82 	for (int i = 0; i < N_THR; i++) {
83 		/* Creating threads */
84 		zassert_ok(pthread_create(&newthread[i], NULL,
85 					  (i % 2 == 0) ? receiver_thread : sender_thread, NULL));
86 	}
87 
88 	usleep(USEC_PER_MSEC * 10U);
89 
90 	for (int i = 0; i < N_THR; i++) {
91 		pthread_join(newthread[i], &retval);
92 	}
93 
94 	zassert_false(mq_close(mqd),
95 		      "unable to close message queue descriptor.");
96 	zassert_false(mq_unlink(queue), "Not able to unlink Queue");
97 }
98 
99 static bool notification_executed;
100 
notify_function_basic(union sigval val)101 void notify_function_basic(union sigval val)
102 {
103 	mqd_t mqd;
104 	bool *executed = (bool *)val.sival_ptr;
105 
106 	mqd = mq_open(queue, O_RDONLY);
107 
108 	mq_receive(mqd, rec_data, MESSAGE_SIZE, 0);
109 	zassert_ok(strcmp(rec_data, send_data),
110 		   "Error in data reception. exp: %s act: %s", send_data, rec_data);
111 
112 	zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
113 
114 	*executed = true;
115 }
116 
ZTEST(mqueue,test_mqueue_notify_basic)117 ZTEST(mqueue, test_mqueue_notify_basic)
118 {
119 	mqd_t mqd;
120 	struct mq_attr attrs = {
121 		.mq_msgsize = MESSAGE_SIZE,
122 		.mq_maxmsg = MESG_COUNT_PERMQ,
123 	};
124 	struct sigevent not = {
125 		.sigev_notify = SIGEV_NONE,
126 		.sigev_value.sival_ptr = (void *)&notification_executed,
127 		.sigev_notify_function = notify_function_basic,
128 	};
129 	int32_t mode = 0777;
130 	int flags = O_RDWR | O_CREAT;
131 
132 	notification_executed = false;
133 	memset(rec_data, 0, MESSAGE_SIZE);
134 
135 	mqd = mq_open(queue, flags, mode, &attrs);
136 
137 	zassert_ok(mq_notify(mqd, &not), "Unable to set notification.");
138 
139 	zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message");
140 
141 	zassert_true(notification_executed, "Notification not triggered.");
142 
143 	zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
144 	zassert_ok(mq_unlink(queue), "Unable to unlink queue");
145 }
146 
notify_function_thread(union sigval val)147 void notify_function_thread(union sigval val)
148 {
149 	mqd_t mqd;
150 	pthread_t sender = (pthread_t)val.sival_int;
151 
152 	zassert_not_equal(sender, pthread_self(),
153 			  "Notification function should be executed from different thread.");
154 
155 	mqd = mq_open(queue, O_RDONLY);
156 
157 	mq_receive(mqd, rec_data, MESSAGE_SIZE, 0);
158 	zassert_ok(strcmp(rec_data, send_data),
159 		   "Error in data reception. exp: %s act: %s", send_data, rec_data);
160 
161 	zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
162 
163 	notification_executed = true;
164 }
165 
ZTEST(mqueue,test_mqueue_notify_thread)166 ZTEST(mqueue, test_mqueue_notify_thread)
167 {
168 	mqd_t mqd;
169 	struct mq_attr attrs = {
170 		.mq_msgsize = MESSAGE_SIZE,
171 		.mq_maxmsg = MESG_COUNT_PERMQ,
172 	};
173 	struct sigevent not = {
174 		.sigev_notify = SIGEV_THREAD,
175 		.sigev_value.sival_int = (int)pthread_self(),
176 		.sigev_notify_function = notify_function_thread,
177 	};
178 	int32_t mode = 0777;
179 	int flags = O_RDWR | O_CREAT;
180 
181 	notification_executed = false;
182 	memset(rec_data, 0, MESSAGE_SIZE);
183 
184 	mqd = mq_open(queue, flags, mode, &attrs);
185 
186 	zassert_ok(mq_notify(mqd, &not), "Unable to set notification.");
187 
188 	zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message");
189 
190 	usleep(USEC_PER_MSEC * 100U);
191 
192 	zassert_true(notification_executed, "Notification not triggered.");
193 
194 	zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
195 	zassert_ok(mq_unlink(queue), "Unable to unlink queue");
196 }
197 
ZTEST(mqueue,test_mqueue_notify_non_empty_queue)198 ZTEST(mqueue, test_mqueue_notify_non_empty_queue)
199 {
200 	mqd_t mqd;
201 	struct mq_attr attrs = {
202 		.mq_msgsize = MESSAGE_SIZE,
203 		.mq_maxmsg = MESG_COUNT_PERMQ,
204 	};
205 	struct sigevent not = {
206 		.sigev_notify = SIGEV_NONE,
207 		.sigev_value.sival_ptr = (void *)&notification_executed,
208 		.sigev_notify_function = notify_function_basic,
209 	};
210 	int32_t mode = 0777;
211 	int flags = O_RDWR | O_CREAT;
212 
213 	notification_executed = false;
214 	memset(rec_data, 0, MESSAGE_SIZE);
215 
216 	mqd = mq_open(queue, flags, mode, &attrs);
217 
218 	zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message");
219 
220 	zassert_ok(mq_notify(mqd, &not), "Unable to set notification.");
221 
222 	zassert_false(notification_executed, "Notification shouldn't be processed.");
223 
224 	mq_receive(mqd, rec_data, MESSAGE_SIZE, 0);
225 	zassert_false(strcmp(rec_data, send_data),
226 		      "Error in data reception. exp: %s act: %s", send_data, rec_data);
227 
228 	memset(rec_data, 0, MESSAGE_SIZE);
229 
230 	zassert_ok(mq_send(mqd, send_data, MESSAGE_SIZE, 0), "Unable to send message");
231 
232 	zassert_true(notification_executed, "Notification not triggered.");
233 
234 	zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
235 	zassert_ok(mq_unlink(queue), "Unable to unlink queue");
236 }
237 
ZTEST(mqueue,test_mqueue_notify_errors)238 ZTEST(mqueue, test_mqueue_notify_errors)
239 {
240 	mqd_t mqd;
241 	struct mq_attr attrs = {
242 		.mq_msgsize = MESSAGE_SIZE,
243 		.mq_maxmsg = MESG_COUNT_PERMQ,
244 	};
245 	struct sigevent not = {
246 		.sigev_notify = SIGEV_SIGNAL,
247 		.sigev_value.sival_ptr = (void *)&notification_executed,
248 		.sigev_notify_function = notify_function_basic,
249 	};
250 	int32_t mode = 0777;
251 	int flags = O_RDWR | O_CREAT;
252 
253 	zassert_not_ok(mq_notify(NULL, NULL), "Should return -1 and set errno to EBADF.");
254 	zassert_equal(errno, EBADF);
255 
256 	mqd = mq_open(queue, flags, mode, &attrs);
257 
258 	zassert_not_ok(mq_notify(mqd, NULL), "Should return -1 and set errno to EINVAL.");
259 	zassert_equal(errno, EINVAL);
260 
261 	zassert_not_ok(mq_notify(mqd, &not), "SIGEV_SIGNAL not supported should return -1.");
262 	zassert_equal(errno, ENOSYS);
263 
264 	not.sigev_notify = SIGEV_NONE;
265 
266 	zassert_ok(mq_notify(mqd, &not),
267 		   "Unexpected error while asigning notification to the queue.");
268 
269 	zassert_not_ok(mq_notify(mqd, &not),
270 		       "Can't assign notification when there is another assigned.");
271 	zassert_equal(errno, EBUSY);
272 
273 	zassert_ok(mq_notify(mqd, NULL), "Unable to remove notification from the message queue.");
274 
275 	zassert_ok(mq_close(mqd), "Unable to close message queue descriptor.");
276 	zassert_ok(mq_unlink(queue), "Unable to unlink queue");
277 }
278 
before(void * arg)279 static void before(void *arg)
280 {
281 	ARG_UNUSED(arg);
282 
283 	if (!IS_ENABLED(CONFIG_DYNAMIC_THREAD)) {
284 		/* skip redundant testing if there is no thread pool / heap allocation */
285 		ztest_test_skip();
286 	}
287 }
288 
289 ZTEST_SUITE(mqueue, NULL, NULL, before, NULL, NULL);
290