1 /******************************************************************************
2 *
3 * Copyright (C) 2014 Google, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 ******************************************************************************/
18
19 #include <string.h>
20
21 #include "osi/allocator.h"
22 #include "freertos/FreeRTOS.h"
23 #include "freertos/queue.h"
24 #include "osi/semaphore.h"
25 #include "osi/thread.h"
26 #include "osi/mutex.h"
27
28 struct work_item {
29 osi_thread_func_t func;
30 void *context;
31 };
32
33 struct work_queue {
34 QueueHandle_t queue;
35 size_t capacity;
36 };
37
38 struct osi_thread {
39 TaskHandle_t thread_handle; /*!< Store the thread object */
40 int thread_id; /*!< May for some OS, such as Linux */
41 bool stop;
42 uint8_t work_queue_num; /*!< Work queue number */
43 struct work_queue **work_queues; /*!< Point to queue array, and the priority inverse array index */
44 osi_sem_t work_sem;
45 osi_sem_t stop_sem;
46 };
47
48 struct osi_thread_start_arg {
49 osi_thread_t *thread;
50 osi_sem_t start_sem;
51 int error;
52 };
53
54 struct osi_event {
55 struct work_item item;
56 osi_mutex_t lock;
57 uint16_t is_queued;
58 uint16_t queue_idx;
59 osi_thread_t *thread;
60 };
61
/* Queue length substituted by osi_thread_create() when a caller passes 0 for a queue. */
static const size_t DEFAULT_WORK_QUEUE_CAPACITY = 100;
63
osi_work_queue_create(size_t capacity)64 static struct work_queue *osi_work_queue_create(size_t capacity)
65 {
66 if (capacity == 0) {
67 return NULL;
68 }
69
70 struct work_queue *wq = (struct work_queue *)osi_malloc(sizeof(struct work_queue));
71 if (wq != NULL) {
72 wq->queue = xQueueCreate(capacity, sizeof(struct work_item));
73 if (wq->queue != 0) {
74 wq->capacity = capacity;
75 return wq;
76 } else {
77 osi_free(wq);
78 }
79 }
80
81 return NULL;
82 }
83
osi_work_queue_delete(struct work_queue * wq)84 static void osi_work_queue_delete(struct work_queue *wq)
85 {
86 if (wq != NULL) {
87 if (wq->queue != 0) {
88 vQueueDelete(wq->queue);
89 }
90 wq->queue = 0;
91 wq->capacity = 0;
92 osi_free(wq);
93 }
94 return;
95 }
96
osi_thead_work_queue_get(struct work_queue * wq,struct work_item * item)97 static bool osi_thead_work_queue_get(struct work_queue *wq, struct work_item *item)
98 {
99 assert (wq != NULL);
100 assert (wq->queue != 0);
101 assert (item != NULL);
102
103 if (pdTRUE == xQueueReceive(wq->queue, item, 0)) {
104 return true;
105 } else {
106 return false;
107 }
108 }
109
osi_thead_work_queue_put(struct work_queue * wq,const struct work_item * item,uint32_t timeout)110 static bool osi_thead_work_queue_put(struct work_queue *wq, const struct work_item *item, uint32_t timeout)
111 {
112 assert (wq != NULL);
113 assert (wq->queue != 0);
114 assert (item != NULL);
115
116 bool ret = true;
117 if (timeout == OSI_SEM_MAX_TIMEOUT) {
118 if (xQueueSend(wq->queue, item, portMAX_DELAY) != pdTRUE) {
119 ret = false;
120 }
121 } else {
122 if (xQueueSend(wq->queue, item, timeout / portTICK_PERIOD_MS) != pdTRUE) {
123 ret = false;
124 }
125 }
126
127 return ret;
128 }
129
osi_thead_work_queue_len(struct work_queue * wq)130 static size_t osi_thead_work_queue_len(struct work_queue *wq)
131 {
132 assert (wq != NULL);
133 assert (wq->queue != 0);
134 assert (wq->capacity != 0);
135
136 size_t available_spaces = (size_t)uxQueueSpacesAvailable(wq->queue);
137
138 if (available_spaces <= wq->capacity) {
139 return wq->capacity - available_spaces;
140 } else {
141 assert (0);
142 }
143 return 0;
144 }
145
osi_thread_run(void * arg)146 static void osi_thread_run(void *arg)
147 {
148 struct osi_thread_start_arg *start = (struct osi_thread_start_arg *)arg;
149 osi_thread_t *thread = start->thread;
150
151 osi_sem_give(&start->start_sem);
152
153 while (1) {
154 int idx = 0;
155
156 osi_sem_take(&thread->work_sem, OSI_SEM_MAX_TIMEOUT);
157
158 if (thread->stop) {
159 break;
160 }
161
162 struct work_item item;
163 while (!thread->stop && idx < thread->work_queue_num) {
164 if (osi_thead_work_queue_get(thread->work_queues[idx], &item) == true) {
165 item.func(item.context);
166 idx = 0;
167 continue;
168 } else {
169 idx++;
170 }
171 }
172 }
173
174 thread->thread_handle = NULL;
175 osi_sem_give(&thread->stop_sem);
176
177 vTaskDelete(NULL);
178 }
179
/**
 * Wait for the worker task to signal that it has left its work loop.
 *
 * @param thread   thread to wait for; must be non-NULL
 * @param wait_ms  timeout forwarded to osi_sem_take()
 * @return the result of osi_sem_take() on stop_sem (the caller treats 0 as success)
 */
static int osi_thread_join(osi_thread_t *thread, uint32_t wait_ms)
{
    assert(thread != NULL);

    int status = osi_sem_take(&thread->stop_sem, wait_ms);
    return status;
}
185
/**
 * Ask the worker task to exit and wait up to 1000 ms for it to do so.
 *
 * If the join does not succeed and the task handle is still set, the task
 * is deleted from here.
 *
 * @param thread  thread to stop; must be non-NULL
 */
static void osi_thread_stop(osi_thread_t *thread)
{
    assert(thread != NULL);

    /* Raise the stop flag, then wake the task so it notices the flag. */
    thread->stop = true;
    osi_sem_give(&thread->work_sem);

    int join_status = osi_thread_join(thread, 1000); /* wait 1000ms */
    if (join_status == 0) {
        return;
    }

    /* Join failed: delete the task here if it has not cleared its handle. */
    if (thread->thread_handle) {
        vTaskDelete(thread->thread_handle);
    }
}
204
205 //in linux, the stack_size, priority and core may not be set here, the code will be ignore the arguments
osi_thread_create(const char * name,size_t stack_size,int priority,osi_thread_core_t core,uint8_t work_queue_num,const size_t work_queue_len[])206 osi_thread_t *osi_thread_create(const char *name, size_t stack_size, int priority, osi_thread_core_t core, uint8_t work_queue_num, const size_t work_queue_len[])
207 {
208 int ret;
209 struct osi_thread_start_arg start_arg = {0};
210
211 if (stack_size <= 0 ||
212 core < OSI_THREAD_CORE_0 || core > OSI_THREAD_CORE_AFFINITY ||
213 work_queue_num <= 0 || work_queue_len == NULL) {
214 return NULL;
215 }
216
217 osi_thread_t *thread = (osi_thread_t *)osi_calloc(sizeof(osi_thread_t));
218 if (thread == NULL) {
219 goto _err;
220 }
221
222 thread->stop = false;
223 thread->work_queues = (struct work_queue **)osi_calloc(sizeof(struct work_queue *) * work_queue_num);
224 if (thread->work_queues == NULL) {
225 goto _err;
226 }
227 thread->work_queue_num = work_queue_num;
228
229 for (int i = 0; i < thread->work_queue_num; i++) {
230 size_t queue_len = work_queue_len[i] ? work_queue_len[i] : DEFAULT_WORK_QUEUE_CAPACITY;
231 thread->work_queues[i] = osi_work_queue_create(queue_len);
232 if (thread->work_queues[i] == NULL) {
233 goto _err;
234 }
235 }
236
237 ret = osi_sem_new(&thread->work_sem, 1, 0);
238 if (ret != 0) {
239 goto _err;
240 }
241
242 ret = osi_sem_new(&thread->stop_sem, 1, 0);
243 if (ret != 0) {
244 goto _err;
245 }
246
247 start_arg.thread = thread;
248 ret = osi_sem_new(&start_arg.start_sem, 1, 0);
249 if (ret != 0) {
250 goto _err;
251 }
252
253 if (xTaskCreatePinnedToCore(osi_thread_run, name, stack_size, &start_arg, priority, &thread->thread_handle, core) != pdPASS) {
254 goto _err;
255 }
256
257 osi_sem_take(&start_arg.start_sem, OSI_SEM_MAX_TIMEOUT);
258 osi_sem_free(&start_arg.start_sem);
259
260 return thread;
261
262 _err:
263
264 if (thread) {
265 if (start_arg.start_sem) {
266 osi_sem_free(&start_arg.start_sem);
267 }
268
269 if (thread->thread_handle) {
270 vTaskDelete(thread->thread_handle);
271 }
272
273 for (int i = 0; i < thread->work_queue_num; i++) {
274 if (thread->work_queues[i]) {
275 osi_work_queue_delete(thread->work_queues[i]);
276 }
277 thread->work_queues[i] = NULL;
278 }
279
280 if (thread->work_queues) {
281 osi_free(thread->work_queues);
282 thread->work_queues = NULL;
283 }
284
285 if (thread->work_sem) {
286 osi_sem_free(&thread->work_sem);
287 }
288
289 if (thread->stop_sem) {
290 osi_sem_free(&thread->stop_sem);
291 }
292
293 osi_free(thread);
294 }
295
296 return NULL;
297 }
298
/**
 * Stop a worker thread and release everything it owns.
 *
 * The task is stopped first via osi_thread_stop(); afterwards the work
 * queues, the queue array, both semaphores and the thread object itself are
 * freed.
 *
 * @param thread  thread to destroy; NULL is accepted and ignored
 */
void osi_thread_free(osi_thread_t *thread)
{
    if (thread == NULL) {
        return;
    }

    osi_thread_stop(thread);

    /* Only walk the queue array when it actually exists. */
    if (thread->work_queues != NULL) {
        for (int i = 0; i < thread->work_queue_num; i++) {
            if (thread->work_queues[i] != NULL) {
                osi_work_queue_delete(thread->work_queues[i]);
                thread->work_queues[i] = NULL;
            }
        }

        osi_free(thread->work_queues);
        thread->work_queues = NULL;
    }

    if (thread->work_sem) {
        osi_sem_free(&thread->work_sem);
    }

    if (thread->stop_sem) {
        osi_sem_free(&thread->stop_sem);
    }

    osi_free(thread);
}
329
/**
 * Queue a callback for execution on a worker thread.
 *
 * @param thread     target thread; must be non-NULL
 * @param func       callback to run; must be non-NULL
 * @param context    opaque argument passed to func
 * @param queue_idx  index of the work queue to use; must be within
 *                   0..work_queue_num-1
 * @param timeout    how long to wait for queue space, forwarded to
 *                   osi_thead_work_queue_put()
 * @return true if the item was queued and the thread was signalled, false if
 *         queue_idx is out of range or the item could not be queued
 */
bool osi_thread_post(osi_thread_t *thread, osi_thread_func_t func, void *context, int queue_idx, uint32_t timeout)
{
    assert(thread != NULL);
    assert(func != NULL);

    /* queue_idx is signed: a negative value must not reach the array index below. */
    if (queue_idx < 0 || queue_idx >= thread->work_queue_num) {
        return false;
    }

    struct work_item item;

    item.func = func;
    item.context = context;

    if (osi_thead_work_queue_put(thread->work_queues[queue_idx], &item, timeout) == false) {
        return false;
    }

    /* Wake the worker so it picks up the new item. */
    osi_sem_give(&thread->work_sem);

    return true;
}
352
/**
 * Change the priority of a worker thread's task.
 *
 * @param thread    thread whose task priority is changed; must be non-NULL
 * @param priority  new priority handed to vTaskPrioritySet()
 * @return always true
 */
bool osi_thread_set_priority(osi_thread_t *thread, int priority)
{
    assert(thread != NULL);

    TaskHandle_t task = thread->thread_handle;
    vTaskPrioritySet(task, priority);

    return true;
}
360
/**
 * Look up the name of a worker thread's task.
 *
 * @param thread  thread to query; must be non-NULL
 * @return the string returned by pcTaskGetName() for the thread's task handle
 */
const char *osi_thread_name(osi_thread_t *thread)
{
    assert(thread != NULL);

    const char *task_name = pcTaskGetName(thread->thread_handle);
    return task_name;
}
367
/**
 * Report how many work items are waiting in one of a thread's queues.
 *
 * @param thread  thread to inspect
 * @param wq_idx  index of the work queue; must be within 0..work_queue_num-1
 * @return number of queued items, or -1 if thread is NULL or wq_idx is out
 *         of range
 */
int osi_thread_queue_wait_size(osi_thread_t *thread, int wq_idx)
{
    /* Invalid arguments go through the existing -1 error return instead of a NULL dereference. */
    if (thread == NULL || wq_idx < 0 || wq_idx >= thread->work_queue_num) {
        return -1;
    }

    return (int)(osi_thead_work_queue_len(thread->work_queues[wq_idx]));
}
376
377
/**
 * Allocate an event wrapping a callback and its context.
 *
 * The event starts out unbound; see osi_event_bind().
 *
 * @param func     callback to run when the event is handled
 * @param context  opaque argument passed to func
 * @return the new event, or NULL if allocation or mutex creation fails
 */
struct osi_event *osi_event_create(osi_thread_func_t func, void *context)
{
    struct osi_event *new_event = osi_calloc(sizeof(struct osi_event));
    if (new_event == NULL) {
        return NULL;
    }

    if (osi_mutex_new(&new_event->lock) != 0) {
        /* No lock, no event: give the memory back. */
        osi_free(new_event);
        return NULL;
    }

    new_event->item.func = func;
    new_event->item.context = context;
    return new_event;
}
392
osi_event_delete(struct osi_event * event)393 void osi_event_delete(struct osi_event* event)
394 {
395 if (event != NULL) {
396 osi_mutex_free(&event->lock);
397 memset(event, 0, sizeof(struct osi_event));
398 osi_free(event);
399 }
400 }
401
/**
 * Bind an event to a work queue of a worker thread.
 *
 * An event can be bound only once.
 *
 * @param event      event to bind; must be non-NULL and not yet bound
 * @param thread     target thread; must be non-NULL
 * @param queue_idx  index of the work queue; must be within
 *                   0..work_queue_num-1
 * @return true on success, false if any argument is invalid or the event is
 *         already bound
 */
bool osi_event_bind(struct osi_event* event, osi_thread_t *thread, int queue_idx)
{
    if (event == NULL || event->thread != NULL) {
        return false;
    }

    /*
     * queue_idx is signed while the stored field is uint16_t: a negative value
     * would pass the upper-bound test and be truncated into a bogus index.
     */
    if (thread == NULL || queue_idx < 0 || queue_idx >= thread->work_queue_num) {
        return false;
    }

    event->thread = thread;
    event->queue_idx = (uint16_t)queue_idx;

    return true;
}
417
osi_thread_generic_event_handler(void * context)418 static void osi_thread_generic_event_handler(void *context)
419 {
420 struct osi_event *event = (struct osi_event *)context;
421 if (event != NULL && event->item.func != NULL) {
422 osi_mutex_lock(&event->lock, OSI_MUTEX_MAX_TIMEOUT);
423 event->is_queued = 0;
424 osi_mutex_unlock(&event->lock);
425 event->item.func(event->item.context);
426 }
427 }
428
osi_thread_post_event(struct osi_event * event,uint32_t timeout)429 bool osi_thread_post_event(struct osi_event *event, uint32_t timeout)
430 {
431 assert(event != NULL && event->thread != NULL);
432 assert(event->queue_idx >= 0 && event->queue_idx < event->thread->work_queue_num);
433 bool ret = false;
434 if (event->is_queued == 0) {
435 uint16_t acquire_cnt = 0;
436 osi_mutex_lock(&event->lock, OSI_MUTEX_MAX_TIMEOUT);
437 event->is_queued += 1;
438 acquire_cnt = event->is_queued;
439 osi_mutex_unlock(&event->lock);
440
441 if (acquire_cnt == 1) {
442 ret = osi_thread_post(event->thread, osi_thread_generic_event_handler, event, event->queue_idx, timeout);
443 if (!ret) {
444 // clear "is_queued" when post failure, to allow for following event posts
445 osi_mutex_lock(&event->lock, OSI_MUTEX_MAX_TIMEOUT);
446 event->is_queued = 0;
447 osi_mutex_unlock(&event->lock);
448 }
449 }
450 }
451
452 return ret;
453 }
454