1 /*
2 * Copyright (c) 2024 Intel Corporation
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 #include <app_api.h>
8
9 #include <zephyr/kernel.h>
10 #include <zephyr/internal/syscall_handler.h>
11 #include <zephyr/zbus/zbus.h>
12 #include <zephyr/llext/symbol.h>
13
14 #include <string.h>
15
/* Capacity of each channel's subscriber table (see struct subs). */
#define MAX_SUBSCRIBERS 64
/* Stack size used for the thread created by start_subscriber_thread(). */
#define SUBSCRIBER_THREAD_STACK_SIZE 1024
/* Priority of that thread: preemptible, level 1. */
#define SUBSCRIBER_THREAD_PRIORITY K_PRIO_PREEMPT(1)
19
/*
 * zbus channel carrying struct channel_tick_data messages. The validator and
 * user-data arguments are both NULL, default_sub is the only listed observer
 * and the initial message has its .l member set to 0.
 */
ZBUS_CHAN_DEFINE(tick_chan,
		 struct channel_tick_data,
		 NULL,
		 NULL,
		 ZBUS_OBSERVERS(default_sub),
		 ZBUS_MSG_INIT(.l = 0));

/*
 * zbus subscriber that subscriber_thread_fn() waits on; the second argument
 * (4) is the size of its notification queue.
 */
ZBUS_SUBSCRIBER_DEFINE(default_sub, 4);

/* Stack and thread object used by start_subscriber_thread(). */
K_THREAD_STACK_DEFINE(subscriber_thread_stack, SUBSCRIBER_THREAD_STACK_SIZE);
static struct k_thread subscriber_thread;
31
/*
 * Per-channel subscriber tables. The array is indexed directly with the
 * enum Channels value, and each slot links the application-level channel to
 * the zbus channel that backs it.
 */
static struct subs {
	/*
	 * Registered (thread, event) pairs. Only the first subscribers_count
	 * entries are in use; remove_subscriber() keeps them contiguous.
	 */
	struct {
		k_tid_t thread;
		struct k_event *evt;
	} subscribers[MAX_SUBSCRIBERS];
	/*
	 * Held by z_impl_register_subscriber() while the table is modified.
	 * It is only zero-initialized here; a k_mutex has to be set up with
	 * k_mutex_init() before its first use.
	 */
	struct k_mutex subscribers_mtx;
	/* Number of valid entries in subscribers[]. */
	int subscribers_count;
	/* Backing zbus channel; NULL for slots that represent no channel. */
	const struct zbus_channel *chan;
} channel_subscribers[] = {
	/* Empty one first, so no channel is zero (first item on enum == 1) */
	{{ }},
	{ .chan = &tick_chan },
	/* Trailing empty slot; presumably the CHAN_LAST index - TODO confirm */
	{{ }}
};
46
/*
 * Drop the first entry registered by @thread from the table @sus.
 *
 * The entries that follow the removed one are shifted down by one position so
 * the used part of the table stays contiguous.
 *
 * Returns 0 on success or -ENOENT when @thread has no entry in the table.
 */
static int remove_subscriber(k_tid_t thread, struct subs *sus)
{
	int idx = 0;
	int tail_entries;

	/* Linear search for the first slot owned by the given thread. */
	while (idx < sus->subscribers_count &&
	       sus->subscribers[idx].thread != thread) {
		idx++;
	}

	if (idx == sus->subscribers_count) {
		return -ENOENT;
	}

	/*
	 * Clear the slot first: when it is the last used entry nothing is
	 * moved over it, so this is what leaves it empty.
	 */
	sus->subscribers[idx].thread = NULL;
	sus->subscribers[idx].evt = NULL;

	/* Close the gap left by the removed entry. */
	tail_entries = sus->subscribers_count - idx - 1;
	memmove(&sus->subscribers[idx], &sus->subscribers[idx + 1],
		tail_entries * sizeof(sus->subscribers[0]));
	sus->subscribers_count--;

	return 0;
}
71
/*
 * Append a (thread, event) pair to the table @sus.
 *
 * The new entry goes in the first free slot, i.e. at index
 * subscribers_count, and the registration is logged with printk().
 *
 * Returns 0 on success or -ENOMEM when the table already holds
 * MAX_SUBSCRIBERS entries.
 */
static int add_subscriber(k_tid_t thread, struct subs *sus,
			  struct k_event *evt)
{
	const int slot = sus->subscribers_count;

	if (slot >= MAX_SUBSCRIBERS) {
		return -ENOMEM;
	}

	sus->subscribers[slot].thread = thread;
	sus->subscribers[slot].evt = evt;
	sus->subscribers_count = slot + 1;

	printk("[app]Thread [%p] registered event [%p]\n", thread, evt);
	return 0;
}
86
notify_subscribers(enum Channels channel)87 static void notify_subscribers(enum Channels channel)
88 {
89 int i;
90 struct subs *subs = &channel_subscribers[channel];
91
92 for (i = 0; i < subs->subscribers_count; i++) {
93 k_event_post(subs->subscribers[i].evt, channel);
94 }
95 }
96
subscriber_thread_fn(void * p0,void * p1,void * p2)97 static void subscriber_thread_fn(void *p0, void *p1, void *p2)
98 {
99 const struct zbus_channel *chan;
100 int i;
101
102 printk("[app]Subscriber thread [%p] started.\n", k_current_get());
103 while (zbus_sub_wait(&default_sub, &chan, K_FOREVER) == 0) {
104 printk("[app][subscriber_thread]Got channel %s\n",
105 zbus_chan_name(chan));
106
107 for (i = 0; i < CHAN_LAST; i++) {
108 if (channel_subscribers[i].chan == chan) {
109 notify_subscribers((enum Channels)i);
110 break;
111 }
112 }
113 }
114 }
115
start_subscriber_thread(void)116 k_tid_t start_subscriber_thread(void)
117 {
118 return k_thread_create(&subscriber_thread, subscriber_thread_stack,
119 SUBSCRIBER_THREAD_STACK_SIZE,
120 subscriber_thread_fn, NULL, NULL, NULL,
121 SUBSCRIBER_THREAD_PRIORITY, 0, K_NO_WAIT);
122 }
123
z_impl_publish(enum Channels channel,void * data,size_t data_len)124 int z_impl_publish(enum Channels channel, void *data, size_t data_len)
125 {
126 const struct zbus_channel *chan = channel_subscribers[channel].chan;
127
128 if (channel == CHAN_LAST) {
129 return -EINVAL;
130 }
131
132 return zbus_chan_pub(chan, data, K_NO_WAIT);
133 }
134 EXPORT_SYMBOL(z_impl_publish);
135
136 #ifdef CONFIG_USERSPACE
/*
 * Syscall verifier for publish().
 *
 * Copies @data_len bytes of the caller's buffer with
 * k_usermode_alloc_from_copy(), publishes from that copy and releases it
 * with k_free() afterwards.
 *
 * Returns -EINVAL when the copy cannot be made, otherwise the result of
 * z_impl_publish().
 */
static inline int z_vrfy_publish(enum Channels channel, void *data,
				 size_t data_len)
{
	void *kernel_copy = k_usermode_alloc_from_copy(data, data_len);
	int status;

	if (kernel_copy == NULL) {
		return -EINVAL;
	}

	status = z_impl_publish(channel, kernel_copy, data_len);
	k_free(kernel_copy);

	return status;
}
154 #include <zephyr/syscalls/publish_mrsh.c>
155 #endif
156
z_impl_receive(enum Channels channel,void * data,size_t data_len)157 int z_impl_receive(enum Channels channel, void *data, size_t data_len)
158 {
159 size_t msg_size;
160
161 if (channel == CHAN_LAST || data == NULL) {
162 return -EINVAL;
163 }
164
165 msg_size = channel_subscribers[channel].chan->message_size;
166 if (data_len < msg_size) {
167 return -EINVAL;
168 }
169
170 return zbus_chan_read(channel_subscribers[channel].chan, data,
171 K_NO_WAIT);
172 }
173 EXPORT_SYMBOL(z_impl_receive);
174
175 #ifdef CONFIG_USERSPACE
/*
 * Syscall verifier for receive().
 *
 * Forwards to z_impl_receive() only when K_SYSCALL_MEMORY_WRITE() accepts
 * the caller's buffer (@data, @data_len); otherwise returns -EINVAL.
 */
static inline int z_vrfy_receive(enum Channels channel, void *data,
				 size_t data_len)
{
	if (!K_SYSCALL_MEMORY_WRITE(data, data_len)) {
		return z_impl_receive(channel, data, data_len);
	}

	return -EINVAL;
}
185 #include <zephyr/syscalls/receive_mrsh.c>
186 #endif
187
z_impl_register_subscriber(enum Channels channel,struct k_event * evt)188 int z_impl_register_subscriber(enum Channels channel, struct k_event *evt)
189 {
190 struct subs *subs = &channel_subscribers[channel];
191 int ret;
192
193 if (channel == CHAN_LAST) {
194 return -EINVAL;
195 }
196
197 k_mutex_lock(&subs->subscribers_mtx, K_FOREVER);
198
199 if (evt == NULL) {
200 ret = remove_subscriber(k_current_get(), subs);
201 } else {
202 ret = add_subscriber(k_current_get(), subs, evt);
203 }
204
205 k_mutex_unlock(&subs->subscribers_mtx);
206
207 return ret;
208 }
209 EXPORT_SYMBOL(z_impl_register_subscriber);
210
211 #ifdef CONFIG_USERSPACE
z_vrfy_register_subscriber(enum Channels channel,struct k_event * evt)212 static inline int z_vrfy_register_subscriber(enum Channels channel,
213 struct k_event *evt)
214 {
215 if (K_SYSCALL_OBJ(evt, K_OBJ_EVENT)) {
216 return -EINVAL;
217 }
218
219 return z_impl_register_subscriber(channel, evt);
220 }
221 #include <zephyr/syscalls/register_subscriber_mrsh.c>
222 #endif
223