1 /*
2  * Copyright (c) 2023 Rodrigo Peixoto <rodrigopex@gmail.com>
3  * SPDX-License-Identifier: Apache-2.0
4  */
5 
6 #include "messages.h"
7 
8 #include <zephyr/kernel.h>
9 #include <zephyr/logging/log.h>
10 #include <zephyr/zbus/zbus.h>
11 #include <zephyr/ztest.h>
/* Log through the "zbus" log module at the level configured for zbus. */
LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);

/* Stack size shared by every thread spawned by this test. */
#define STACK_SIZE (CONFIG_MAIN_STACK_SIZE + CONFIG_TEST_EXTRA_STACK_SIZE)

/* Thread object and stack for the publisher thread. */
static struct k_thread pub_thread;
static K_THREAD_STACK_DEFINE(pub_thread_sz, STACK_SIZE);
/* Thread object and stack for the thread that consumes through sub1. */
static struct k_thread s1_thread;
static K_THREAD_STACK_DEFINE(s1_thread_sz, STACK_SIZE);
/* Thread object and stack for the thread that consumes through msub1. */
static struct k_thread ms1_thread;
static K_THREAD_STACK_DEFINE(ms1_thread_sz, STACK_SIZE);
22 
/* Message carried by chan_testing_01. */
struct msg_testing_01 {
	/* Counter the test increments between publications; consumers print it. */
	int seq;
	/* When true, each consumer thread calls zbus_obs_detach_from_thread(). */
	bool must_detach;
};
27 
/* Channel under test; it is observed by lis1, sub1 and msub1. */
ZBUS_CHAN_DEFINE(chan_testing_01,       /* Name */
		 struct msg_testing_01, /* Message type */

		 NULL,                              /* Validator */
		 NULL,                              /* User data */
		 ZBUS_OBSERVERS(lis1, sub1, msub1), /* observers */
		 ZBUS_MSG_INIT(0)                   /* Initial value: zero */
);
36 
consumer_sub_thread(void * ptr1,void * ptr2,void * ptr3)37 static void consumer_sub_thread(void *ptr1, void *ptr2, void *ptr3)
38 {
39 	ARG_UNUSED(ptr3);
40 
41 	zbus_obs_attach_to_thread(ptr1);
42 
43 	char *name = ptr2;
44 	const struct zbus_observer *sub = ptr1;
45 	const struct zbus_channel *chan;
46 	struct msg_testing_01 msg;
47 
48 	while (1) {
49 		if (zbus_sub_wait(sub, &chan, K_FOREVER) != 0) {
50 			k_oops();
51 		}
52 		zbus_chan_read(chan, &msg, K_FOREVER);
53 
54 		printk("%s level: %d\n", name, msg.seq);
55 
56 		if (msg.must_detach) {
57 			zbus_obs_detach_from_thread(sub);
58 		}
59 	}
60 }
61 
/* Subscriber served by consumer_sub_thread(); 4 is its queue size argument. */
ZBUS_SUBSCRIBER_DEFINE(sub1, 4);
63 
consumer_msg_sub_thread(void * ptr1,void * ptr2,void * ptr3)64 static void consumer_msg_sub_thread(void *ptr1, void *ptr2, void *ptr3)
65 {
66 	ARG_UNUSED(ptr3);
67 
68 	zbus_obs_attach_to_thread(ptr1);
69 
70 	char *name = ptr2;
71 	const struct zbus_observer *msub = ptr1;
72 	const struct zbus_channel *chan;
73 	struct msg_testing_01 msg;
74 
75 	while (1) {
76 		if (zbus_sub_wait_msg(msub, &chan, &msg, K_FOREVER) != 0) {
77 			k_oops();
78 		}
79 		printk("%s level: %d\n", name, msg.seq);
80 
81 		if (msg.must_detach) {
82 			zbus_obs_detach_from_thread(msub);
83 		}
84 	}
85 }
86 
/* Message subscriber served by consumer_msg_sub_thread(). */
ZBUS_MSG_SUBSCRIBER_DEFINE(msub1);
88 
/* Taken by the publisher before each publication; initial count 1, limit 1. */
static K_SEM_DEFINE(sync_sem, 1, 1);
/* Given by the publisher after each publication round; initial count 0. */
static K_SEM_DEFINE(done_sem, 0, 1);

/* Message published on chan_testing_01; the test mutates it between rounds. */
static struct msg_testing_01 msg = {.seq = 0};
93 
publisher_thread(void * ptr1,void * ptr2,void * ptr3)94 static void publisher_thread(void *ptr1, void *ptr2, void *ptr3)
95 {
96 	ARG_UNUSED(ptr1);
97 	ARG_UNUSED(ptr2);
98 	ARG_UNUSED(ptr3);
99 
100 	while (1) {
101 		k_sem_take(&sync_sem, K_FOREVER);
102 		zbus_chan_pub(&chan_testing_01, &msg, K_FOREVER);
103 		k_msleep(100);
104 		k_sem_give(&done_sem);
105 	}
106 }
107 
_pub_and_sync(void)108 static inline void _pub_and_sync(void)
109 {
110 	k_sem_give(&sync_sem);
111 	k_sem_take(&done_sem, K_FOREVER);
112 }
113 
/* Thread ID of the publisher, assigned when the test creates that thread. */
static k_tid_t pub_thread_id;

/* Publisher thread priority as last sampled by listener_callback(). */
static int prio;
117 
listener_callback(const struct zbus_channel * chan)118 static void listener_callback(const struct zbus_channel *chan)
119 {
120 	prio = k_thread_priority_get(pub_thread_id);
121 }
122 
/* Listener observer that invokes listener_callback(). */
ZBUS_LISTENER_DEFINE(lis1, listener_callback);
124 
ZTEST(hlp_priority_boost,test_priority_elevation)125 ZTEST(hlp_priority_boost, test_priority_elevation)
126 {
127 	pub_thread_id = k_thread_create(&pub_thread, pub_thread_sz, STACK_SIZE, publisher_thread,
128 					NULL, NULL, NULL, K_PRIO_PREEMPT(8), 0, K_NO_WAIT);
129 	(void)k_thread_create(&s1_thread, s1_thread_sz, STACK_SIZE, consumer_sub_thread,
130 			      (void *)&sub1, "sub1", NULL, K_PRIO_PREEMPT(3), 0, K_NO_WAIT);
131 	(void)k_thread_create(&ms1_thread, ms1_thread_sz, STACK_SIZE, consumer_msg_sub_thread,
132 			      (void *)&msub1, "msub1", NULL, K_PRIO_PREEMPT(2), 0, K_NO_WAIT);
133 
134 	_pub_and_sync();
135 	zassert_true(prio == 1, "The priority must be 1, but it is %d", prio);
136 
137 	++msg.seq;
138 
139 	zbus_obs_set_enable(&msub1, false);
140 	_pub_and_sync();
141 	zassert_true(prio == 2, "The priority must be 2, but it is %d", prio);
142 	zbus_obs_set_enable(&msub1, true);
143 
144 	++msg.seq;
145 
146 	_pub_and_sync();
147 	zassert_true(prio == 1, "The priority must be 1, but it is %d", prio);
148 
149 	++msg.seq;
150 
151 	zbus_obs_set_chan_notification_mask(&msub1, &chan_testing_01, true);
152 	bool is_masked;
153 
154 	zbus_obs_is_chan_notification_masked(&msub1, &chan_testing_01, &is_masked);
155 	zassert_true(is_masked, NULL);
156 	_pub_and_sync();
157 	zassert_true(prio == 2, "The priority must be 2, but it is %d", prio);
158 	zbus_obs_set_chan_notification_mask(&msub1, &chan_testing_01, false);
159 
160 	++msg.seq;
161 
162 	zbus_obs_set_enable(&msub1, false);
163 	zbus_obs_set_enable(&sub1, false);
164 	_pub_and_sync();
165 	zassert_true(prio == 8, "The priority must be 8, but it is %d", prio);
166 	zbus_obs_set_chan_notification_mask(&msub1, &chan_testing_01, false);
167 	zbus_obs_set_enable(&msub1, true);
168 	zbus_obs_set_enable(&sub1, true);
169 
170 	++msg.seq;
171 	msg.must_detach = true;
172 	_pub_and_sync();
173 	zassert_true(prio == 1, "The priority must be 1, but it is %d", prio);
174 	++msg.seq;
175 	/* Checking if the detach command took effect on both observers */
176 	_pub_and_sync();
177 	zassert_true(prio == 8, "The priority must be 8, but it is %d", prio);
178 }
179 
/* Register the suite; every optional hook argument is left as NULL. */
ZTEST_SUITE(hlp_priority_boost, NULL, NULL, NULL, NULL, NULL);
181