1 /*
2 * Copyright (c) 2021 Intel Corporation
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 #include <zephyr/kernel.h>
8 #include <zephyr/irq_offload.h>
9 #include <zephyr/ztest.h>
10 #include <limits.h>
11
12 #define MSGQ_LEN (2)
13 #define STACK_SIZE (512 + CONFIG_TEST_EXTRA_STACK_SIZE)
14 #define NUM_SERVICES 2
15 #define TIMEOUT K_MSEC(100)
16
17 K_MSGQ_DEFINE(manager_q, sizeof(unsigned long) * 2, 4, 4);
18 struct k_msgq service1_msgq;
19 struct k_msgq service2_msgq;
20 struct k_msgq client_msgq;
21 K_THREAD_STACK_DEFINE(service_manager_stack, STACK_SIZE);
22 K_THREAD_STACK_DEFINE(service1_stack, STACK_SIZE);
23 K_THREAD_STACK_DEFINE(service2_stack, STACK_SIZE);
24 K_THREAD_STACK_DEFINE(client_stack, STACK_SIZE);
25 K_SEM_DEFINE(service_sema, 2, 2);
26 K_SEM_DEFINE(service_started, 0, 2);
27 K_SEM_DEFINE(test_continue, 0, 1);
28 struct k_thread service_manager;
29 struct k_thread service1;
30 struct k_thread service2;
31 struct k_thread client_thread;
32 static ZTEST_DMEM unsigned long __aligned(4) service1_buf[MSGQ_LEN];
33 static ZTEST_DMEM unsigned long __aligned(4) service2_buf[MSGQ_LEN];
34 static ZTEST_DMEM unsigned long __aligned(4) client_buf[MSGQ_LEN * 2];
35 static ZTEST_DMEM struct k_msgq *services[NUM_SERVICES];
36 static ZTEST_DMEM struct k_msgq *pclient;
37 static ZTEST_DMEM bool service1_run;
38 static ZTEST_DMEM bool service2_run;
39 static ZTEST_DMEM k_tid_t tservice_manager, tservice1, tservice2, tclient;
40
/*
 * Message codes carried in word [0] of every two-word message exchanged
 * in this test. Word [1] holds the address of a message queue.
 */
enum message_info {
	/* client -> manager: [1] = client queue; reply is the services table */
	QUERRY_SERVICE = 1,
	/* service 1 -> manager: [1] = service 1 queue */
	REGISTER_SERVICE1 = 2,
	/* service 2 -> manager: [1] = service 2 queue */
	REGISTER_SERVICE2 = 3,
	/* client -> service: [1] = queue the service should answer on */
	GET_SERVICE = 4,
	/* service 1 -> client: answer to GET_SERVICE */
	SERVICE1_RUNNING = 5,
	/* service 2 -> client: answer to GET_SERVICE */
	SERVICE2_RUNNING = 6,
	/* service -> manager: [1] = queue of the service that is leaving */
	SERVICE_QUIT = 7
};
50
service_manager_entry(void * p1,void * p2,void * p3)51 static void service_manager_entry(void *p1, void *p2, void *p3)
52 {
53 static unsigned long data[2];
54
55 while (1) {
56 k_msgq_get(&manager_q, data, K_FOREVER);
57 switch (data[0]) {
58 case QUERRY_SERVICE:
59 pclient = (struct k_msgq *)data[1];
60 k_msgq_put(pclient, services, K_NO_WAIT);
61 break;
62 case REGISTER_SERVICE1:
63 services[0] = (struct k_msgq *)data[1];
64 k_sem_give(&service_started);
65 break;
66 case REGISTER_SERVICE2:
67 services[1] = (struct k_msgq *)data[1];
68 k_sem_give(&service_started);
69 break;
70 case SERVICE_QUIT:
71 for (int i = 0; i < NUM_SERVICES; i++) {
72 if (services[i] == (struct k_msgq *)data[1]) {
73 services[i] = NULL;
74 }
75 }
76 /* wake up threads waiting for this queue */
77 k_msgq_purge((struct k_msgq *)data[1]);
78 break;
79 default:
80 TC_PRINT("Unknown message %ld\n", data[0]);
81 break;
82 }
83 k_msleep(10);
84 }
85 }
86
start_service_manager(void)87 static void start_service_manager(void)
88 {
89 int pri = k_thread_priority_get(k_current_get());
90
91 tservice_manager = k_thread_create(&service_manager,
92 service_manager_stack, STACK_SIZE,
93 service_manager_entry, NULL, NULL,
94 NULL, pri, 0, K_NO_WAIT);
95 }
96
service1_entry(void * p1,void * p2,void * p3)97 static void service1_entry(void *p1, void *p2, void *p3)
98 {
99 static unsigned long service_data[2];
100 struct k_msgq *client;
101 int ret;
102
103 service_data[0] = REGISTER_SERVICE1;
104 service_data[1] = (unsigned long)&service1_msgq;
105
106 k_msgq_init(&service1_msgq, (char *)service1_buf,
107 sizeof(service1_buf), 1);
108 ret = k_msgq_put(&manager_q, service_data, K_NO_WAIT);
109 zassert_equal(ret, 0, "Can't register service");
110
111 k_sem_take(&service_sema, K_NO_WAIT);
112 while (service1_run) {
113 k_msgq_get(&service1_msgq, service_data, K_FOREVER);
114 if (service_data[0] == GET_SERVICE) {
115 client = (struct k_msgq *)service_data[1];
116 service_data[0] = SERVICE1_RUNNING;
117 k_msgq_put(client, service_data, K_NO_WAIT);
118 }
119 k_msleep(10);
120 }
121
122 /* inform services manager */
123 service_data[0] = SERVICE_QUIT;
124 service_data[1] = (unsigned long)&service1_msgq;
125 k_msgq_put(&manager_q, service_data, K_NO_WAIT);
126 }
127
service2_entry(void * p1,void * p2,void * p3)128 static void service2_entry(void *p1, void *p2, void *p3)
129 {
130 static unsigned long service_data[2];
131 struct k_msgq *client;
132 int ret;
133
134 service_data[0] = REGISTER_SERVICE2;
135 service_data[1] = (unsigned long)&service2_msgq;
136
137 k_msgq_init(&service2_msgq, (char *)service2_buf,
138 sizeof(service2_buf), 1);
139 ret = k_msgq_put(&manager_q, service_data, K_NO_WAIT);
140 zassert_equal(ret, 0, "Can't register service");
141
142 k_sem_take(&service_sema, K_NO_WAIT);
143 while (service2_run) {
144 k_msgq_get(&service2_msgq, service_data, K_FOREVER);
145 if (service_data[0] == GET_SERVICE) {
146 client = (struct k_msgq *)service_data[1];
147 service_data[0] = SERVICE2_RUNNING;
148 k_msgq_put(client, service_data, K_NO_WAIT);
149 }
150 k_msleep(10);
151 }
152
153 /* inform services manager */
154 service_data[0] = SERVICE_QUIT;
155 service_data[1] = (unsigned long)&service2_msgq;
156 k_msgq_put(&manager_q, service_data, K_NO_WAIT);
157 }
158
register_service(void)159 static void register_service(void)
160 {
161
162 int pri = k_thread_priority_get(k_current_get());
163
164 service1_run = true;
165 tservice1 = k_thread_create(&service1, service1_stack, STACK_SIZE,
166 service1_entry, NULL, NULL, NULL, pri,
167 0, K_NO_WAIT);
168
169 service2_run = true;
170 tservice2 = k_thread_create(&service2, service2_stack, STACK_SIZE,
171 service2_entry, NULL, NULL, NULL, pri,
172 0, K_NO_WAIT);
173 }
174
client_entry(void * p1,void * p2,void * p3)175 static void client_entry(void *p1, void *p2, void *p3)
176 {
177 static unsigned long client_data[2];
178 static unsigned long service_data[2];
179 struct k_msgq *service1q;
180 struct k_msgq *service2q;
181 bool query_service = false;
182 int ret;
183
184
185 k_msgq_init(&client_msgq, (char *)client_buf,
186 sizeof(unsigned long) * 2, 2);
187 client_data[0] = QUERRY_SERVICE;
188 client_data[1] = (unsigned long)&client_msgq;
189
190 /* wait all services started */
191 k_sem_take(&service_started, K_FOREVER);
192 k_sem_take(&service_started, K_FOREVER);
193
194 /* query services */
195 k_msgq_put(&manager_q, client_data, K_NO_WAIT);
196 ret = k_msgq_get(&client_msgq, service_data, K_FOREVER);
197 zassert_equal(ret, 0);
198
199 service1q = (struct k_msgq *)service_data[0];
200 service2q = (struct k_msgq *)service_data[1];
201 /* all services should be running */
202 zassert_equal(service1q, &service1_msgq);
203 zassert_equal(service2q, &service2_msgq);
204 /* let the test thread continue */
205 k_sem_give(&test_continue);
206
207 while (1) {
208 /* service might quit */
209 if (query_service) {
210 client_data[0] = QUERRY_SERVICE;
211 client_data[1] = (unsigned long)&client_msgq;
212 k_msgq_put(&manager_q, client_data, K_NO_WAIT);
213 k_msgq_get(&client_msgq, service_data, K_FOREVER);
214 service1q = (struct k_msgq *)service_data[0];
215 service2q = (struct k_msgq *)service_data[1];
216 query_service = false;
217 }
218
219 if (!service1q && !service2q) {
220 break;
221 }
222
223 client_data[0] = GET_SERVICE;
224 client_data[1] = (unsigned long)&client_msgq;
225
226 if (service1q) {
227 k_msgq_put(service1q, client_data, K_NO_WAIT);
228 ret = k_msgq_get(&client_msgq, service_data, TIMEOUT);
229 if (!ret) {
230 zassert_equal(service_data[0],
231 SERVICE1_RUNNING, NULL);
232 } else {
233 /* service might down, query in next loop */
234 query_service = true;
235 }
236 }
237
238 if (service2q) {
239 k_msgq_put(service2q, client_data, K_NO_WAIT);
240 ret = k_msgq_get(&client_msgq, service_data, TIMEOUT);
241 if (!ret) {
242 zassert_equal(service_data[0],
243 SERVICE2_RUNNING, NULL);
244 } else {
245 query_service = true;
246 }
247 }
248 k_msleep(10);
249 }
250 }
251
start_client(void)252 static void start_client(void)
253 {
254
255 int pri = k_thread_priority_get(k_current_get());
256
257 tclient = k_thread_create(&client_thread, client_stack, STACK_SIZE,
258 client_entry, NULL, NULL, NULL, pri,
259 0, K_NO_WAIT);
260 }
261
ZTEST(msgq_usage,test_msgq_usage)262 ZTEST(msgq_usage, test_msgq_usage)
263 {
264 start_service_manager();
265 register_service();
266 start_client();
267 /* waiting to continue */
268 k_sem_take(&test_continue, K_FOREVER);
269
270 /* rather than schedule this thread by k_msleep(), use semaphore with
271 * a timeout value, so there is no give operation over service_sema
272 */
273 TC_PRINT("try to kill service1\n");
274 k_sem_take(&service_sema, Z_TIMEOUT_MS(500));
275 service1_run = false;
276
277 TC_PRINT("try to kill service2\n");
278 k_sem_take(&service_sema, Z_TIMEOUT_MS(500));
279 service2_run = false;
280
281 k_thread_join(tservice1, K_FOREVER);
282 k_thread_join(tservice2, K_FOREVER);
283 k_thread_join(tclient, K_FOREVER);
284 k_thread_abort(tservice_manager);
285 }
286
287 ZTEST_SUITE(msgq_usage, NULL, NULL, NULL, NULL, NULL);
288