1 /*
2 * Copyright (c) 2022 Intel Corporation.
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6
7 #include <zephyr/rtio/rtio.h>
8 #include <zephyr/kernel.h>
9
10 #include "rtio_sched.h"
11
12 #include <zephyr/logging/log.h>
13 LOG_MODULE_REGISTER(rtio_executor, CONFIG_RTIO_LOG_LEVEL);
14
/**
 * @brief Complete an RTIO_OP_AWAIT submission once it has been signaled
 *
 * The executor registers this function as the signal callback of an await
 * submission. When it fires, the submission is finished successfully with a
 * result of 0.
 *
 * @param iodev_sqe Submission to complete
 * @param userdata Additional data passed along (unused)
 */
static void rtio_executor_sqe_signaled(struct rtio_iodev_sqe *iodev_sqe, void *userdata)
{
	rtio_iodev_sqe_ok(iodev_sqe, 0);

	ARG_UNUSED(userdata);
}
30
31 /**
32 * @brief Executor handled submissions
33 */
rtio_executor_op(struct rtio_iodev_sqe * iodev_sqe,int last_result)34 static void rtio_executor_op(struct rtio_iodev_sqe *iodev_sqe, int last_result)
35 {
36 const struct rtio_sqe *sqe = &iodev_sqe->sqe;
37
38 switch (sqe->op) {
39 case RTIO_OP_CALLBACK:
40 sqe->callback.callback(iodev_sqe->r, sqe, last_result, sqe->callback.arg0);
41 rtio_iodev_sqe_ok(iodev_sqe, 0);
42 break;
43 case RTIO_OP_DELAY:
44 rtio_sched_alarm(iodev_sqe, sqe->delay.timeout);
45 break;
46 case RTIO_OP_AWAIT:
47 rtio_iodev_sqe_await_signal(iodev_sqe, rtio_executor_sqe_signaled, NULL);
48 break;
49 default:
50 rtio_iodev_sqe_err(iodev_sqe, -EINVAL);
51 }
52 }
53
54 /**
55 * @brief Submit to an iodev a submission to work on
56 *
57 * Should be called by the executor when it wishes to submit work
58 * to an iodev.
59 *
60 * @param iodev_sqe Submission to work on
61 */
rtio_iodev_submit(struct rtio_iodev_sqe * iodev_sqe,int last_result)62 static inline void rtio_iodev_submit(struct rtio_iodev_sqe *iodev_sqe, int last_result)
63 {
64 if (FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags)) {
65 rtio_iodev_sqe_err(iodev_sqe, -ECANCELED);
66 return;
67 }
68
69 /* No iodev means its an executor specific operation */
70 if (iodev_sqe->sqe.iodev == NULL) {
71 rtio_executor_op(iodev_sqe, last_result);
72 return;
73 }
74
75 iodev_sqe->sqe.iodev->api->submit(iodev_sqe);
76 }
77
78 /**
79 * @brief Submit operations in the queue to iodevs
80 *
81 * @param r RTIO context
82 *
83 * @retval 0 Always succeeds
84 */
rtio_executor_submit(struct rtio * r)85 void rtio_executor_submit(struct rtio *r)
86 {
87 const uint16_t cancel_no_response = (RTIO_SQE_CANCELED | RTIO_SQE_NO_RESPONSE);
88 struct mpsc_node *node = mpsc_pop(&r->sq);
89
90 while (node != NULL) {
91 struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
92
93 /* If this submission was cancelled before submit, then generate no response */
94 if (iodev_sqe->sqe.flags & RTIO_SQE_CANCELED) {
95 iodev_sqe->sqe.flags |= cancel_no_response;
96 }
97 iodev_sqe->r = r;
98
99 struct rtio_iodev_sqe *curr = iodev_sqe, *next;
100
101 /* Link up transaction or queue list if needed */
102 while (curr->sqe.flags & (RTIO_SQE_TRANSACTION | RTIO_SQE_CHAINED)) {
103 #ifdef CONFIG_ASSERT
104 bool transaction = iodev_sqe->sqe.flags & RTIO_SQE_TRANSACTION;
105 bool chained = iodev_sqe->sqe.flags & RTIO_SQE_CHAINED;
106 bool multishot = iodev_sqe->sqe.flags & RTIO_SQE_MULTISHOT;
107
108 __ASSERT((transaction ^ chained ^ multishot) &&
109 !(transaction && chained && multishot),
110 "Cannot have more than one of these flags"
111 " enabled: transaction, chained or multishot");
112 #endif
113 node = mpsc_pop(&iodev_sqe->r->sq);
114
115 __ASSERT(node != NULL,
116 "Expected a valid submission in the queue while in a transaction or chain");
117
118 next = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
119
120 /* If the current submission was cancelled before submit,
121 * then cancel the next one and generate no response
122 */
123 if (curr->sqe.flags & RTIO_SQE_CANCELED) {
124 next->sqe.flags |= cancel_no_response;
125 }
126 curr->next = next;
127 curr = next;
128 curr->r = r;
129
130 __ASSERT(
131 curr != NULL,
132 "Expected a valid sqe following transaction or chain flag");
133 }
134
135 curr->next = NULL;
136 curr->r = r;
137
138 rtio_iodev_submit(iodev_sqe, 0);
139
140 node = mpsc_pop(&r->sq);
141 }
142 }
143
144 /**
145 * @brief Handle common logic when :c:macro:`RTIO_SQE_MULTISHOT` is set
146 *
147 * @param[in] iodev_sqe IODEV SQE that's being marked as finished.
148 * @param[in] result The result of the latest request iteration
149 * @param[in] is_ok Whether or not the SQE's result was successful
150 */
rtio_executor_handle_multishot(struct rtio_iodev_sqe * iodev_sqe,int result,bool is_ok)151 static inline void rtio_executor_handle_multishot(struct rtio_iodev_sqe *iodev_sqe,
152 int result, bool is_ok)
153 {
154 struct rtio *r = iodev_sqe->r;
155 const bool is_canceled = FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags) == 1;
156 const bool uses_mempool = FIELD_GET(RTIO_SQE_MEMPOOL_BUFFER, iodev_sqe->sqe.flags) == 1;
157 const bool requires_response = FIELD_GET(RTIO_SQE_NO_RESPONSE, iodev_sqe->sqe.flags) == 0;
158 uint32_t cqe_flags = rtio_cqe_compute_flags(iodev_sqe);
159 void *userdata = iodev_sqe->sqe.userdata;
160
161 /** We're releasing reasources when erroring as an error handling scheme of multi-shot
162 * submissions by requiring to stop re-submitting if something goes wrong. Let the
163 * application decide what's best for handling the corresponding error: whether
164 * re-submitting, rebooting or anything else.
165 */
166 if (is_canceled || !is_ok) {
167 LOG_DBG("Releasing memory @%p size=%u", (void *)iodev_sqe->sqe.rx.buf,
168 iodev_sqe->sqe.rx.buf_len);
169 rtio_release_buffer(r, iodev_sqe->sqe.rx.buf, iodev_sqe->sqe.rx.buf_len);
170 rtio_sqe_pool_free(r->sqe_pool, iodev_sqe);
171 } else {
172 /* Request was not canceled, put the SQE back in the queue */
173 if (iodev_sqe->sqe.op == RTIO_OP_RX && uses_mempool) {
174 /* Reset the buffer info so the next request can get a new one */
175 iodev_sqe->sqe.rx.buf = NULL;
176 iodev_sqe->sqe.rx.buf_len = 0;
177 }
178
179 mpsc_push(&r->sq, &iodev_sqe->q);
180 rtio_executor_submit(r);
181 }
182
183 if (requires_response) {
184 rtio_cqe_submit(r, result, userdata, cqe_flags);
185 }
186 }
187
188 /**
189 * @brief Handle common logic one-shot items
190 *
191 * @param[in] iodev_sqe IODEV SQE that's being marked as finished.
192 * @param[in] result The result of the latest request iteration
193 * @param[in] is_ok Whether or not the SQE's result was successful
194 */
rtio_executor_handle_oneshot(struct rtio_iodev_sqe * iodev_sqe,int last_result,bool is_ok)195 static inline void rtio_executor_handle_oneshot(struct rtio_iodev_sqe *iodev_sqe,
196 int last_result, bool is_ok)
197 {
198 const bool is_canceled = FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags) == 1;
199 struct rtio_iodev_sqe *curr = iodev_sqe;
200 struct rtio *r = iodev_sqe->r;
201 uint32_t sqe_flags;
202 int result = last_result;
203
204 /** Single-shot items may be linked as transactions or be chained together.
205 * Untangle the set of SQEs and act accordingly on each one.
206 */
207 do {
208 void *userdata = curr->sqe.userdata;
209 uint32_t cqe_flags = rtio_cqe_compute_flags(iodev_sqe);
210 struct rtio_iodev_sqe *next = rtio_iodev_sqe_next(curr);
211
212 sqe_flags = curr->sqe.flags;
213
214 if (!is_canceled && FIELD_GET(RTIO_SQE_NO_RESPONSE, sqe_flags) == 0) {
215 /* Generate a result back to the client if need be.*/
216 rtio_cqe_submit(r, result, userdata, cqe_flags);
217 }
218
219 rtio_sqe_pool_free(r->sqe_pool, curr);
220 curr = next;
221
222 if (!is_ok) {
223 /* This is an error path, so cancel any chained SQEs */
224 result = -ECANCELED;
225 }
226 } while (FIELD_GET(RTIO_SQE_TRANSACTION, sqe_flags) == 1);
227
228 /* curr should now be the last sqe in the transaction if that is what completed */
229 if (FIELD_GET(RTIO_SQE_CHAINED, sqe_flags) == 1) {
230 rtio_iodev_submit(curr, last_result);
231 }
232 }
233
rtio_executor_done(struct rtio_iodev_sqe * iodev_sqe,int result,bool is_ok)234 static inline void rtio_executor_done(struct rtio_iodev_sqe *iodev_sqe, int result, bool is_ok)
235 {
236 const bool is_multishot = FIELD_GET(RTIO_SQE_MULTISHOT, iodev_sqe->sqe.flags) == 1;
237
238 if (is_multishot) {
239 rtio_executor_handle_multishot(iodev_sqe, result, is_ok);
240 } else {
241 rtio_executor_handle_oneshot(iodev_sqe, result, is_ok);
242 }
243 }
244
/**
 * @brief Callback from an iodev describing success
 *
 * @param iodev_sqe Submission that finished
 * @param result Result reported for the submission
 */
void rtio_executor_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
{
	const bool is_ok = true;

	rtio_executor_done(iodev_sqe, result, is_ok);
}
252
/**
 * @brief Callback from an iodev describing error
 *
 * The following is assumed to have been validated on rtio_submit:
 * - a sqe marked as chained or transaction has a next sqe
 * - a sqe is marked either chained or transaction but not both
 *
 * @param iodev_sqe Submission that failed
 * @param result Error reported for the submission
 */
void rtio_executor_err(struct rtio_iodev_sqe *iodev_sqe, int result)
{
	const bool is_ok = false;

	rtio_executor_done(iodev_sqe, result, is_ok);
}
264