1 /******************************************************************************
2  *
3  *  Copyright (C) 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 #include <string.h>
20 
21 #include "osi/allocator.h"
22 #include "freertos/FreeRTOS.h"
23 #include "freertos/queue.h"
24 #include "osi/semaphore.h"
25 #include "osi/thread.h"
26 #include "osi/mutex.h"
27 
/* One unit of work: a callback and the opaque argument it is called with. */
struct work_item {
    osi_thread_func_t func;   /*!< Function the worker task invokes */
    void *context;            /*!< Argument passed to func when it is invoked */
};
32 
/* A FreeRTOS queue of work items plus the length it was created with. */
struct work_queue {
    QueueHandle_t queue;      /*!< Handle returned by xQueueCreate() */
    size_t capacity;          /*!< Number of work items the queue was created to hold */
};
37 
/* State of one worker task and the work queues it drains. */
struct osi_thread {
  TaskHandle_t thread_handle;           /*!< Task handle filled in at creation; the task sets it to NULL just before it exits */
  int  thread_id;                       /*!< Not referenced in this file; may be needed on some OSes, such as Linux */
  bool stop;                            /*!< Set to true to ask the worker loop to exit */
  uint8_t work_queue_num;               /*!< Number of entries in work_queues */
  struct work_queue **work_queues;      /*!< Array of work queues; the worker scans from index 0 upwards, so lower indexes are served first */
  osi_sem_t work_sem;                   /*!< Given after each successful post (and on stop) to wake the worker */
  osi_sem_t stop_sem;                   /*!< Given by the worker as it exits; taken when joining */
};
47 
/* Argument handed to a newly created worker task at start-up. */
struct osi_thread_start_arg {
  osi_thread_t *thread;   /*!< Thread object the new task should service */
  osi_sem_t start_sem;    /*!< Given by the new task once it has read this structure */
  int error;              /*!< Not referenced in this file */
};
53 
/* A reusable event: a callback bound to one work queue of one thread. */
struct osi_event {
    struct work_item item;    /*!< User callback and its context */
    osi_mutex_t lock;         /*!< Held while is_queued is modified */
    uint16_t is_queued;       /*!< Non-zero while a post of this event is pending; cleared before the callback runs */
    uint16_t queue_idx;       /*!< Index of the work queue the event is posted to */
    osi_thread_t *thread;     /*!< Thread the event is bound to; NULL until bound */
};
61 
/* Queue length used by osi_thread_create() when a caller passes 0 as a work queue's length. */
static const size_t DEFAULT_WORK_QUEUE_CAPACITY = 100;
63 
osi_work_queue_create(size_t capacity)64 static struct work_queue *osi_work_queue_create(size_t capacity)
65 {
66     if (capacity == 0) {
67         return NULL;
68     }
69 
70     struct work_queue *wq = (struct work_queue *)osi_malloc(sizeof(struct work_queue));
71     if (wq != NULL) {
72         wq->queue = xQueueCreate(capacity, sizeof(struct work_item));
73         if (wq->queue != 0) {
74             wq->capacity = capacity;
75             return wq;
76         } else {
77             osi_free(wq);
78         }
79     }
80 
81     return NULL;
82 }
83 
osi_work_queue_delete(struct work_queue * wq)84 static void osi_work_queue_delete(struct work_queue *wq)
85 {
86     if (wq != NULL) {
87         if (wq->queue != 0) {
88             vQueueDelete(wq->queue);
89         }
90         wq->queue = 0;
91         wq->capacity = 0;
92         osi_free(wq);
93     }
94     return;
95 }
96 
osi_thead_work_queue_get(struct work_queue * wq,struct work_item * item)97 static bool osi_thead_work_queue_get(struct work_queue *wq, struct work_item *item)
98 {
99     assert (wq != NULL);
100     assert (wq->queue != 0);
101     assert (item != NULL);
102 
103     if (pdTRUE == xQueueReceive(wq->queue, item, 0)) {
104         return true;
105     } else {
106         return false;
107     }
108 }
109 
osi_thead_work_queue_put(struct work_queue * wq,const struct work_item * item,uint32_t timeout)110 static bool osi_thead_work_queue_put(struct work_queue *wq, const struct work_item *item, uint32_t timeout)
111 {
112     assert (wq != NULL);
113     assert (wq->queue != 0);
114     assert (item != NULL);
115 
116     bool ret = true;
117     if (timeout ==  OSI_SEM_MAX_TIMEOUT) {
118         if (xQueueSend(wq->queue, item, portMAX_DELAY) != pdTRUE) {
119             ret = false;
120         }
121     } else {
122         if (xQueueSend(wq->queue, item, timeout / portTICK_PERIOD_MS) != pdTRUE) {
123             ret = false;
124         }
125     }
126 
127     return ret;
128 }
129 
osi_thead_work_queue_len(struct work_queue * wq)130 static size_t osi_thead_work_queue_len(struct work_queue *wq)
131 {
132     assert (wq != NULL);
133     assert (wq->queue != 0);
134     assert (wq->capacity != 0);
135 
136     size_t available_spaces = (size_t)uxQueueSpacesAvailable(wq->queue);
137 
138     if (available_spaces <= wq->capacity) {
139         return wq->capacity - available_spaces;
140     } else {
141         assert (0);
142     }
143     return 0;
144 }
145 
osi_thread_run(void * arg)146 static void osi_thread_run(void *arg)
147 {
148     struct osi_thread_start_arg *start = (struct osi_thread_start_arg *)arg;
149     osi_thread_t *thread = start->thread;
150 
151     osi_sem_give(&start->start_sem);
152 
153     while (1) {
154         int idx = 0;
155 
156         osi_sem_take(&thread->work_sem, OSI_SEM_MAX_TIMEOUT);
157 
158         if (thread->stop) {
159             break;
160         }
161 
162         struct work_item item;
163         while (!thread->stop && idx < thread->work_queue_num) {
164             if (osi_thead_work_queue_get(thread->work_queues[idx], &item) == true) {
165                 item.func(item.context);
166                 idx = 0;
167                 continue;
168             } else {
169                 idx++;
170             }
171         }
172     }
173 
174     thread->thread_handle = NULL;
175     osi_sem_give(&thread->stop_sem);
176 
177     vTaskDelete(NULL);
178 }
179 
/**
 * Wait for the worker task to signal its exit by taking stop_sem.
 *
 * `wait_ms` is passed through to osi_sem_take(), whose result is returned
 * unchanged (the caller in this file treats 0 as success).
 */
static int osi_thread_join(osi_thread_t *thread, uint32_t wait_ms)
{
    assert(thread != NULL);

    int join_result = osi_sem_take(&thread->stop_sem, wait_ms);
    return join_result;
}
185 
/**
 * Ask the worker task to exit and wait for it to do so.
 *
 * Sets the stop flag, wakes the worker through work_sem and joins with a
 * 1000 wait value. If the join fails and the task handle is still set, the
 * task is deleted from here instead.
 */
static void osi_thread_stop(osi_thread_t *thread)
{
    assert(thread != NULL);

    // Request the stop and wake the worker so it notices the flag.
    thread->stop = true;
    osi_sem_give(&thread->work_sem);

    const int join_result = osi_thread_join(thread, 1000); //wait 1000ms
    if (join_result == 0) {
        return;
    }

    // The worker did not exit by itself: remove the task here.
    if (thread->thread_handle) {
        vTaskDelete(thread->thread_handle);
    }
}
204 
// On Linux, stack_size, priority and core may not be configurable here; in that case these arguments are ignored.
osi_thread_create(const char * name,size_t stack_size,int priority,osi_thread_core_t core,uint8_t work_queue_num,const size_t work_queue_len[])206 osi_thread_t *osi_thread_create(const char *name, size_t stack_size, int priority, osi_thread_core_t core, uint8_t work_queue_num, const size_t work_queue_len[])
207 {
208     int ret;
209     struct osi_thread_start_arg start_arg = {0};
210 
211     if (stack_size <= 0 ||
212             core < OSI_THREAD_CORE_0 || core > OSI_THREAD_CORE_AFFINITY ||
213             work_queue_num <= 0 || work_queue_len == NULL) {
214         return NULL;
215     }
216 
217     osi_thread_t *thread = (osi_thread_t *)osi_calloc(sizeof(osi_thread_t));
218     if (thread == NULL) {
219         goto _err;
220     }
221 
222     thread->stop = false;
223     thread->work_queues = (struct work_queue **)osi_calloc(sizeof(struct work_queue *) * work_queue_num);
224     if (thread->work_queues == NULL) {
225         goto _err;
226     }
227     thread->work_queue_num = work_queue_num;
228 
229     for (int i = 0; i < thread->work_queue_num; i++) {
230         size_t queue_len = work_queue_len[i] ? work_queue_len[i] : DEFAULT_WORK_QUEUE_CAPACITY;
231         thread->work_queues[i] = osi_work_queue_create(queue_len);
232         if (thread->work_queues[i] == NULL) {
233             goto _err;
234         }
235     }
236 
237     ret = osi_sem_new(&thread->work_sem, 1, 0);
238     if (ret != 0) {
239         goto _err;
240     }
241 
242     ret = osi_sem_new(&thread->stop_sem, 1, 0);
243     if (ret != 0) {
244         goto _err;
245     }
246 
247     start_arg.thread = thread;
248     ret = osi_sem_new(&start_arg.start_sem, 1, 0);
249     if (ret != 0) {
250         goto _err;
251     }
252 
253     if (xTaskCreatePinnedToCore(osi_thread_run, name, stack_size, &start_arg, priority, &thread->thread_handle, core) != pdPASS) {
254         goto _err;
255     }
256 
257     osi_sem_take(&start_arg.start_sem, OSI_SEM_MAX_TIMEOUT);
258     osi_sem_free(&start_arg.start_sem);
259 
260     return thread;
261 
262 _err:
263 
264     if (thread) {
265         if (start_arg.start_sem) {
266             osi_sem_free(&start_arg.start_sem);
267         }
268 
269         if (thread->thread_handle) {
270             vTaskDelete(thread->thread_handle);
271         }
272 
273         for (int i = 0; i < thread->work_queue_num; i++) {
274             if (thread->work_queues[i]) {
275                 osi_work_queue_delete(thread->work_queues[i]);
276             }
277             thread->work_queues[i] = NULL;
278         }
279 
280         if (thread->work_queues) {
281             osi_free(thread->work_queues);
282             thread->work_queues = NULL;
283         }
284 
285         if (thread->work_sem) {
286             osi_sem_free(&thread->work_sem);
287         }
288 
289         if (thread->stop_sem) {
290             osi_sem_free(&thread->stop_sem);
291         }
292 
293         osi_free(thread);
294     }
295 
296     return NULL;
297 }
298 
/**
 * Stop a worker thread and release everything osi_thread_create() allocated
 * for it: the work queues, the queue array, both semaphores and the thread
 * object itself. Passing NULL is a no-op.
 */
void osi_thread_free(osi_thread_t *thread)
{
    if (thread == NULL) {
        return;
    }

    osi_thread_stop(thread);

    for (int q = 0; q < thread->work_queue_num; q++) {
        struct work_queue *wq = thread->work_queues[q];
        if (wq == NULL) {
            continue;
        }
        osi_work_queue_delete(wq);
        thread->work_queues[q] = NULL;
    }

    if (thread->work_queues) {
        osi_free(thread->work_queues);
        thread->work_queues = NULL;
    }

    if (thread->work_sem) {
        osi_sem_free(&thread->work_sem);
    }

    if (thread->stop_sem) {
        osi_sem_free(&thread->stop_sem);
    }

    osi_free(thread);
}
329 
/**
 * Queue a function call on one of a thread's work queues and wake the worker.
 *
 * @param thread     Target thread; must not be NULL.
 * @param func       Function to run on the worker; must not be NULL.
 * @param context    Argument passed to func.
 * @param queue_idx  Index of the work queue to post to; valid values are
 *                   0 .. work_queue_num - 1.
 * @param timeout    How long to wait for free space in the queue, in the
 *                   units accepted by the queue put helper
 *                   (OSI_SEM_MAX_TIMEOUT waits indefinitely).
 *
 * @return true when the item was queued; false when queue_idx is out of
 *         range or the item could not be queued within the timeout.
 */
bool osi_thread_post(osi_thread_t *thread, osi_thread_func_t func, void *context, int queue_idx, uint32_t timeout)
{
    assert(thread != NULL);
    assert(func != NULL);

    // queue_idx is signed: reject negative values too, otherwise the array
    // access below would read before the start of work_queues[].
    if (queue_idx < 0 || queue_idx >= thread->work_queue_num) {
        return false;
    }

    struct work_item item;

    item.func = func;
    item.context = context;

    if (osi_thead_work_queue_put(thread->work_queues[queue_idx], &item, timeout) == false) {
        return false;
    }

    // Wake the worker so it picks up the new item.
    osi_sem_give(&thread->work_sem);

    return true;
}
352 
/**
 * Change the priority of the worker task via vTaskPrioritySet().
 *
 * Always returns true.
 */
bool osi_thread_set_priority(osi_thread_t *thread, int priority)
{
    assert(thread != NULL);

    TaskHandle_t task = thread->thread_handle;
    vTaskPrioritySet(task, priority);

    return true;
}
360 
/**
 * Return the name of the worker task as reported by pcTaskGetName().
 */
const char *osi_thread_name(osi_thread_t *thread)
{
    assert(thread != NULL);

    const char *task_name = pcTaskGetName(thread->thread_handle);
    return task_name;
}
367 
/**
 * Report how many work items are currently waiting in one work queue.
 *
 * @param thread  Thread whose queue is inspected.
 * @param wq_idx  Index of the work queue; valid values are
 *                0 .. work_queue_num - 1.
 *
 * @return The number of queued items, or -1 when thread is NULL or wq_idx is
 *         out of range.
 */
int osi_thread_queue_wait_size(osi_thread_t *thread, int wq_idx)
{
    // Guard against a NULL thread before dereferencing it below.
    if (thread == NULL) {
        return -1;
    }

    if (wq_idx < 0 || wq_idx >= thread->work_queue_num) {
        return -1;
    }

    return (int)(osi_thead_work_queue_len(thread->work_queues[wq_idx]));
}
376 
377 
/**
 * Allocate an event that will run `func(context)` when it is posted.
 *
 * The event is zero-initialised, so it is not bound to any thread yet.
 * Returns the new event, or NULL if the allocation or the mutex creation
 * fails.
 */
struct osi_event *osi_event_create(osi_thread_func_t func, void *context)
{
    struct osi_event *new_event = osi_calloc(sizeof(struct osi_event));
    if (new_event == NULL) {
        return NULL;
    }

    if (osi_mutex_new(&new_event->lock) != 0) {
        // No mutex, no event: give the memory back.
        osi_free(new_event);
        return NULL;
    }

    new_event->item.func = func;
    new_event->item.context = context;
    return new_event;
}
392 
osi_event_delete(struct osi_event * event)393 void osi_event_delete(struct osi_event* event)
394 {
395     if (event != NULL) {
396         osi_mutex_free(&event->lock);
397         memset(event, 0, sizeof(struct osi_event));
398         osi_free(event);
399     }
400 }
401 
/**
 * Bind an event to one work queue of a thread.
 *
 * @param event      Event to bind; must not already be bound.
 * @param thread     Thread that will run the event's callback.
 * @param queue_idx  Index of the work queue to post to; valid values are
 *                   0 .. work_queue_num - 1.
 *
 * @return true when the binding was stored; false when event is NULL or
 *         already bound, thread is NULL, or queue_idx is out of range.
 */
bool osi_event_bind(struct osi_event* event, osi_thread_t *thread, int queue_idx)
{
    if (event == NULL || event->thread != NULL) {
        return false;
    }

    // queue_idx is signed: a negative value would pass an upper-bound-only
    // check and then wrap to a large index when stored in the uint16_t field.
    if (thread == NULL || queue_idx < 0 || queue_idx >= thread->work_queue_num) {
        return false;
    }

    event->thread = thread;
    event->queue_idx = (uint16_t)queue_idx;

    return true;
}
417 
osi_thread_generic_event_handler(void * context)418 static void osi_thread_generic_event_handler(void *context)
419 {
420     struct osi_event *event = (struct osi_event *)context;
421     if (event != NULL && event->item.func != NULL) {
422         osi_mutex_lock(&event->lock, OSI_MUTEX_MAX_TIMEOUT);
423         event->is_queued = 0;
424         osi_mutex_unlock(&event->lock);
425         event->item.func(event->item.context);
426     }
427 }
428 
osi_thread_post_event(struct osi_event * event,uint32_t timeout)429 bool osi_thread_post_event(struct osi_event *event, uint32_t timeout)
430 {
431     assert(event != NULL && event->thread != NULL);
432     assert(event->queue_idx >= 0 && event->queue_idx < event->thread->work_queue_num);
433     bool ret = false;
434     if (event->is_queued == 0) {
435         uint16_t acquire_cnt = 0;
436         osi_mutex_lock(&event->lock, OSI_MUTEX_MAX_TIMEOUT);
437         event->is_queued += 1;
438         acquire_cnt = event->is_queued;
439         osi_mutex_unlock(&event->lock);
440 
441         if (acquire_cnt == 1) {
442             ret = osi_thread_post(event->thread, osi_thread_generic_event_handler, event, event->queue_idx, timeout);
443             if (!ret) {
444                 // clear "is_queued" when post failure, to allow for following event posts
445                 osi_mutex_lock(&event->lock, OSI_MUTEX_MAX_TIMEOUT);
446                 event->is_queued = 0;
447                 osi_mutex_unlock(&event->lock);
448             }
449         }
450     }
451 
452     return ret;
453 }
454