1 /******************************************************************************
2 *
3 * Copyright (C) 2014 Google, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 ******************************************************************************/
18
19 #include <string.h>
20
21 #include "osi/allocator.h"
22 #include "osi/fixed_queue.h"
23 #include "osi/semaphore.h"
24 #include "osi/thread.h"
25
26 struct osi_thread {
27 void *thread_handle; /*!< Store the thread object */
28 int thread_id; /*!< May for some OS, such as Linux */
29 bool stop;
30 uint8_t work_queue_num; /*!< Work queue number */
31 fixed_queue_t **work_queues; /*!< Point to queue array, and the priority inverse array index */
32 osi_sem_t work_sem;
33 osi_sem_t stop_sem;
34 };
35
36 struct osi_thread_start_arg {
37 osi_thread_t *thread;
38 osi_sem_t start_sem;
39 int error;
40 };
41
42 typedef struct {
43 osi_thread_func_t func;
44 void *context;
45 } work_item_t;
46
/* Capacity passed to fixed_queue_new() for each work queue of a thread. */
static const size_t DEFAULT_WORK_QUEUE_CAPACITY = 100;
48
osi_thread_run(void * arg)49 static void osi_thread_run(void *arg)
50 {
51 struct osi_thread_start_arg *start = (struct osi_thread_start_arg *)arg;
52 osi_thread_t *thread = start->thread;
53
54 osi_sem_give(&start->start_sem);
55
56 while (1) {
57 int idx = 0;
58
59 osi_sem_take(&thread->work_sem, OSI_SEM_MAX_TIMEOUT);
60
61 if (thread->stop) {
62 break;
63 }
64
65 while (!thread->stop && idx < thread->work_queue_num) {
66 work_item_t *item = fixed_queue_dequeue(thread->work_queues[idx], 0);
67 if (item) {
68 item->func(item->context);
69 osi_free(item);
70 idx = 0;
71 continue;
72 } else {
73 idx++;
74 }
75 }
76 }
77
78 thread->thread_handle = NULL;
79 osi_sem_give(&thread->stop_sem);
80
81 vTaskDelete(NULL);
82 }
83
/**
 * Wait for the worker to announce its termination through stop_sem.
 *
 * @param thread   Thread to wait for; must not be NULL.
 * @param wait_ms  Timeout forwarded unchanged to osi_sem_take().
 * @return The result of osi_sem_take() on stop_sem (the caller in this file
 *         treats 0 as "joined").
 */
static int osi_thread_join(osi_thread_t *thread, uint32_t wait_ms)
{
    assert(thread != NULL);

    int result = osi_sem_take(&thread->stop_sem, wait_ms);
    return result;
}
89
/**
 * Ask the worker to terminate and wait up to 1000 ms for it to do so.
 *
 * If the join does not succeed and the task handle is still set, the task
 * is deleted from here.
 *
 * @param thread  Thread to stop; must not be NULL.
 */
static void osi_thread_stop(osi_thread_t *thread)
{
    assert(thread != NULL);

    /* Raise the stop flag, then wake the worker so that it notices it. */
    thread->stop = true;
    osi_sem_give(&thread->work_sem);

    /* Join, waiting at most 1000 ms. */
    if (osi_thread_join(thread, 1000) == 0) {
        return;
    }

    /* Join failed: delete the task ourselves if it is still registered. */
    if (thread->thread_handle) {
        vTaskDelete(thread->thread_handle);
    }
}
108
109 //in linux, the stack_size, priority and core may not be set here, the code will be ignore the arguments
osi_thread_create(const char * name,size_t stack_size,int priority,osi_thread_core_t core,uint8_t work_queue_num)110 osi_thread_t *osi_thread_create(const char *name, size_t stack_size, int priority, osi_thread_core_t core, uint8_t work_queue_num)
111 {
112 int ret;
113 struct osi_thread_start_arg start_arg = {0};
114
115 if (stack_size <= 0 ||
116 core < OSI_THREAD_CORE_0 || core > OSI_THREAD_CORE_AFFINITY ||
117 work_queue_num <= 0) {
118 return NULL;
119 }
120
121 osi_thread_t *thread = (osi_thread_t *)osi_malloc(sizeof(osi_thread_t));
122 if (thread == NULL) {
123 goto _err;
124 }
125
126 thread->stop = false;
127 thread->work_queue_num = work_queue_num;
128 thread->work_queues = (fixed_queue_t **)osi_malloc(sizeof(fixed_queue_t *) * work_queue_num);
129 if (thread->work_queues == NULL) {
130 goto _err;
131 }
132
133 for (int i = 0; i < thread->work_queue_num; i++) {
134 thread->work_queues[i] = fixed_queue_new(DEFAULT_WORK_QUEUE_CAPACITY);
135 if (thread->work_queues[i] == NULL) {
136 goto _err;
137 }
138 }
139
140 ret = osi_sem_new(&thread->work_sem, 1, 0);
141 if (ret != 0) {
142 goto _err;
143 }
144
145 ret = osi_sem_new(&thread->stop_sem, 1, 0);
146 if (ret != 0) {
147 goto _err;
148 }
149
150 start_arg.thread = thread;
151 ret = osi_sem_new(&start_arg.start_sem, 1, 0);
152 if (ret != 0) {
153 goto _err;
154 }
155
156 if (xTaskCreatePinnedToCore(osi_thread_run, name, stack_size, &start_arg, priority, &thread->thread_handle, core) != pdPASS) {
157 goto _err;
158 }
159
160 osi_sem_take(&start_arg.start_sem, OSI_SEM_MAX_TIMEOUT);
161 osi_sem_free(&start_arg.start_sem);
162
163 return thread;
164
165 _err:
166
167 if (thread) {
168 if (start_arg.start_sem) {
169 osi_sem_free(&start_arg.start_sem);
170 }
171
172 if (thread->thread_handle) {
173 vTaskDelete(thread->thread_handle);
174 }
175
176 for (int i = 0; i < thread->work_queue_num; i++) {
177 if (thread->work_queues[i]) {
178 fixed_queue_free(thread->work_queues[i], osi_free_func);
179 }
180 }
181
182 if (thread->work_queues) {
183 osi_free(thread->work_queues);
184 }
185
186 if (thread->work_sem) {
187 osi_sem_free(&thread->work_sem);
188 }
189
190 if (thread->stop_sem) {
191 osi_sem_free(&thread->stop_sem);
192 }
193
194 osi_free(thread);
195 }
196
197 return NULL;
198 }
199
/**
 * Stop a worker thread and release everything it owns.
 *
 * The thread is stopped first (see osi_thread_stop()). Then every work
 * queue is freed with osi_free_func applied to the items still queued,
 * followed by the queue array, both semaphores and the thread object.
 *
 * @param thread  Thread to destroy; NULL is accepted and ignored.
 */
void osi_thread_free(osi_thread_t *thread)
{
    if (!thread) {
        return;
    }

    osi_thread_stop(thread);

    /* Validate the array before indexing it, not afterwards. */
    if (thread->work_queues) {
        for (int i = 0; i < thread->work_queue_num; i++) {
            if (thread->work_queues[i]) {
                fixed_queue_free(thread->work_queues[i], osi_free_func);
            }
        }

        osi_free(thread->work_queues);
    }

    if (thread->work_sem) {
        osi_sem_free(&thread->work_sem);
    }

    if (thread->stop_sem) {
        osi_sem_free(&thread->stop_sem);
    }

    osi_free(thread);
}
228
/**
 * Queue a function call for execution on a worker thread.
 *
 * @param thread     Target thread; must not be NULL.
 * @param func       Function to run on the worker; must not be NULL.
 * @param context    Opaque pointer handed to func.
 * @param queue_idx  Index of the work queue to use; must lie within
 *                   [0, work_queue_num). The worker serves lower indices
 *                   first.
 * @param timeout    Timeout forwarded unchanged to fixed_queue_enqueue().
 * @return true when the item was queued and the worker was signalled;
 *         false when queue_idx is out of range, the work item could not be
 *         allocated, or the enqueue failed.
 */
bool osi_thread_post(osi_thread_t *thread, osi_thread_func_t func, void *context, int queue_idx, uint32_t timeout)
{
    assert(thread != NULL);
    assert(func != NULL);

    /* Reject negative indices too; they would index before the array. */
    if (queue_idx < 0 || queue_idx >= thread->work_queue_num) {
        return false;
    }

    work_item_t *item = (work_item_t *)osi_malloc(sizeof(work_item_t));
    if (item == NULL) {
        return false;
    }
    item->func = func;
    item->context = context;

    if (fixed_queue_enqueue(thread->work_queues[queue_idx], item, timeout) == false) {
        /* The item never reached the queue, so it is still ours to free. */
        osi_free(item);
        return false;
    }

    /* Wake the worker so it picks the new item up. */
    osi_sem_give(&thread->work_sem);

    return true;
}
254
/**
 * Change the priority of a worker thread's task.
 *
 * @param thread    Thread to modify; must not be NULL.
 * @param priority  New priority, passed unchanged to vTaskPrioritySet().
 * @return Always true.
 */
bool osi_thread_set_priority(osi_thread_t *thread, int priority)
{
    assert(thread != NULL);

    void *task = thread->thread_handle;
    vTaskPrioritySet(task, priority);

    return true;
}
262
/**
 * Look up the name of a worker thread's task.
 *
 * @param thread  Thread to query; must not be NULL.
 * @return The string returned by pcTaskGetTaskName() for the thread's task
 *         handle.
 */
const char *osi_thread_name(osi_thread_t *thread)
{
    assert(thread != NULL);

    const char *task_name = pcTaskGetTaskName(thread->thread_handle);
    return task_name;
}
269
/**
 * Report how many work items are waiting in one work queue of a thread.
 *
 * @param thread  Thread to query.
 * @param wq_idx  Index of the work queue; must lie within
 *                [0, work_queue_num).
 * @return The value of fixed_queue_length() for the selected queue, or -1
 *         when thread is NULL or wq_idx is out of range.
 */
int osi_thread_queue_wait_size(osi_thread_t *thread, int wq_idx)
{
    /* A NULL thread is reported through the existing -1 error value. */
    if (thread == NULL) {
        return -1;
    }

    if (wq_idx < 0 || wq_idx >= thread->work_queue_num) {
        return -1;
    }

    return fixed_queue_length(thread->work_queues[wq_idx]);
}
278