1 /*
2  * Copyright (c) 2022 Intel Corporation.
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 
7 #include <zephyr/rtio/rtio.h>
8 #include <zephyr/kernel.h>
9 
10 #include <zephyr/logging/log.h>
11 LOG_MODULE_REGISTER(rtio_executor, CONFIG_RTIO_LOG_LEVEL);
12 
13 /**
14  * @brief Executor handled submissions
15  */
rtio_executor_op(struct rtio_iodev_sqe * iodev_sqe)16 static void rtio_executor_op(struct rtio_iodev_sqe *iodev_sqe)
17 {
18 	const struct rtio_sqe *sqe = &iodev_sqe->sqe;
19 
20 	switch (sqe->op) {
21 	case RTIO_OP_CALLBACK:
22 		sqe->callback.callback(iodev_sqe->r, sqe, sqe->callback.arg0);
23 		rtio_iodev_sqe_ok(iodev_sqe, 0);
24 		break;
25 	default:
26 		rtio_iodev_sqe_err(iodev_sqe, -EINVAL);
27 	}
28 }
29 
30 /**
31  * @brief Submit to an iodev a submission to work on
32  *
33  * Should be called by the executor when it wishes to submit work
34  * to an iodev.
35  *
36  * @param iodev_sqe Submission to work on
37  */
rtio_iodev_submit(struct rtio_iodev_sqe * iodev_sqe)38 static inline void rtio_iodev_submit(struct rtio_iodev_sqe *iodev_sqe)
39 {
40 	if (FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags)) {
41 		rtio_iodev_sqe_err(iodev_sqe, -ECANCELED);
42 		return;
43 	}
44 
45 	/* No iodev means its an executor specific operation */
46 	if (iodev_sqe->sqe.iodev == NULL) {
47 		rtio_executor_op(iodev_sqe);
48 		return;
49 	}
50 
51 	iodev_sqe->sqe.iodev->api->submit(iodev_sqe);
52 }
53 
54 /**
55  * @brief Submit operations in the queue to iodevs
56  *
57  * @param r RTIO context
58  *
59  * @retval 0 Always succeeds
60  */
rtio_executor_submit(struct rtio * r)61 void rtio_executor_submit(struct rtio *r)
62 {
63 	const uint16_t cancel_no_response = (RTIO_SQE_CANCELED | RTIO_SQE_NO_RESPONSE);
64 	struct mpsc_node *node = mpsc_pop(&r->sq);
65 
66 	while (node != NULL) {
67 		struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
68 
69 		/* If this submission was cancelled before submit, then generate no response */
70 		if (iodev_sqe->sqe.flags  & RTIO_SQE_CANCELED) {
71 			iodev_sqe->sqe.flags |= cancel_no_response;
72 		}
73 		iodev_sqe->r = r;
74 
75 		struct rtio_iodev_sqe *curr = iodev_sqe, *next;
76 
77 		/* Link up transaction or queue list if needed */
78 		while (curr->sqe.flags & (RTIO_SQE_TRANSACTION | RTIO_SQE_CHAINED)) {
79 #ifdef CONFIG_ASSERT
80 			bool transaction = iodev_sqe->sqe.flags & RTIO_SQE_TRANSACTION;
81 			bool chained = iodev_sqe->sqe.flags & RTIO_SQE_CHAINED;
82 
83 			__ASSERT(transaction != chained,
84 				    "Expected chained or transaction flag, not both");
85 #endif
86 			node = mpsc_pop(&iodev_sqe->r->sq);
87 			next = CONTAINER_OF(node, struct rtio_iodev_sqe, q);
88 
89 			/* If the current submission was cancelled before submit,
90 			 * then cancel the next one and generate no response
91 			 */
92 			if (curr->sqe.flags  & RTIO_SQE_CANCELED) {
93 				next->sqe.flags |= cancel_no_response;
94 			}
95 			curr->next = next;
96 			curr = next;
97 			curr->r = r;
98 
99 			__ASSERT(
100 				curr != NULL,
101 				"Expected a valid sqe following transaction or chain flag");
102 		}
103 
104 		curr->next = NULL;
105 		curr->r = r;
106 
107 		rtio_iodev_submit(iodev_sqe);
108 
109 		node = mpsc_pop(&r->sq);
110 	}
111 }
112 
113 /**
114  * @brief Handle common logic when :c:macro:`RTIO_SQE_MULTISHOT` is set
115  *
116  * @param[in] r RTIO context
117  * @param[in] curr Current IODev SQE that's being marked for finished.
118  * @param[in] is_canceled Whether or not the SQE is canceled
119  */
rtio_executor_handle_multishot(struct rtio * r,struct rtio_iodev_sqe * curr,bool is_canceled)120 static inline void rtio_executor_handle_multishot(struct rtio *r, struct rtio_iodev_sqe *curr,
121 						  bool is_canceled)
122 {
123 	/* Reset the mempool if needed */
124 	if (curr->sqe.op == RTIO_OP_RX && FIELD_GET(RTIO_SQE_MEMPOOL_BUFFER, curr->sqe.flags)) {
125 		if (is_canceled) {
126 			/* Free the memory first since no CQE will be generated */
127 			LOG_DBG("Releasing memory @%p size=%u", (void *)curr->sqe.rx.buf,
128 				curr->sqe.rx.buf_len);
129 			rtio_release_buffer(r, curr->sqe.rx.buf, curr->sqe.rx.buf_len);
130 		}
131 		/* Reset the buffer info so the next request can get a new one */
132 		curr->sqe.rx.buf = NULL;
133 		curr->sqe.rx.buf_len = 0;
134 	}
135 	if (!is_canceled) {
136 		/* Request was not canceled, put the SQE back in the queue */
137 		mpsc_push(&r->sq, &curr->q);
138 		rtio_executor_submit(r);
139 	}
140 }
141 
rtio_executor_done(struct rtio_iodev_sqe * iodev_sqe,int result,bool is_ok)142 static inline void rtio_executor_done(struct rtio_iodev_sqe *iodev_sqe, int result, bool is_ok)
143 {
144 	const bool is_multishot = FIELD_GET(RTIO_SQE_MULTISHOT, iodev_sqe->sqe.flags) == 1;
145 	const bool is_canceled = FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags) == 1;
146 	struct rtio *r = iodev_sqe->r;
147 	struct rtio_iodev_sqe *curr = iodev_sqe, *next;
148 	void *userdata;
149 	uint32_t sqe_flags, cqe_flags;
150 
151 	do {
152 		userdata = curr->sqe.userdata;
153 		sqe_flags = curr->sqe.flags;
154 		cqe_flags = rtio_cqe_compute_flags(iodev_sqe);
155 
156 		next = rtio_iodev_sqe_next(curr);
157 		if (is_multishot) {
158 			rtio_executor_handle_multishot(r, curr, is_canceled);
159 		}
160 		if (!is_multishot || is_canceled) {
161 			/* SQE is no longer needed, release it */
162 			rtio_sqe_pool_free(r->sqe_pool, curr);
163 		}
164 		if (!is_canceled && FIELD_GET(RTIO_SQE_NO_RESPONSE, sqe_flags) == 0) {
165 			/* Request was not canceled, generate a CQE */
166 			rtio_cqe_submit(r, result, userdata, cqe_flags);
167 		}
168 		curr = next;
169 		if (!is_ok) {
170 			/* This is an error path, so cancel any chained SQEs */
171 			result = -ECANCELED;
172 		}
173 	} while (sqe_flags & RTIO_SQE_TRANSACTION);
174 
175 	/* curr should now be the last sqe in the transaction if that is what completed */
176 	if (sqe_flags & RTIO_SQE_CHAINED) {
177 		rtio_iodev_submit(curr);
178 	}
179 }
180 
181 /**
182  * @brief Callback from an iodev describing success
183  */
rtio_executor_ok(struct rtio_iodev_sqe * iodev_sqe,int result)184 void rtio_executor_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
185 {
186 	rtio_executor_done(iodev_sqe, result, true);
187 }
188 
189 /**
190  * @brief Callback from an iodev describing error
191  *
192  * Some assumptions are made and should have been validated on rtio_submit
193  * - a sqe marked as chained or transaction has a next sqe
194  * - a sqe is marked either chained or transaction but not both
195  */
rtio_executor_err(struct rtio_iodev_sqe * iodev_sqe,int result)196 void rtio_executor_err(struct rtio_iodev_sqe *iodev_sqe, int result)
197 {
198 	rtio_executor_done(iodev_sqe, result, false);
199 }
200