1 /*
2 * Copyright (c) 2023 Rodrigo Peixoto <rodrigopex@gmail.com>
3 * SPDX-License-Identifier: Apache-2.0
4 */
5
6 #include "messages.h"
7
8 #include <zephyr/kernel.h>
9 #include <zephyr/logging/log.h>
10 #include <zephyr/zbus/zbus.h>
11 #include <zephyr/ztest.h>
12 LOG_MODULE_DECLARE(zbus, CONFIG_ZBUS_LOG_LEVEL);
13
14 #define STACK_SIZE (CONFIG_MAIN_STACK_SIZE + CONFIG_TEST_EXTRA_STACK_SIZE)
15
16 static struct k_thread pub_thread;
17 static K_THREAD_STACK_DEFINE(pub_thread_sz, STACK_SIZE);
18 static struct k_thread s1_thread;
19 static K_THREAD_STACK_DEFINE(s1_thread_sz, STACK_SIZE);
20 static struct k_thread ms1_thread;
21 static K_THREAD_STACK_DEFINE(ms1_thread_sz, STACK_SIZE);
22
23 struct msg_testing_01 {
24 int seq;
25 bool must_detach;
26 };
27
28 ZBUS_CHAN_DEFINE(chan_testing_01, /* Name */
29 struct msg_testing_01, /* Message type */
30
31 NULL, /* Validator */
32 NULL, /* User data */
33 ZBUS_OBSERVERS(lis1, sub1, msub1), /* observers */
34 ZBUS_MSG_INIT(0) /* Initial value major 0, minor 1, build 1023 */
35 );
36
consumer_sub_thread(void * ptr1,void * ptr2,void * ptr3)37 static void consumer_sub_thread(void *ptr1, void *ptr2, void *ptr3)
38 {
39 ARG_UNUSED(ptr3);
40
41 zbus_obs_attach_to_thread(ptr1);
42
43 char *name = ptr2;
44 const struct zbus_observer *sub = ptr1;
45 const struct zbus_channel *chan;
46 struct msg_testing_01 msg;
47
48 while (1) {
49 if (zbus_sub_wait(sub, &chan, K_FOREVER) != 0) {
50 k_oops();
51 }
52 zbus_chan_read(chan, &msg, K_FOREVER);
53
54 printk("%s level: %d\n", name, msg.seq);
55
56 if (msg.must_detach) {
57 zbus_obs_detach_from_thread(sub);
58 }
59 }
60 }
61
62 ZBUS_SUBSCRIBER_DEFINE(sub1, 4);
63
consumer_msg_sub_thread(void * ptr1,void * ptr2,void * ptr3)64 static void consumer_msg_sub_thread(void *ptr1, void *ptr2, void *ptr3)
65 {
66 ARG_UNUSED(ptr3);
67
68 zbus_obs_attach_to_thread(ptr1);
69
70 char *name = ptr2;
71 const struct zbus_observer *msub = ptr1;
72 const struct zbus_channel *chan;
73 struct msg_testing_01 msg;
74
75 while (1) {
76 if (zbus_sub_wait_msg(msub, &chan, &msg, K_FOREVER) != 0) {
77 k_oops();
78 }
79 printk("%s level: %d\n", name, msg.seq);
80
81 if (msg.must_detach) {
82 zbus_obs_detach_from_thread(msub);
83 }
84 }
85 }
86
87 ZBUS_MSG_SUBSCRIBER_DEFINE(msub1);
88
89 static K_SEM_DEFINE(sync_sem, 1, 1);
90 static K_SEM_DEFINE(done_sem, 0, 1);
91
92 static struct msg_testing_01 msg = {.seq = 0};
93
publisher_thread(void * ptr1,void * ptr2,void * ptr3)94 static void publisher_thread(void *ptr1, void *ptr2, void *ptr3)
95 {
96 ARG_UNUSED(ptr1);
97 ARG_UNUSED(ptr2);
98 ARG_UNUSED(ptr3);
99
100 while (1) {
101 k_sem_take(&sync_sem, K_FOREVER);
102 zbus_chan_pub(&chan_testing_01, &msg, K_FOREVER);
103 k_msleep(100);
104 k_sem_give(&done_sem);
105 }
106 }
107
_pub_and_sync(void)108 static inline void _pub_and_sync(void)
109 {
110 k_sem_give(&sync_sem);
111 k_sem_take(&done_sem, K_FOREVER);
112 }
113
114 static k_tid_t pub_thread_id;
115
116 static int prio;
117
listener_callback(const struct zbus_channel * chan)118 static void listener_callback(const struct zbus_channel *chan)
119 {
120 prio = k_thread_priority_get(pub_thread_id);
121 }
122
123 ZBUS_LISTENER_DEFINE(lis1, listener_callback);
124
ZTEST(hlp_priority_boost,test_priority_elevation)125 ZTEST(hlp_priority_boost, test_priority_elevation)
126 {
127 pub_thread_id = k_thread_create(&pub_thread, pub_thread_sz, STACK_SIZE, publisher_thread,
128 NULL, NULL, NULL, K_PRIO_PREEMPT(8), 0, K_NO_WAIT);
129 (void)k_thread_create(&s1_thread, s1_thread_sz, STACK_SIZE, consumer_sub_thread,
130 (void *)&sub1, "sub1", NULL, K_PRIO_PREEMPT(3), 0, K_NO_WAIT);
131 (void)k_thread_create(&ms1_thread, ms1_thread_sz, STACK_SIZE, consumer_msg_sub_thread,
132 (void *)&msub1, "msub1", NULL, K_PRIO_PREEMPT(2), 0, K_NO_WAIT);
133
134 _pub_and_sync();
135 zassert_true(prio == 1, "The priority must be 1, but it is %d", prio);
136
137 ++msg.seq;
138
139 zbus_obs_set_enable(&msub1, false);
140 _pub_and_sync();
141 zassert_true(prio == 2, "The priority must be 2, but it is %d", prio);
142 zbus_obs_set_enable(&msub1, true);
143
144 ++msg.seq;
145
146 _pub_and_sync();
147 zassert_true(prio == 1, "The priority must be 1, but it is %d", prio);
148
149 ++msg.seq;
150
151 zbus_obs_set_chan_notification_mask(&msub1, &chan_testing_01, true);
152 bool is_masked;
153
154 zbus_obs_is_chan_notification_masked(&msub1, &chan_testing_01, &is_masked);
155 zassert_true(is_masked, NULL);
156 _pub_and_sync();
157 zassert_true(prio == 2, "The priority must be 2, but it is %d", prio);
158 zbus_obs_set_chan_notification_mask(&msub1, &chan_testing_01, false);
159
160 ++msg.seq;
161
162 zbus_obs_set_enable(&msub1, false);
163 zbus_obs_set_enable(&sub1, false);
164 _pub_and_sync();
165 zassert_true(prio == 8, "The priority must be 8, but it is %d", prio);
166 zbus_obs_set_chan_notification_mask(&msub1, &chan_testing_01, false);
167 zbus_obs_set_enable(&msub1, true);
168 zbus_obs_set_enable(&sub1, true);
169
170 ++msg.seq;
171 msg.must_detach = true;
172 _pub_and_sync();
173 zassert_true(prio == 1, "The priority must be 1, but it is %d", prio);
174 ++msg.seq;
175 /* Checking if the detach command took effect on both observers */
176 _pub_and_sync();
177 zassert_true(prio == 8, "The priority must be 8, but it is %d", prio);
178 }
179
180 ZTEST_SUITE(hlp_priority_boost, NULL, NULL, NULL, NULL, NULL);
181