1 /******************************************************************************
2 *
3 * Copyright (C) 2014 Google, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at:
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 ******************************************************************************/
18
19 #include "osi/allocator.h"
20 #include "osi/fixed_queue.h"
21 #include "osi/list.h"
22 #include "osi/osi.h"
23 #include "osi/mutex.h"
24 #include "osi/semaphore.h"
25
26 typedef struct fixed_queue_t {
27
28 list_t *list;
29 osi_sem_t enqueue_sem;
30 osi_sem_t dequeue_sem;
31 osi_mutex_t lock;
32 size_t capacity;
33
34 fixed_queue_cb dequeue_ready;
35 } fixed_queue_t;
36
37
fixed_queue_new(size_t capacity)38 fixed_queue_t *fixed_queue_new(size_t capacity)
39 {
40 fixed_queue_t *ret = osi_calloc(sizeof(fixed_queue_t));
41 if (!ret) {
42 goto error;
43 }
44
45 osi_mutex_new(&ret->lock);
46 ret->capacity = capacity;
47
48 ret->list = list_new(NULL);
49 if (!ret->list) {
50 goto error;
51 }
52
53
54 osi_sem_new(&ret->enqueue_sem, capacity, capacity);
55 if (!ret->enqueue_sem) {
56 goto error;
57 }
58
59 osi_sem_new(&ret->dequeue_sem, capacity, 0);
60 if (!ret->dequeue_sem) {
61 goto error;
62 }
63
64 return ret;
65
66 error:;
67 fixed_queue_free(ret, NULL);
68 return NULL;
69 }
70
// Destroys |queue| and everything it owns. A NULL |queue| is a no-op.
//
// Any registered dequeue callback is cleared first. If |free_cb| is non-NULL
// it is called once with the data of every item still in the queue before the
// list, both semaphores, the mutex and the queue object itself are released.
void fixed_queue_free(fixed_queue_t *queue, fixed_queue_free_cb free_cb)
{
    if (!queue) {
        return;
    }

    fixed_queue_unregister_dequeue(queue);

    if (free_cb != NULL) {
        // Walk the remaining items and let the caller dispose of each payload.
        const list_node_t *cur = list_begin(queue->list);
        while (cur != list_end(queue->list)) {
            free_cb(list_node(cur));
            cur = list_next(cur);
        }
    }

    list_free(queue->list);
    osi_sem_free(&queue->enqueue_sem);
    osi_sem_free(&queue->dequeue_sem);
    osi_mutex_free(&queue->lock);
    osi_free(queue);
}
93
// Reports whether |queue| currently holds no items.
//
// A NULL |queue| is treated as empty. The list is inspected with the queue
// lock held.
bool fixed_queue_is_empty(fixed_queue_t *queue)
{
    bool empty;

    if (!queue) {
        return true;
    }

    osi_mutex_lock(&queue->lock, OSI_MUTEX_MAX_TIMEOUT);
    empty = list_is_empty(queue->list);
    osi_mutex_unlock(&queue->lock);

    return empty;
}
108
// Returns the number of items currently stored in |queue|.
//
// A NULL |queue| yields 0. The list length is read with the queue lock held.
size_t fixed_queue_length(fixed_queue_t *queue)
{
    size_t count;

    if (!queue) {
        return 0;
    }

    osi_mutex_lock(&queue->lock, OSI_MUTEX_MAX_TIMEOUT);
    count = list_length(queue->list);
    osi_mutex_unlock(&queue->lock);

    return count;
}
// Returns the capacity that |queue| was created with.
// |queue| must not be NULL (asserted).
size_t fixed_queue_capacity(fixed_queue_t *queue)
{
    assert(queue != NULL);

    const size_t limit = queue->capacity;
    return limit;
}
129
// Appends |data| to the end of |queue|.
//
// An enqueue permit is taken first, waiting up to |timeout|; if that fails
// (osi_sem_take() returns non-zero) nothing is queued and false is returned.
// Returns true when |data| was appended, false otherwise.
// |queue| and |data| must not be NULL (asserted).
bool fixed_queue_enqueue(fixed_queue_t *queue, void *data, uint32_t timeout)
{
    bool status = false; // whether |data| was actually appended

    assert(queue != NULL);
    assert(data != NULL);

    if (osi_sem_take(&queue->enqueue_sem, timeout) != 0) {
        return false;
    }

    osi_mutex_lock(&queue->lock, OSI_MUTEX_MAX_TIMEOUT);
    status = list_append(queue->list, data);
    osi_mutex_unlock(&queue->lock);

    if (status) {
        // A new item is available for consumers.
        osi_sem_give(&queue->dequeue_sem);
    } else {
        // Nothing was stored, so hand back the enqueue permit taken above;
        // otherwise every failed append would permanently cost one slot.
        osi_sem_give(&queue->enqueue_sem);
    }

    return status;
}
150
// Removes and returns the item at the front of |queue|.
//
// A dequeue permit is taken first, waiting up to |timeout|; if that fails
// (osi_sem_take() returns non-zero) NULL is returned and the queue is left
// untouched. After a removal, one enqueue permit is given back.
// |queue| must not be NULL (asserted).
void *fixed_queue_dequeue(fixed_queue_t *queue, uint32_t timeout)
{
    void *item = NULL;

    assert(queue != NULL);

    if (osi_sem_take(&queue->dequeue_sem, timeout) != 0) {
        return NULL;
    }

    // Fetch and unlink the front item in one critical section.
    osi_mutex_lock(&queue->lock, OSI_MUTEX_MAX_TIMEOUT);
    item = list_front(queue->list);
    list_remove(queue->list, item);
    osi_mutex_unlock(&queue->lock);

    osi_sem_give(&queue->enqueue_sem);

    return item;
}
170
// Returns the item at the front of |queue| without removing it.
//
// Returns NULL when |queue| is NULL or holds no items. The list is read with
// the queue lock held.
void *fixed_queue_try_peek_first(fixed_queue_t *queue)
{
    void *head = NULL;

    if (!queue) {
        return NULL;
    }

    osi_mutex_lock(&queue->lock, OSI_MUTEX_MAX_TIMEOUT);
    if (!list_is_empty(queue->list)) {
        head = list_front(queue->list);
    }
    osi_mutex_unlock(&queue->lock);

    return head;
}
185
// Returns the item at the back of |queue| without removing it.
//
// Returns NULL when |queue| is NULL or holds no items. The list is read with
// the queue lock held.
void *fixed_queue_try_peek_last(fixed_queue_t *queue)
{
    void *tail = NULL;

    if (!queue) {
        return NULL;
    }

    osi_mutex_lock(&queue->lock, OSI_MUTEX_MAX_TIMEOUT);
    if (!list_is_empty(queue->list)) {
        tail = list_back(queue->list);
    }
    osi_mutex_unlock(&queue->lock);

    return tail;
}
200
// Removes |data| from |queue| if it is present, without blocking.
//
// With the queue lock held, the item is removed only when the list contains
// it and a dequeue permit can be taken with a zero timeout. On removal an
// enqueue permit is given back and |data| is returned; otherwise, or when
// |queue| is NULL, NULL is returned.
void *fixed_queue_try_remove_from_queue(fixed_queue_t *queue, void *data)
{
    bool removed = false;

    if (!queue) {
        return NULL;
    }

    osi_mutex_lock(&queue->lock, OSI_MUTEX_MAX_TIMEOUT);
    if (list_contains(queue->list, data)) {
        // Consume the permit that accounts for this item before unlinking it.
        if (osi_sem_take(&queue->dequeue_sem, 0) == 0) {
            removed = list_remove(queue->list, data);
            assert(removed);
        }
    }
    osi_mutex_unlock(&queue->lock);

    if (!removed) {
        return NULL;
    }

    osi_sem_give(&queue->enqueue_sem);
    return data;
}
224
fixed_queue_get_list(fixed_queue_t * queue)225 list_t *fixed_queue_get_list(fixed_queue_t *queue)
226 {
227 assert(queue != NULL);
228
229 // NOTE: This function is not thread safe, and there is no point for
230 // calling osi_mutex_lock() / osi_mutex_unlock()
231 return queue->list;
232 }
233
// Stores |ready_cb| as the callback that fixed_queue_process() runs for
// |queue|, replacing any callback stored earlier.
// Neither |queue| nor |ready_cb| may be NULL (asserted).
void fixed_queue_register_dequeue(fixed_queue_t *queue, fixed_queue_cb ready_cb)
{
    assert(queue != NULL);
    assert(ready_cb != NULL);

    queue->dequeue_ready = ready_cb;
}
241
// Clears the callback stored on |queue|, so that fixed_queue_process() has
// nothing to run. |queue| must not be NULL (asserted).
void fixed_queue_unregister_dequeue(fixed_queue_t *queue)
{
    assert(queue != NULL);

    queue->dequeue_ready = NULL;
}
248
// Runs the callback registered on |queue|, passing it the queue itself.
// Does nothing when no callback is registered.
// |queue| must not be NULL (asserted).
void fixed_queue_process(fixed_queue_t *queue)
{
    assert(queue != NULL);

    if (queue->dequeue_ready == NULL) {
        return;
    }

    queue->dequeue_ready(queue);
}
257