/*
 * Copyright (c) 2022 Intel Corporation.
 *
 * SPDX-License-Identifier: Apache-2.0
 */

#include <zephyr/rtio/rtio.h>
#include <zephyr/kernel.h>

#include <zephyr/logging/log.h>
LOG_MODULE_REGISTER(rtio_executor, CONFIG_RTIO_LOG_LEVEL);

/**
 * @brief Handle a submission the executor itself services
 */
static void rtio_executor_op(struct rtio_iodev_sqe *iodev_sqe)
{
	const struct rtio_sqe *sqe = &iodev_sqe->sqe;

	switch (sqe->op) {
	case RTIO_OP_CALLBACK:
		sqe->callback.callback(iodev_sqe->r, sqe, sqe->callback.arg0);
		rtio_iodev_sqe_ok(iodev_sqe, 0);
		break;
	default:
		rtio_iodev_sqe_err(iodev_sqe, -EINVAL);
	}
}
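
/*
 * Usage sketch (illustrative only; my_rtio and my_cb are hypothetical
 * names): an SQE prepared with rtio_sqe_prep_callback() carries no iodev,
 * so the executor runs it inline via rtio_executor_op() above.
 *
 *	static void my_cb(struct rtio *r, const struct rtio_sqe *sqe, void *arg0)
 *	{
 *		LOG_DBG("callback ran in executor context");
 *	}
 *
 *	struct rtio_sqe *sqe = rtio_sqe_acquire(&my_rtio);
 *
 *	rtio_sqe_prep_callback(sqe, my_cb, NULL, NULL);
 *	rtio_submit(&my_rtio, 0);
 */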

/**
 * @brief Submit a submission to its iodev
 *
 * Should be called by the executor when it wishes to submit work
 * to an iodev.
 *
 * @param iodev_sqe Submission to work on
 */
static inline void rtio_iodev_submit(struct rtio_iodev_sqe *iodev_sqe)
{
	if (FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags)) {
		rtio_iodev_sqe_err(iodev_sqe, -ECANCELED);
		return;
	}

	/* No iodev means it's an executor-specific operation */
	if (iodev_sqe->sqe.iodev == NULL) {
		rtio_executor_op(iodev_sqe);
		return;
	}

	iodev_sqe->sqe.iodev->api->submit(iodev_sqe);
}
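
/*
 * Sketch of the iodev side of api->submit (my_iodev_* names are
 * hypothetical): an iodev typically starts the transfer here and reports
 * completion asynchronously through rtio_iodev_sqe_ok()/rtio_iodev_sqe_err(),
 * which route back into this executor.
 *
 *	static void my_iodev_submit(struct rtio_iodev_sqe *iodev_sqe)
 *	{
 *		int rc = my_start_transfer(iodev_sqe); // hypothetical helper
 *
 *		if (rc != 0) {
 *			rtio_iodev_sqe_err(iodev_sqe, rc);
 *		}
 *	}
 *
 *	static const struct rtio_iodev_api my_iodev_api = {
 *		.submit = my_iodev_submit,
 *	};
 */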

/**
 * @brief Submit operations in the queue to iodevs
 *
 * @param r RTIO context
 */
void rtio_executor_submit(struct rtio *r)
{
	const uint16_t cancel_no_response = (RTIO_SQE_CANCELED | RTIO_SQE_NO_RESPONSE);
	struct mpsc_node *node = mpsc_pop(&r->sq);

	while (node != NULL) {
		struct rtio_iodev_sqe *iodev_sqe = CONTAINER_OF(node, struct rtio_iodev_sqe, q);

		/* If this submission was cancelled before submit, then generate no response */
		if (iodev_sqe->sqe.flags & RTIO_SQE_CANCELED) {
			iodev_sqe->sqe.flags |= cancel_no_response;
		}
		iodev_sqe->r = r;

		struct rtio_iodev_sqe *curr = iodev_sqe, *next;

		/* Link up transaction or queue list if needed */
		while (curr->sqe.flags & (RTIO_SQE_TRANSACTION | RTIO_SQE_CHAINED)) {
#ifdef CONFIG_ASSERT
			bool transaction = curr->sqe.flags & RTIO_SQE_TRANSACTION;
			bool chained = curr->sqe.flags & RTIO_SQE_CHAINED;

			__ASSERT(transaction != chained,
				 "Expected chained or transaction flag, not both");
#endif
			node = mpsc_pop(&iodev_sqe->r->sq);

			__ASSERT(node != NULL,
				 "Expected a valid submission in the queue while in a transaction or chain");

			next = CONTAINER_OF(node, struct rtio_iodev_sqe, q);

			/* If the current submission was cancelled before submit,
			 * then cancel the next one and generate no response
			 */
			if (curr->sqe.flags & RTIO_SQE_CANCELED) {
				next->sqe.flags |= cancel_no_response;
			}
			curr->next = next;
			curr = next;

			__ASSERT(curr != NULL,
				 "Expected a valid sqe following transaction or chain flag");

			curr->r = r;
		}

		curr->next = NULL;
		curr->r = r;

		rtio_iodev_submit(iodev_sqe);

		node = mpsc_pop(&r->sq);
	}
}
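
/*
 * Sketch of what the linking loop above consumes (my_rtio, my_iodev, and
 * buf are hypothetical): a transaction pairs a register write with a read,
 * and the executor links them via curr->next before submitting the first.
 *
 *	struct rtio_sqe *wr = rtio_sqe_acquire(&my_rtio);
 *	struct rtio_sqe *rd = rtio_sqe_acquire(&my_rtio);
 *	uint8_t reg = 0x01;
 *
 *	rtio_sqe_prep_tiny_write(wr, &my_iodev, RTIO_PRIO_NORM, &reg, 1, NULL);
 *	wr->flags |= RTIO_SQE_TRANSACTION;
 *	rtio_sqe_prep_read(rd, &my_iodev, RTIO_PRIO_NORM, buf, sizeof(buf), NULL);
 *
 *	rtio_submit(&my_rtio, 1);
 */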

/**
 * @brief Handle common logic when :c:macro:`RTIO_SQE_MULTISHOT` is set
 *
 * @param[in] r RTIO context
 * @param[in] curr Current IODev SQE that is being completed
 * @param[in] is_canceled Whether or not the SQE is canceled
 */
static inline void rtio_executor_handle_multishot(struct rtio *r, struct rtio_iodev_sqe *curr,
						  bool is_canceled)
{
	/* Reset the mempool if needed */
	if (curr->sqe.op == RTIO_OP_RX && FIELD_GET(RTIO_SQE_MEMPOOL_BUFFER, curr->sqe.flags)) {
		if (is_canceled) {
			/* Free the memory first since no CQE will be generated */
			LOG_DBG("Releasing memory @%p size=%u", (void *)curr->sqe.rx.buf,
				curr->sqe.rx.buf_len);
			rtio_release_buffer(r, curr->sqe.rx.buf, curr->sqe.rx.buf_len);
		}
		/* Reset the buffer info so the next request can get a new one */
		curr->sqe.rx.buf = NULL;
		curr->sqe.rx.buf_len = 0;
	}
	if (!is_canceled) {
		/* Request was not canceled, put the SQE back in the queue */
		mpsc_push(&r->sq, &curr->q);
		rtio_executor_submit(r);
	}
}
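
/*
 * Multishot sketch (my_rtio and my_iodev are hypothetical): a single
 * mempool-backed read SQE that the executor resubmits above after each
 * completion until it is canceled.
 *
 *	struct rtio_sqe *sqe = rtio_sqe_acquire(&my_rtio);
 *
 *	rtio_sqe_prep_read_multishot(sqe, &my_iodev, RTIO_PRIO_NORM, NULL);
 *	rtio_submit(&my_rtio, 0);
 *	...
 *	rtio_sqe_cancel(sqe); // stops the resubmission loop
 */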

static inline void rtio_executor_done(struct rtio_iodev_sqe *iodev_sqe, int result, bool is_ok)
{
	const bool is_multishot = FIELD_GET(RTIO_SQE_MULTISHOT, iodev_sqe->sqe.flags) == 1;
	const bool is_canceled = FIELD_GET(RTIO_SQE_CANCELED, iodev_sqe->sqe.flags) == 1;
	struct rtio *r = iodev_sqe->r;
	struct rtio_iodev_sqe *curr = iodev_sqe, *next;
	void *userdata;
	uint32_t sqe_flags, cqe_flags;

	do {
		userdata = curr->sqe.userdata;
		sqe_flags = curr->sqe.flags;
		cqe_flags = rtio_cqe_compute_flags(iodev_sqe);

		next = rtio_iodev_sqe_next(curr);
		if (is_multishot) {
			rtio_executor_handle_multishot(r, curr, is_canceled);
		}
		if (!is_multishot || is_canceled) {
			/* SQE is no longer needed, release it */
			rtio_sqe_pool_free(r->sqe_pool, curr);
		}
		if (!is_canceled && FIELD_GET(RTIO_SQE_NO_RESPONSE, sqe_flags) == 0) {
			/* Request was not canceled, generate a CQE */
			rtio_cqe_submit(r, result, userdata, cqe_flags);
		}
		curr = next;
		if (!is_ok) {
			/* This is an error path, so cancel any chained SQEs */
			result = -ECANCELED;
		}
	} while (sqe_flags & RTIO_SQE_TRANSACTION);

	/* curr now points at the sqe following the completed sqe or transaction,
	 * submit it if the completed sqe was chained
	 */
	if (sqe_flags & RTIO_SQE_CHAINED) {
		rtio_iodev_submit(curr);
	}
}
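
/*
 * Consumer-side sketch (my_rtio is hypothetical): each rtio_cqe_submit()
 * in rtio_executor_done() surfaces as a CQE; chained SQEs after a failure
 * complete with -ECANCELED.
 *
 *	struct rtio_cqe *cqe = rtio_cqe_consume_block(&my_rtio);
 *
 *	if (cqe->result < 0) {
 *		LOG_ERR("op failed: %d", cqe->result);
 *	}
 *	rtio_cqe_release(&my_rtio, cqe);
 */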

/**
 * @brief Callback from an iodev describing success
 */
void rtio_executor_ok(struct rtio_iodev_sqe *iodev_sqe, int result)
{
	rtio_executor_done(iodev_sqe, result, true);
}

/**
 * @brief Callback from an iodev describing error
 *
 * Some assumptions are made and should have been validated during rtio_submit:
 * - an sqe marked as chained or transaction has a next sqe
 * - an sqe is marked either chained or transaction but not both
 */
void rtio_executor_err(struct rtio_iodev_sqe *iodev_sqe, int result)
{
	rtio_executor_done(iodev_sqe, result, false);
}