1 /******************************************************************************
2  *
3  *  Copyright (C) 2014 Google, Inc.
4  *
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at:
8  *
9  *  http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  ******************************************************************************/
18 
19 #include "osi/allocator.h"
20 #include "osi/fixed_queue.h"
21 #include "osi/list.h"
22 #include "osi/osi.h"
23 #include "osi/mutex.h"
24 #include "osi/semaphore.h"
25 
26 typedef struct fixed_queue_t {
27 
28     list_t *list;
29     osi_sem_t enqueue_sem;
30     osi_sem_t dequeue_sem;
31     osi_mutex_t lock;
32     size_t capacity;
33 
34     fixed_queue_cb dequeue_ready;
35 } fixed_queue_t;
36 
37 
fixed_queue_new(size_t capacity)38 fixed_queue_t *fixed_queue_new(size_t capacity)
39 {
40     fixed_queue_t *ret = osi_calloc(sizeof(fixed_queue_t));
41     if (!ret) {
42         goto error;
43     }
44 
45     osi_mutex_new(&ret->lock);
46     ret->capacity = capacity;
47 
48     ret->list = list_new(NULL);
49     if (!ret->list) {
50         goto error;
51     }
52 
53 
54     osi_sem_new(&ret->enqueue_sem, capacity, capacity);
55     if (!ret->enqueue_sem) {
56         goto error;
57     }
58 
59     osi_sem_new(&ret->dequeue_sem, capacity, 0);
60     if (!ret->dequeue_sem) {
61         goto error;
62     }
63 
64     return ret;
65 
66 error:;
67     fixed_queue_free(ret, NULL);
68     return NULL;
69 }
70 
/**
 * Destroy a queue and release everything it owns.
 *
 * @param queue    Queue to destroy; NULL is accepted and ignored.
 * @param free_cb  If non-NULL, called once with the data of every element
 *                 still in the list before the list itself is freed.
 */
void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb)
{
    if (!queue) {
        return;
    }

    fixed_queue_unregister_dequeue(queue);

    if (free_cb != NULL) {
        const list_node_t *it = list_begin(queue->list);
        while (it != list_end(queue->list)) {
            free_cb(list_node(it));
            it = list_next(it);
        }
    }

    list_free(queue->list);
    osi_sem_free(&queue->enqueue_sem);
    osi_sem_free(&queue->dequeue_sem);
    osi_mutex_free(&queue->lock);
    osi_free(queue);
}
93 
/**
 * Report whether the queue currently holds no elements.
 *
 * @param queue  Queue to inspect; may be NULL.
 * @return true if queue is NULL or its list is empty, false otherwise.
 */
bool fixed_queue_is_empty(fixed_queue_t *queue)
{
    if (!queue) {
        return true;
    }

    // The list is read while holding the queue lock.
    osi_mutex_lock(&queue->lock, OSI_MUTEX_MAX_TIMEOUT);
    const bool empty = list_is_empty(queue->list);
    osi_mutex_unlock(&queue->lock);

    return empty;
}
108 
/**
 * Return the number of elements currently in the queue.
 *
 * @param queue  Queue to inspect; may be NULL.
 * @return 0 if queue is NULL, otherwise the length reported by list_length().
 */
size_t fixed_queue_length(fixed_queue_t *queue)
{
    if (!queue) {
        return 0;
    }

    // The list is read while holding the queue lock.
    osi_mutex_lock(&queue->lock, OSI_MUTEX_MAX_TIMEOUT);
    const size_t count = list_length(queue->list);
    osi_mutex_unlock(&queue->lock);

    return count;
}
/**
 * Return the capacity the queue was created with.
 *
 * @param queue  Queue to inspect; must not be NULL (asserted).
 * @return The capacity value stored in the queue.
 */
size_t fixed_queue_capacity(fixed_queue_t *queue)
{
    assert(queue != NULL);

    const size_t cap = queue->capacity;
    return cap;
}
129 
/**
 * Append an element to the back of the queue.
 *
 * @param queue    Queue to append to; must not be NULL (asserted).
 * @param data     Element to store; must not be NULL (asserted).
 * @param timeout  Passed to osi_sem_take() when acquiring enqueue_sem.
 * @return true if the element was appended; false if osi_sem_take() on
 *         enqueue_sem returned non-zero or list_append() reported failure.
 */
bool fixed_queue_enqueue(fixed_queue_t *queue, void *data, uint32_t timeout)
{
    assert(queue != NULL);
    assert(data != NULL);

    // Reserve a slot; a non-zero result means none was obtained.
    if (osi_sem_take(&queue->enqueue_sem, timeout) != 0) {
        return false;
    }

    osi_mutex_lock(&queue->lock, OSI_MUTEX_MAX_TIMEOUT);
    bool appended = list_append(queue->list, data);
    osi_mutex_unlock(&queue->lock);

    if (appended) {
        // One more element is now available to consumers.
        osi_sem_give(&queue->dequeue_sem);
    } else {
        // Nothing was stored, so hand the reserved slot back; otherwise every
        // failed append would permanently reduce the usable capacity by one.
        osi_sem_give(&queue->enqueue_sem);
    }

    return appended;
}
150 
/**
 * Remove and return the element at the front of the queue.
 *
 * @param queue    Queue to take from; must not be NULL (asserted).
 * @param timeout  Passed to osi_sem_take() when acquiring dequeue_sem.
 * @return The element that was at the front of the list, or NULL if
 *         osi_sem_take() on dequeue_sem returned non-zero.
 */
void *fixed_queue_dequeue(fixed_queue_t *queue, uint32_t timeout)
{
    assert(queue != NULL);

    if (osi_sem_take(&queue->dequeue_sem, timeout) != 0) {
        return NULL;
    }

    void *front = NULL;

    osi_mutex_lock(&queue->lock, OSI_MUTEX_MAX_TIMEOUT);
    front = list_front(queue->list);
    list_remove(queue->list, front);
    osi_mutex_unlock(&queue->lock);

    // A slot has been vacated; let a producer use it.
    osi_sem_give(&queue->enqueue_sem);

    return front;
}
170 
/**
 * Return the element at the front of the queue without removing it.
 *
 * @param queue  Queue to inspect; may be NULL.
 * @return The front element, or NULL if queue is NULL or the list is empty.
 */
void *fixed_queue_try_peek_first(fixed_queue_t *queue)
{
    if (!queue) {
        return NULL;
    }

    void *first = NULL;

    osi_mutex_lock(&queue->lock, OSI_MUTEX_MAX_TIMEOUT);
    if (!list_is_empty(queue->list)) {
        first = list_front(queue->list);
    }
    osi_mutex_unlock(&queue->lock);

    return first;
}
185 
/**
 * Return the element at the back of the queue without removing it.
 *
 * @param queue  Queue to inspect; may be NULL.
 * @return The back element, or NULL if queue is NULL or the list is empty.
 */
void *fixed_queue_try_peek_last(fixed_queue_t *queue)
{
    if (!queue) {
        return NULL;
    }

    void *last = NULL;

    osi_mutex_lock(&queue->lock, OSI_MUTEX_MAX_TIMEOUT);
    if (!list_is_empty(queue->list)) {
        last = list_back(queue->list);
    }
    osi_mutex_unlock(&queue->lock);

    return last;
}
200 
/**
 * Remove a specific element from the queue if it is present.
 *
 * @param queue  Queue to search; may be NULL.
 * @param data   Element to look for and remove.
 * @return data if it was found and removed; NULL if queue is NULL, the list
 *         does not contain data, or the zero-timeout osi_sem_take() on
 *         dequeue_sem returned non-zero.
 */
void *fixed_queue_try_remove_from_queue(fixed_queue_t *queue, void *data)
{
    if (!queue) {
        return NULL;
    }

    bool was_removed = false;

    osi_mutex_lock(&queue->lock, OSI_MUTEX_MAX_TIMEOUT);
    if (list_contains(queue->list, data)) {
        // Take dequeue_sem with a zero timeout only after the element has been
        // found in the list.
        if (osi_sem_take(&queue->dequeue_sem, 0) == 0) {
            was_removed = list_remove(queue->list, data);
            assert(was_removed);
        }
    }
    osi_mutex_unlock(&queue->lock);

    if (!was_removed) {
        return NULL;
    }

    // A slot has been vacated; let a producer use it.
    osi_sem_give(&queue->enqueue_sem);
    return data;
}
224 
fixed_queue_get_list(fixed_queue_t * queue)225 list_t *fixed_queue_get_list(fixed_queue_t *queue)
226 {
227     assert(queue != NULL);
228 
229     // NOTE: This function is not thread safe, and there is no point for
230     // calling osi_mutex_lock() / osi_mutex_unlock()
231     return queue->list;
232 }
233 
/**
 * Install the callback that fixed_queue_process() will invoke.
 *
 * @param queue     Queue to configure; must not be NULL (asserted).
 * @param ready_cb  Callback to store; must not be NULL (asserted).
 */
void fixed_queue_register_dequeue(fixed_queue_t *queue, fixed_queue_cb ready_cb)
{
    assert(queue != NULL);
    assert(ready_cb != NULL);

    // Replaces any callback registered earlier.
    queue->dequeue_ready = ready_cb;
}
241 
/**
 * Clear the callback used by fixed_queue_process().
 *
 * @param queue  Queue to configure; must not be NULL (asserted).
 */
void fixed_queue_unregister_dequeue(fixed_queue_t *queue)
{
    assert(queue != NULL);

    // With no callback stored, fixed_queue_process() does nothing.
    queue->dequeue_ready = NULL;
}
248 
/**
 * Invoke the registered dequeue-ready callback, if any, passing the queue.
 *
 * @param queue  Queue to process; must not be NULL (asserted).
 */
void fixed_queue_process(fixed_queue_t *queue)
{
    assert(queue != NULL);

    if (!queue->dequeue_ready) {
        return;
    }

    queue->dequeue_ready(queue);
}
257