1 /*
2 * Copyright (c) 2020 - 2024 the ThorVG project. All rights reserved.
3
4 * Permission is hereby granted, free of charge, to any person obtaining a copy
5 * of this software and associated documentation files (the "Software"), to deal
6 * in the Software without restriction, including without limitation the rights
7 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
8 * copies of the Software, and to permit persons to whom the Software is
9 * furnished to do so, subject to the following conditions:
10
11 * The above copyright notice and this permission notice shall be included in all
12 * copies or substantial portions of the Software.
13
14 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
15 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
16 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
17 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
18 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
20 * SOFTWARE.
21 */
22
23 #include "../../lv_conf_internal.h"
24 #if LV_USE_THORVG_INTERNAL
25
26 #include "tvgArray.h"
27 #include "tvgInlist.h"
28 #include "tvgTaskScheduler.h"
29
30 #ifdef THORVG_THREAD_SUPPORT
31 #include <thread>
32 #include <atomic>
33 #endif
34
35 /************************************************************************/
36 /* Internal Class Implementation */
37 /************************************************************************/
38
39 namespace tvg {
40
//Singleton scheduler implementation; created by TaskScheduler::init(), destroyed by TaskScheduler::term().
struct TaskSchedulerImpl;
static TaskSchedulerImpl* inst = nullptr;

#ifdef THORVG_THREAD_SUPPORT

//Per-thread toggle: when false, TaskSchedulerImpl::request() runs the task synchronously on the caller's thread.
static thread_local bool _async = true;
47
48 struct TaskQueue {
49 Inlist<Task> taskDeque;
50 mutex mtx;
51 condition_variable ready;
52 bool done = false;
53
tryPoptvg::TaskQueue54 bool tryPop(Task** task)
55 {
56 unique_lock<mutex> lock{mtx, try_to_lock};
57 if (!lock || taskDeque.empty()) return false;
58 *task = taskDeque.front();
59 return true;
60 }
61
tryPushtvg::TaskQueue62 bool tryPush(Task* task)
63 {
64 {
65 unique_lock<mutex> lock{mtx, try_to_lock};
66 if (!lock) return false;
67 taskDeque.back(task);
68 }
69 ready.notify_one();
70 return true;
71 }
72
completetvg::TaskQueue73 void complete()
74 {
75 {
76 lock_guard<mutex> lock{mtx};
77 done = true;
78 }
79 ready.notify_all();
80 }
81
poptvg::TaskQueue82 bool pop(Task** task)
83 {
84 unique_lock<mutex> lock{mtx};
85
86 while (taskDeque.empty() && !done) {
87 ready.wait(lock);
88 }
89
90 if (taskDeque.empty()) return false;
91
92 *task = taskDeque.front();
93 return true;
94 }
95
pushtvg::TaskQueue96 void push(Task* task)
97 {
98 {
99 lock_guard<mutex> lock{mtx};
100 taskDeque.back(task);
101 }
102 ready.notify_one();
103 }
104 };
105
106
//Work-stealing thread pool: one TaskQueue per worker, round-robin dispatch
//with steal-on-contention in both request() and run().
struct TaskSchedulerImpl
{
    Array<thread*> threads;        //worker threads, one per queue
    Array<TaskQueue*> taskQueues;  //taskQueues[i] belongs to threads[i]
    atomic<uint32_t> idx{0};       //round-robin cursor for request()

    TaskSchedulerImpl(uint32_t threadCnt)
    {
        threads.reserve(threadCnt);
        taskQueues.reserve(threadCnt);

        //Two-phase start: allocate ALL queues and thread slots first so that
        //threads.count and taskQueues are final before any worker enters run(),
        //which reads both.
        for (uint32_t i = 0; i < threadCnt; ++i) {
            taskQueues.push(new TaskQueue);
            threads.push(new thread);
        }
        for (uint32_t i = 0; i < threadCnt; ++i) {
            *threads.data[i] = thread([&, i] { run(i); });
        }
    }

    ~TaskSchedulerImpl()
    {
        //Signal shutdown on every queue first so no worker blocks in pop()...
        for (auto tq = taskQueues.begin(); tq < taskQueues.end(); ++tq) {
            (*tq)->complete();
        }
        //...then join and free the workers, and finally the queues they used.
        for (auto thread = threads.begin(); thread < threads.end(); ++thread) {
            (*thread)->join();
            delete(*thread);
        }
        for (auto tq = taskQueues.begin(); tq < taskQueues.end(); ++tq) {
            delete(*tq);
        }
    }

    //Worker main loop for thread i.
    void run(unsigned i)
    {
        Task* task;

        //Thread Loop
        while (true) {
            auto success = false;
            //Try to steal without blocking: scan all queues (twice around),
            //starting from this worker's own queue.
            for (uint32_t x = 0; x < threads.count * 2; ++x) {
                if (taskQueues[(i + x) % threads.count]->tryPop(&task)) {
                    success = true;
                    break;
                }
            }

            //Nothing to steal: block on our own queue; a false pop() means
            //shutdown was signaled and the queue is drained -> exit the thread.
            if (!success && !taskQueues[i]->pop(&task)) break;
            (*task)(i + 1);   //tid 0 is reserved for the synchronous (caller) path
        }
    }

    //Dispatch a task: asynchronously onto a worker queue, or run it inline
    //when threading is unavailable or async is disabled for this thread.
    void request(Task* task)
    {
        //Async
        if (threads.count > 0 && _async) {
            task->prepare();
            auto i = idx++;   //round-robin start point (wrap-around is harmless under %)
            //Prefer a queue whose lock is free; fall back to a blocking push.
            for (uint32_t n = 0; n < threads.count; ++n) {
                if (taskQueues[(i + n) % threads.count]->tryPush(task)) return;
            }
            taskQueues[i % threads.count]->push(task);
        //Sync
        } else {
            task->run(0);
        }
    }

    //Number of worker threads in the pool.
    uint32_t threadCnt()
    {
        return threads.count;
    }
};
181
182 #else //THORVG_THREAD_SUPPORT
183
//No thread support: a single global flag, kept for API parity with the threaded build.
static bool _async = true;
185
186 struct TaskSchedulerImpl
187 {
TaskSchedulerImpltvg::TaskSchedulerImpl188 TaskSchedulerImpl(TVG_UNUSED uint32_t threadCnt) {}
requesttvg::TaskSchedulerImpl189 void request(Task* task) { task->run(0); }
threadCnttvg::TaskSchedulerImpl190 uint32_t threadCnt() { return 0; }
191 };
192
193 #endif //THORVG_THREAD_SUPPORT
194
195 } //namespace
196
197 /************************************************************************/
198 /* External Class Implementation */
199 /************************************************************************/
200
init(uint32_t threads)201 void TaskScheduler::init(uint32_t threads)
202 {
203 if (inst) return;
204 inst = new TaskSchedulerImpl(threads);
205 }
206
207
term()208 void TaskScheduler::term()
209 {
210 delete(inst);
211 inst = nullptr;
212 }
213
214
request(Task * task)215 void TaskScheduler::request(Task* task)
216 {
217 if (inst) inst->request(task);
218 }
219
220
threads()221 uint32_t TaskScheduler::threads()
222 {
223 if (inst) return inst->threadCnt();
224 return 0;
225 }
226
227
async(bool on)228 void TaskScheduler::async(bool on)
229 {
230 //toggle async tasking for each thread on/off
231 _async = on;
232 }
233
234 #endif /* LV_USE_THORVG_INTERNAL */
235
236