1 /*
2  * Copyright (c) 2021 Intel Corporation
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <zephyr/kernel.h>
8 #include <zephyr/irq_offload.h>
9 #include <zephyr/ztest.h>
10 #include <limits.h>
11 
12 #define MSGQ_LEN (2)
13 #define STACK_SIZE (512 + CONFIG_TEST_EXTRA_STACK_SIZE)
14 #define NUM_SERVICES 2
15 #define TIMEOUT K_MSEC(100)
16 
17 K_MSGQ_DEFINE(manager_q, sizeof(unsigned long) * 2, 4, 4);
18 struct k_msgq service1_msgq;
19 struct k_msgq service2_msgq;
20 struct k_msgq client_msgq;
21 K_THREAD_STACK_DEFINE(service_manager_stack, STACK_SIZE);
22 K_THREAD_STACK_DEFINE(service1_stack, STACK_SIZE);
23 K_THREAD_STACK_DEFINE(service2_stack, STACK_SIZE);
24 K_THREAD_STACK_DEFINE(client_stack, STACK_SIZE);
25 K_SEM_DEFINE(service_sema, 2, 2);
26 K_SEM_DEFINE(service_started, 0, 2);
27 K_SEM_DEFINE(test_continue, 0, 1);
28 struct k_thread service_manager;
29 struct k_thread service1;
30 struct k_thread service2;
31 struct k_thread client_thread;
32 static ZTEST_DMEM unsigned long __aligned(4) service1_buf[MSGQ_LEN];
33 static ZTEST_DMEM unsigned long __aligned(4) service2_buf[MSGQ_LEN];
34 static ZTEST_DMEM unsigned long __aligned(4) client_buf[MSGQ_LEN * 2];
35 static ZTEST_DMEM struct k_msgq *services[NUM_SERVICES];
36 static ZTEST_DMEM struct k_msgq *pclient;
37 static ZTEST_DMEM bool service1_run;
38 static ZTEST_DMEM bool service2_run;
39 static ZTEST_DMEM k_tid_t tservice_manager, tservice1, tservice2, tclient;
40 
/*
 * Message ids carried in word 0 of every two-word message exchanged between
 * the client, the services and the service manager. Word 1 holds the address
 * of the queue belonging to the sender (or, for SERVICE_QUIT, the queue of
 * the service that is quitting).
 */
enum message_info {
	/* client -> manager: ask for the table of registered service queues */
	QUERRY_SERVICE = 1,
	/* service1 -> manager: register service1's queue */
	REGISTER_SERVICE1 = 2,
	/* service2 -> manager: register service2's queue */
	REGISTER_SERVICE2 = 3,
	/* client -> service: request an answer from the service */
	GET_SERVICE = 4,
	/* service1 -> client: answer to GET_SERVICE */
	SERVICE1_RUNNING = 5,
	/* service2 -> client: answer to GET_SERVICE */
	SERVICE2_RUNNING = 6,
	/* service -> manager: the service is shutting down */
	SERVICE_QUIT = 7,
};
50 
service_manager_entry(void * p1,void * p2,void * p3)51 static void service_manager_entry(void *p1, void *p2, void *p3)
52 {
53 	static unsigned long data[2];
54 
55 	while (1) {
56 		k_msgq_get(&manager_q, data, K_FOREVER);
57 		switch (data[0]) {
58 		case QUERRY_SERVICE:
59 			pclient = (struct k_msgq *)data[1];
60 			k_msgq_put(pclient, services, K_NO_WAIT);
61 			break;
62 		case REGISTER_SERVICE1:
63 			services[0] = (struct k_msgq *)data[1];
64 			k_sem_give(&service_started);
65 			break;
66 		case REGISTER_SERVICE2:
67 			services[1] = (struct k_msgq *)data[1];
68 			k_sem_give(&service_started);
69 			break;
70 		case SERVICE_QUIT:
71 			for (int i = 0; i < NUM_SERVICES; i++) {
72 				if (services[i] == (struct k_msgq *)data[1]) {
73 					services[i] = NULL;
74 				}
75 			}
76 			/* wake up threads waiting for this queue */
77 			k_msgq_purge((struct k_msgq *)data[1]);
78 			break;
79 		default:
80 			TC_PRINT("Unknown message %ld\n", data[0]);
81 			break;
82 		}
83 		k_msleep(10);
84 	}
85 }
86 
start_service_manager(void)87 static void start_service_manager(void)
88 {
89 	int pri = k_thread_priority_get(k_current_get());
90 
91 	tservice_manager = k_thread_create(&service_manager,
92 					   service_manager_stack, STACK_SIZE,
93 					   service_manager_entry, NULL, NULL,
94 					   NULL, pri, 0, K_NO_WAIT);
95 }
96 
service1_entry(void * p1,void * p2,void * p3)97 static void service1_entry(void *p1, void *p2, void *p3)
98 {
99 	static unsigned long service_data[2];
100 	struct k_msgq *client;
101 	int ret;
102 
103 	service_data[0] = REGISTER_SERVICE1;
104 	service_data[1] = (unsigned long)&service1_msgq;
105 
106 	k_msgq_init(&service1_msgq, (char *)service1_buf,
107 		    sizeof(service1_buf), 1);
108 	ret = k_msgq_put(&manager_q, service_data, K_NO_WAIT);
109 	zassert_equal(ret, 0, "Can't register service");
110 
111 	k_sem_take(&service_sema, K_NO_WAIT);
112 	while (service1_run) {
113 		k_msgq_get(&service1_msgq, service_data, K_FOREVER);
114 		if (service_data[0] == GET_SERVICE) {
115 			client = (struct k_msgq *)service_data[1];
116 			service_data[0] = SERVICE1_RUNNING;
117 			k_msgq_put(client, service_data, K_NO_WAIT);
118 		}
119 		k_msleep(10);
120 	}
121 
122 	/* inform services manager */
123 	service_data[0] = SERVICE_QUIT;
124 	service_data[1] = (unsigned long)&service1_msgq;
125 	k_msgq_put(&manager_q, service_data, K_NO_WAIT);
126 }
127 
service2_entry(void * p1,void * p2,void * p3)128 static void service2_entry(void *p1, void *p2, void *p3)
129 {
130 	static unsigned long service_data[2];
131 	struct k_msgq *client;
132 	int ret;
133 
134 	service_data[0] = REGISTER_SERVICE2;
135 	service_data[1] = (unsigned long)&service2_msgq;
136 
137 	k_msgq_init(&service2_msgq, (char *)service2_buf,
138 		    sizeof(service2_buf), 1);
139 	ret = k_msgq_put(&manager_q, service_data, K_NO_WAIT);
140 	zassert_equal(ret, 0, "Can't register service");
141 
142 	k_sem_take(&service_sema, K_NO_WAIT);
143 	while (service2_run) {
144 		k_msgq_get(&service2_msgq, service_data, K_FOREVER);
145 		if (service_data[0] == GET_SERVICE) {
146 			client = (struct k_msgq *)service_data[1];
147 			service_data[0] = SERVICE2_RUNNING;
148 			k_msgq_put(client, service_data, K_NO_WAIT);
149 		}
150 		k_msleep(10);
151 	}
152 
153 	/* inform services manager */
154 	service_data[0] = SERVICE_QUIT;
155 	service_data[1] = (unsigned long)&service2_msgq;
156 	k_msgq_put(&manager_q, service_data, K_NO_WAIT);
157 }
158 
register_service(void)159 static void register_service(void)
160 {
161 
162 	int pri = k_thread_priority_get(k_current_get());
163 
164 	service1_run = true;
165 	tservice1 = k_thread_create(&service1, service1_stack, STACK_SIZE,
166 				    service1_entry, NULL, NULL, NULL, pri,
167 				    0, K_NO_WAIT);
168 
169 	service2_run = true;
170 	tservice2 = k_thread_create(&service2, service2_stack, STACK_SIZE,
171 				    service2_entry, NULL, NULL, NULL, pri,
172 				    0, K_NO_WAIT);
173 }
174 
client_entry(void * p1,void * p2,void * p3)175 static void client_entry(void *p1, void *p2, void *p3)
176 {
177 	static unsigned long client_data[2];
178 	static unsigned long service_data[2];
179 	struct k_msgq *service1q;
180 	struct k_msgq *service2q;
181 	bool query_service = false;
182 	int ret;
183 
184 
185 	k_msgq_init(&client_msgq, (char *)client_buf,
186 		    sizeof(unsigned long) * 2, 2);
187 	client_data[0] = QUERRY_SERVICE;
188 	client_data[1] = (unsigned long)&client_msgq;
189 
190 	/* wait all services started */
191 	k_sem_take(&service_started, K_FOREVER);
192 	k_sem_take(&service_started, K_FOREVER);
193 
194 	/* query services */
195 	k_msgq_put(&manager_q, client_data, K_NO_WAIT);
196 	ret = k_msgq_get(&client_msgq, service_data, K_FOREVER);
197 	zassert_equal(ret, 0);
198 
199 	service1q = (struct k_msgq *)service_data[0];
200 	service2q = (struct k_msgq *)service_data[1];
201 	/* all services should be running */
202 	zassert_equal(service1q, &service1_msgq);
203 	zassert_equal(service2q, &service2_msgq);
204 	/* let the test thread continue */
205 	k_sem_give(&test_continue);
206 
207 	while (1) {
208 		/* service might quit */
209 		if (query_service) {
210 			client_data[0] = QUERRY_SERVICE;
211 			client_data[1] = (unsigned long)&client_msgq;
212 			k_msgq_put(&manager_q, client_data, K_NO_WAIT);
213 			k_msgq_get(&client_msgq, service_data, K_FOREVER);
214 			service1q = (struct k_msgq *)service_data[0];
215 			service2q = (struct k_msgq *)service_data[1];
216 			query_service = false;
217 		}
218 
219 		if (!service1q && !service2q) {
220 			break;
221 		}
222 
223 		client_data[0] = GET_SERVICE;
224 		client_data[1] = (unsigned long)&client_msgq;
225 
226 		if (service1q) {
227 			k_msgq_put(service1q, client_data, K_NO_WAIT);
228 			ret = k_msgq_get(&client_msgq, service_data, TIMEOUT);
229 			if (!ret) {
230 				zassert_equal(service_data[0],
231 					      SERVICE1_RUNNING, NULL);
232 			} else {
233 				/* service might down, query in next loop */
234 				query_service = true;
235 			}
236 		}
237 
238 		if (service2q) {
239 			k_msgq_put(service2q, client_data, K_NO_WAIT);
240 			ret = k_msgq_get(&client_msgq, service_data, TIMEOUT);
241 			if (!ret) {
242 				zassert_equal(service_data[0],
243 					      SERVICE2_RUNNING, NULL);
244 			} else {
245 				query_service = true;
246 			}
247 		}
248 		k_msleep(10);
249 	}
250 }
251 
start_client(void)252 static void start_client(void)
253 {
254 
255 	int pri = k_thread_priority_get(k_current_get());
256 
257 	tclient = k_thread_create(&client_thread, client_stack, STACK_SIZE,
258 				  client_entry, NULL, NULL, NULL, pri,
259 				  0, K_NO_WAIT);
260 }
261 
ZTEST(msgq_usage,test_msgq_usage)262 ZTEST(msgq_usage, test_msgq_usage)
263 {
264 	start_service_manager();
265 	register_service();
266 	start_client();
267 	/* waiting to continue */
268 	k_sem_take(&test_continue, K_FOREVER);
269 
270 	/* rather than schedule this thread by k_msleep(), use semaphore with
271 	 * a timeout value, so there is no give operation over service_sema
272 	 */
273 	TC_PRINT("try to kill service1\n");
274 	k_sem_take(&service_sema, Z_TIMEOUT_MS(500));
275 	service1_run = false;
276 
277 	TC_PRINT("try to kill service2\n");
278 	k_sem_take(&service_sema, Z_TIMEOUT_MS(500));
279 	service2_run = false;
280 
281 	k_thread_join(tservice1, K_FOREVER);
282 	k_thread_join(tservice2, K_FOREVER);
283 	k_thread_join(tclient, K_FOREVER);
284 	k_thread_abort(tservice_manager);
285 }
286 
/* Register the msgq_usage suite; every optional argument is left as NULL. */
ZTEST_SUITE(msgq_usage, NULL, NULL, NULL, NULL, NULL);
288