/*
 * Copyright (c) 2022 Intel Corporation.
 *
 * SPDX-License-Identifier: Apache-2.0
 */

#include <zephyr/rtio/rtio.h>
#include <zephyr/kernel.h>

#include "rtio_sched.h"

#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(rtio_executor, CONFIG_RTIO_LOG_LEVEL);

/**
 * @brief Callback which completes an RTIO_OP_AWAIT handled by the executor
 *
 * The callback is triggered when the rtio_sqe tied to the RTIO_OP_AWAIT
 * is signaled by the user.
 *
 * @param iodev_sqe Submission to complete
 * @param userdata Additional data passed along
 */
static void rtio_executor_sqe_signaled(struct rtio_iodev_sqe *iodev_sqe, void *userdata)
{
	ARG_UNUSED(userdata);

	rtio_iodev_sqe_ok(iodev_sqe, 0);
}
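
/*
 * Illustrative sketch (not part of this file): how an application might use an
 * executor-handled RTIO_OP_AWAIT. The rtio_sqe_prep_await() and
 * rtio_sqe_signal() helper names below are assumptions based on the RTIO API
 * in <zephyr/rtio/rtio.h>; the executor parks the submission via
 * rtio_iodev_sqe_await_signal() and completes it with result 0 once the
 * application signals it.
 *
 *	RTIO_DEFINE(await_ctx, 4, 4);
 *
 *	struct rtio_sqe *sqe = rtio_sqe_acquire(&await_ctx);
 *
 *	rtio_sqe_prep_await(sqe, NULL, RTIO_PRIO_NORM, NULL);
 *	rtio_submit(&await_ctx, 0);
 *
 *	// ... later, from another thread:
 *	rtio_sqe_signal(sqe);
 */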

/**
 * @brief Handle a submission that is processed by the executor itself
 *
 * Operations with no iodev (callbacks, delays, awaits) are handled here.
 */
static void rtio_executor_op(struct rtio_iodev_sqe *iodev_sqe, int last_result)
{
	const struct rtio_sqe *sqe = &iodev_sqe->sqe;

	switch (sqe->op) {
	case RTIO_OP_CALLBACK:
		sqe->callback.callback(iodev_sqe->r, sqe, last_result, sqe->callback.arg0);
		rtio_iodev_sqe_ok(iodev_sqe, 0);
		break;
	case RTIO_OP_DELAY:
		rtio_sched_alarm(iodev_sqe, sqe->delay.timeout);
		break;
	case RTIO_OP_AWAIT:
		rtio_iodev_sqe_await_signal(iodev_sqe, rtio_executor_sqe_signaled, NULL);
		break;
	default:
		rtio_iodev_sqe_err(iodev_sqe, -EINVAL);
	}
}
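
/*
 * Illustrative sketch (not part of this file): chaining an RTIO_OP_CALLBACK
 * after an I/O operation so the callback observes the previous completion
 * code through last_result. rtio_sqe_prep_callback() is assumed to match the
 * four-argument callback invocation above; treat the exact helper name and
 * signature as an assumption, not a guarantee.
 *
 *	static void on_read_done(struct rtio *r, const struct rtio_sqe *sqe,
 *				 int last_result, void *arg0)
 *	{
 *		// last_result carries the completion code of the chained read
 *	}
 *
 *	struct rtio_sqe *read_sqe = rtio_sqe_acquire(&ctx);
 *	struct rtio_sqe *cb_sqe = rtio_sqe_acquire(&ctx);
 *
 *	rtio_sqe_prep_read(read_sqe, &my_iodev, RTIO_PRIO_NORM, buf, sizeof(buf), NULL);
 *	read_sqe->flags |= RTIO_SQE_CHAINED;
 *	rtio_sqe_prep_callback(cb_sqe, on_read_done, NULL, NULL);
 *	rtio_submit(&ctx, 0);
 */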

/**
 * @brief Submit a submission to its iodev to work on
 *
 * Called by the executor when it wishes to submit work to an iodev,
 * or to handle an executor-specific operation itself.
 *
 * @param iodev_sqe Submission to work on
 * @param last_result Result of the previously completed submission,
 *                    forwarded to executor-handled operations
 */
static inline void rtio_iodev_submit(struct rtio_iodev_sqe *iodev_sqe, int last_result)
{
	if (FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags)) {
		rtio_iodev_sqe_err(iodev_sqe, -ECANCELED);
		return;
	}

	/* No iodev means it's an executor-specific operation */
	if (iodev_sqe->sqe.iodev == NULL) {
		rtio_executor_op(iodev_sqe, last_result);
		return;
	}

	iodev_sqe->sqe.iodev->api->submit(iodev_sqe);
}
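
/*
 * Illustrative sketch (not part of this file): cancellation is best-effort.
 * A submission canceled before the executor dequeues it is failed internally
 * with -ECANCELED by the check above and, because rtio_executor_submit()
 * below also marks it RTIO_SQE_NO_RESPONSE, produces no CQE.
 * rtio_sqe_cancel() is assumed to be the cancellation entry point from
 * <zephyr/rtio/rtio.h>.
 *
 *	struct rtio_sqe *sqe = rtio_sqe_acquire(&ctx);
 *
 *	rtio_sqe_prep_read(sqe, &my_iodev, RTIO_PRIO_NORM, buf, sizeof(buf), NULL);
 *	rtio_submit(&ctx, 0);
 *
 *	if (rtio_sqe_cancel(sqe) != 0) {
 *		// Assumed semantics: non-zero means the op could not be canceled
 *	}
 */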

/**
 * @brief Submit operations in the queue to iodevs
 *
 * @param r RTIO context
 */
void rtio_executor_submit(struct rtio *r)
{
	const uint16_t cancel_no_response = (RTIO_SQE_CANCELED | RTIO_SQE_NO_RESPONSE);
	struct mpsc_node *node = mpsc_pop(&r->sq);

	while (node != NULL) {
		struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);

		/* If this submission was canceled before submit, then generate no response */
		if (iodev_sqe->sqe.flags & RTIO_SQE_CANCELED) {
			iodev_sqe->sqe.flags |= cancel_no_response;
		}
		iodev_sqe->r = r;

		struct rtio_iodev_sqe *curr = iodev_sqe, *next;

		/* Link up the transaction or chain list if needed */
		while (curr->sqe.flags & (RTIO_SQE_TRANSACTION | RTIO_SQE_CHAINED)) {
#ifdef CONFIG_ASSERT
			bool transaction = curr->sqe.flags & RTIO_SQE_TRANSACTION;
			bool chained = curr->sqe.flags & RTIO_SQE_CHAINED;
			bool multishot = curr->sqe.flags & RTIO_SQE_MULTISHOT;

			__ASSERT((transaction ^ chained ^ multishot) &&
				 !(transaction && chained && multishot),
				 "Cannot have more than one of these flags"
				 " enabled: transaction, chained or multishot");
#endif
			node = mpsc_pop(&iodev_sqe->r->sq);

			__ASSERT(node != NULL,
				 "Expected a valid submission in the queue while in a transaction or chain");

			next = CONTAINER_OF(node, struct rtio_iodev_sqe, q);

			/* If the current submission was canceled before submit,
			 * then cancel the next one and generate no response
			 */
			if (curr->sqe.flags & RTIO_SQE_CANCELED) {
				next->sqe.flags |= cancel_no_response;
			}
			curr->next = next;
			curr = next;

			__ASSERT(curr != NULL,
				 "Expected a valid sqe following transaction or chain flag");

			curr->r = r;
		}

		curr->next = NULL;
		curr->r = r;

		rtio_iodev_submit(iodev_sqe, 0);

		node = mpsc_pop(&r->sq);
	}
}
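
/*
 * Illustrative sketch (not part of this file): a chained pair of submissions
 * that the loop above links into a curr->next list before handing the head to
 * rtio_iodev_submit(). Names such as ctx, my_iodev, cmd and buf are
 * placeholders; the prep helpers are taken from <zephyr/rtio/rtio.h>.
 *
 *	RTIO_DEFINE(ctx, 4, 4);
 *
 *	struct rtio_sqe *wr = rtio_sqe_acquire(&ctx);
 *	struct rtio_sqe *rd = rtio_sqe_acquire(&ctx);
 *
 *	rtio_sqe_prep_tiny_write(wr, &my_iodev, RTIO_PRIO_NORM, cmd, sizeof(cmd), NULL);
 *	wr->flags |= RTIO_SQE_CHAINED;	// rd starts only after wr completes
 *	rtio_sqe_prep_read(rd, &my_iodev, RTIO_PRIO_NORM, buf, sizeof(buf), userdata);
 *
 *	rtio_submit(&ctx, 2);		// wait for both completions
 */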

/**
 * @brief Handle common logic when :c:macro:`RTIO_SQE_MULTISHOT` is set
 *
 * @param[in] iodev_sqe IODEV SQE that's being marked as finished.
 * @param[in] result The result of the latest request iteration
 * @param[in] is_ok Whether or not the SQE's result was successful
 */
static inline void rtio_executor_handle_multishot(struct rtio_iodev_sqe *iodev_sqe,
						  int result, bool is_ok)
{
	struct rtio *r = iodev_sqe->r;
	const bool is_canceled = FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags) == 1;
	const bool uses_mempool = FIELD_GET(RTIO_SQE_MEMPOOL_BUFFER, iodev_sqe->sqe.flags) == 1;
	const bool requires_response = FIELD_GET(RTIO_SQE_NO_RESPONSE, iodev_sqe->sqe.flags) == 0;
	uint32_t cqe_flags = rtio_cqe_compute_flags(iodev_sqe);
	void *userdata = iodev_sqe->sqe.userdata;

	/* Releasing resources on error is the error handling scheme for multi-shot
	 * submissions: stop re-submitting when something goes wrong and let the
	 * application decide how to handle the error, be that re-submitting,
	 * rebooting or anything else.
	 */
	if (is_canceled || !is_ok) {
		LOG_DBG("Releasing memory @%p size=%u", (void *)iodev_sqe->sqe.rx.buf,
			iodev_sqe->sqe.rx.buf_len);
		rtio_release_buffer(r, iodev_sqe->sqe.rx.buf, iodev_sqe->sqe.rx.buf_len);
		rtio_sqe_pool_free(r->sqe_pool, iodev_sqe);
	} else {
		/* Request was not canceled, put the SQE back in the queue */
		if (iodev_sqe->sqe.op == RTIO_OP_RX && uses_mempool) {
			/* Reset the buffer info so the next request can get a new one */
			iodev_sqe->sqe.rx.buf = NULL;
			iodev_sqe->sqe.rx.buf_len = 0;
		}

		mpsc_push(&r->sq, &iodev_sqe->q);
		rtio_executor_submit(r);
	}

	if (requires_response) {
		rtio_cqe_submit(r, result, userdata, cqe_flags);
	}
}
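
/*
 * Illustrative sketch (not part of this file): a multishot read backed by the
 * context's mempool. Each successful iteration produces a CQE and the SQE is
 * re-queued above until it errors or is canceled. Treat
 * rtio_sqe_prep_read_multishot() and the mempool accessors as assumptions
 * drawn from <zephyr/rtio/rtio.h>.
 *
 *	RTIO_DEFINE_WITH_MEMPOOL(ctx, 4, 4, 8, 64, 4);
 *
 *	struct rtio_sqe *sqe = rtio_sqe_acquire(&ctx);
 *
 *	rtio_sqe_prep_read_multishot(sqe, &my_iodev, RTIO_PRIO_NORM, userdata);
 *	rtio_submit(&ctx, 0);
 *
 *	struct rtio_cqe *cqe = rtio_cqe_consume_block(&ctx);
 *	uint8_t *buf;
 *	uint32_t buf_len;
 *
 *	rtio_cqe_get_mempool_buffer(&ctx, cqe, &buf, &buf_len);
 *	// ... use buf ...
 *	rtio_cqe_release(&ctx, cqe);
 *	rtio_release_buffer(&ctx, buf, buf_len);
 */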

/**
 * @brief Handle common logic for one-shot items
 *
 * @param[in] iodev_sqe IODEV SQE that's being marked as finished.
 * @param[in] last_result The result of the latest request iteration
 * @param[in] is_ok Whether or not the SQE's result was successful
 */
static inline void rtio_executor_handle_oneshot(struct rtio_iodev_sqe *iodev_sqe,
						int last_result, bool is_ok)
{
	const bool is_canceled = FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags) == 1;
	struct rtio_iodev_sqe *curr = iodev_sqe;
	struct rtio *r = iodev_sqe->r;
	uint32_t sqe_flags;
	int result = last_result;

	/* One-shot items may be linked as transactions or be chained together.
	 * Untangle the set of SQEs and act accordingly on each one.
	 */
	do {
		void *userdata = curr->sqe.userdata;
		uint32_t cqe_flags = rtio_cqe_compute_flags(curr);
		struct rtio_iodev_sqe *next = rtio_iodev_sqe_next(curr);

		sqe_flags = curr->sqe.flags;

		if (!is_canceled && FIELD_GET(RTIO_SQE_NO_RESPONSE, sqe_flags) == 0) {
			/* Generate a result back to the client if need be. */
			rtio_cqe_submit(r, result, userdata, cqe_flags);
		}

		rtio_sqe_pool_free(r->sqe_pool, curr);
		curr = next;

		if (!is_ok) {
			/* This is an error path, so cancel any chained SQEs */
			result = -ECANCELED;
		}
	} while (FIELD_GET(RTIO_SQE_TRANSACTION, sqe_flags) == 1);

	/* curr now points at the SQE following the completed one(s); if the
	 * completed SQE ended a chain, submit the next link in that chain
	 */
	if (FIELD_GET(RTIO_SQE_CHAINED, sqe_flags) == 1) {
		rtio_iodev_submit(curr, last_result);
	}
}
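
/*
 * Illustrative sketch (not part of this file): observing transaction
 * cancellation from the application side. When a member of a transaction
 * fails, the loop above completes the remaining members with -ECANCELED,
 * so a consumer can distinguish "this op failed" from "an earlier member
 * failed".
 *
 *	struct rtio_cqe *cqe = rtio_cqe_consume_block(&ctx);
 *
 *	if (cqe->result == -ECANCELED) {
 *		// An earlier member of the transaction failed
 *	}
 *	rtio_cqe_release(&ctx, cqe);
 */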

static inline void rtio_executor_done(struct rtio_iodev_sqe *iodev_sqe, int result, bool is_ok)
{
	const bool is_multishot = FIELD_GET(RTIO_SQE_MULTISHOT, iodev_sqe->sqe.flags) == 1;

	if (is_multishot) {
		rtio_executor_handle_multishot(iodev_sqe, result, is_ok);
	} else {
		rtio_executor_handle_oneshot(iodev_sqe, result, is_ok);
	}
}

/**
 * @brief Callback from an iodev describing success
 */
void rtio_executor_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
{
	rtio_executor_done(iodev_sqe, result, true);
}

/**
 * @brief Callback from an iodev describing error
 *
 * Some assumptions are made and should have been validated in rtio_submit:
 * - an sqe marked as chained or transaction has a next sqe
 * - an sqe is marked either chained or transaction but not both
 */
void rtio_executor_err(struct rtio_iodev_sqe *iodev_sqe, int result)
{
	rtio_executor_done(iodev_sqe, result, false);
}