1 /*
2  * Copyright (c) 2018 Intel Corporation
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <ztest.h>
8 #include <zephyr.h>
9 #include <sys/printk.h>
10 #include <fcntl.h>
11 #include <sys/util.h>
12 #include <mqueue.h>
13 #include <pthread.h>
14 
15 #define N_THR 2
16 #define STACKSZ (1024 + CONFIG_TEST_EXTRA_STACKSIZE)
17 #define SENDER_THREAD 0
18 #define RECEIVER_THREAD 1
19 #define MESSAGE_SIZE 16
20 #define MESG_COUNT_PERMQ 4
21 
22 K_THREAD_STACK_ARRAY_DEFINE(stacks, N_THR, STACKSZ);
23 
24 char queue[16] = "server";
25 char send_data[MESSAGE_SIZE] = "timed data send";
26 
sender_thread(void * p1)27 void *sender_thread(void *p1)
28 {
29 	mqd_t mqd;
30 	struct timespec curtime;
31 
32 	mqd = mq_open(queue, O_WRONLY);
33 	clock_gettime(CLOCK_MONOTONIC, &curtime);
34 	curtime.tv_sec += 1;
35 	zassert_false(mq_timedsend(mqd, send_data, MESSAGE_SIZE, 0, &curtime),
36 		      "Not able to send message in timer");
37 	usleep(USEC_PER_MSEC);
38 	zassert_false(mq_close(mqd),
39 		      "unable to close message queue descriptor.");
40 	pthread_exit(p1);
41 	return NULL;
42 }
43 
44 
receiver_thread(void * p1)45 void *receiver_thread(void *p1)
46 {
47 	mqd_t mqd;
48 	char rec_data[MESSAGE_SIZE];
49 	struct timespec curtime;
50 
51 	mqd = mq_open(queue, O_RDONLY);
52 	clock_gettime(CLOCK_MONOTONIC, &curtime);
53 	curtime.tv_sec += 1;
54 	mq_timedreceive(mqd, rec_data, MESSAGE_SIZE, 0, &curtime);
55 	zassert_false(strcmp(rec_data, send_data), "Error in data reception");
56 	usleep(USEC_PER_MSEC);
57 	zassert_false(mq_close(mqd),
58 		      "unable to close message queue descriptor.");
59 	pthread_exit(p1);
60 	return NULL;
61 }
62 
test_posix_mqueue(void)63 void test_posix_mqueue(void)
64 {
65 	mqd_t mqd;
66 	struct mq_attr attrs;
67 	int32_t mode = 0777, flags = O_RDWR | O_CREAT, ret, i;
68 	void *retval;
69 	pthread_attr_t attr[N_THR];
70 	pthread_t newthread[N_THR];
71 
72 	attrs.mq_msgsize = MESSAGE_SIZE;
73 	attrs.mq_maxmsg = MESG_COUNT_PERMQ;
74 
75 	mqd = mq_open(queue, flags, mode, &attrs);
76 
77 	for (i = 0; i < N_THR; i++) {
78 		/* Creating threads */
79 		if (pthread_attr_init(&attr[i]) != 0) {
80 			zassert_equal(pthread_attr_destroy(&attr[i]), 0, NULL);
81 			zassert_false(pthread_attr_init(&attr[i]),
82 				      "pthread attr init failed");
83 		}
84 		pthread_attr_setstack(&attr[i], &stacks[i][0], STACKSZ);
85 
86 		if (i % 2) {
87 			ret = pthread_create(&newthread[i], &attr[i],
88 					     sender_thread,
89 					     INT_TO_POINTER(i));
90 		} else {
91 			ret = pthread_create(&newthread[i], &attr[i],
92 					     receiver_thread,
93 					     INT_TO_POINTER(i));
94 		}
95 
96 		zassert_false(ret, "Not enough space to create new thread");
97 		zassert_equal(pthread_attr_destroy(&attr[i]), 0, NULL);
98 	}
99 
100 	usleep(USEC_PER_MSEC * 10U);
101 
102 	for (i = 0; i < N_THR; i++) {
103 		pthread_join(newthread[i], &retval);
104 	}
105 
106 	zassert_false(mq_close(mqd),
107 		      "unable to close message queue descriptor.");
108 	zassert_false(mq_unlink(queue), "Not able to unlink Queue");
109 }
110