1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 module thrift.internal.resource_pool;
20
21 import core.time : Duration, dur, TickDuration;
22 import std.algorithm : minPos, reduce, remove;
23 import std.array : array, empty;
24 import std.exception : enforce;
25 import std.conv : to;
26 import std.random : randomCover, rndGen;
27 import std.range : zip;
28 import thrift.internal.algorithm : removeEqual;
29
30 /**
31 * A pool of resources, which can be iterated over, and where resources that
32 * have failed too often can be temporarily disabled.
33 *
34 * This class is oblivious to the actual resource type managed.
35 */
final class TResourcePool(Resource) {
  /**
   * Constructs a new instance.
   *
   * Params:
   *   resources = The initial members of the pool.
   */
  this(Resource[] resources) {
    resources_ = resources;
  }

  /**
   * Adds a resource to the pool.
   */
  void add(Resource resource) {
    resources_ ~= resource;
  }

  /**
   * Removes a resource from the pool.
   *
   * Returns: Whether the resource could be found in the pool.
   */
  bool remove(Resource resource) {
    auto oldLength = resources_.length;
    resources_ = removeEqual(resources_, resource);
    return resources_.length < oldLength;
  }

  /**
   * Returns an »enriched« input range to iterate over the pool members.
   */
  static struct Range {
    /**
     * Whether the range is empty.
     *
     * This is the case if all members of the pool have been popped (or skipped
     * because they were disabled) and TResourcePool.cycle is false, or there
     * is no element to return in cycle mode because all have been temporarily
     * disabled.
     *
     * Note that this is not a pure query: it looks up the next usable
     * resource, caches it for front(), advances the internal search index and
     * drops expired entries from the parent's fault table.
     */
    bool empty() @property {
      // If no resources are in the pool, the range will never become non-empty.
      if (resources_.empty) return true;

      // If we already got the next resource in the cache, it doesn't matter
      // whether there are more.
      if (cached_) return false;

      size_t examineCount;
      if (parent_.cycle) {
        // We want to check all the resources, but not iterate more than once
        // to avoid spinning in a loop if nothing is available.
        examineCount = resources_.length;
      } else {
        // When not in cycle mode, we just iterate the list exactly once. If all
        // items have been consumed, nothing is left to examine. nextIndex_ can
        // exceed the length if the parent was switched out of cycle mode
        // after this range has wrapped around, so guard against the unsigned
        // subtraction underflowing.
        examineCount = nextIndex_ < resources_.length ?
          resources_.length - nextIndex_ : 0;
      }

      foreach (i; 0 .. examineCount) {
        auto r = resources_[(nextIndex_ + i) % resources_.length];
        auto fi = r in parent_.faultInfos_;

        // A default-initialized resetTime means faults were recorded, but the
        // resource has not been disabled.
        if (fi && fi.resetTime != fi.resetTime.init) {
          if (fi.resetTime < parent_.getCurrentTick_()) {
            // The timeout expired, remove the resource from the list and go
            // ahead trying it.
            parent_.faultInfos_.remove(r);
          } else {
            // The timeout didn't expire yet, try the next resource.
            continue;
          }
        }

        cache_ = r;
        cached_ = true;
        nextIndex_ = nextIndex_ + i + 1;
        return false;
      }

      // If we get here, all resources are currently inactive or the non-cycle
      // pool has been exhausted, so there is nothing we can do.
      nextIndex_ = nextIndex_ + examineCount;
      return true;
    }

    /**
     * Returns the first resource in the range.
     *
     * Throws: An exception (via enforce) if the range is empty.
     */
    Resource front() @property {
      enforce(!empty);
      return cache_;
    }

    /**
     * Removes the first resource from the range.
     *
     * Usually, this is combined with a call to TResourcePool.recordSuccess()
     * or recordFault().
     *
     * Throws: An exception (via enforce) if the range is empty.
     */
    void popFront() {
      enforce(!empty);
      cached_ = false;
    }

    /**
     * Returns whether the range will become non-empty at some point in the
     * future, and provides additional information when this will happen and
     * what will be the next resource.
     *
     * Makes only sense to call on empty ranges.
     *
     * Only the resources of this range are considered. If one of them is not
     * disabled at all, it is reported with a zero wait time. Otherwise the
     * disabled resource with the earliest reset time is reported; waitTime
     * can then be zero or negative if that time has already been reached.
     *
     * Params:
     *   next = The next resource that will become available.
     *   waitTime = The duration until that resource will become available.
     *
     * Returns: false if the range holds no resources or the parent pool is
     *   not in cycle mode, true otherwise.
     */
    bool willBecomeNonempty(out Resource next, out Duration waitTime) {
      // If no resources are in the pool, the range will never become non-empty.
      if (resources_.empty) return false;

      // If cycle mode is not enabled, a range never becomes non-empty after
      // being empty once, because all the elements have already been
      // used/skipped in order to become empty.
      if (!parent_.cycle) return false;

      // Walk our own resource list instead of the parent's whole fault table:
      // the table may be empty, may hold entries that merely count faults
      // without a disable timeout, or may refer to resources this range never
      // returns.
      bool haveCandidate;
      TickDuration earliestReset;
      foreach (r; resources_) {
        auto fi = r in parent_.faultInfos_;

        if (!fi || fi.resetTime == fi.resetTime.init) {
          // Not disabled (any more), so it is available right away.
          next = r;
          waitTime = Duration.zero;
          return true;
        }

        if (!haveCandidate || fi.resetTime < earliestReset) {
          haveCandidate = true;
          earliestReset = fi.resetTime;
          next = r;
        }
      }

      // resources_ is non-empty and every iteration above either returned or
      // recorded a candidate, so earliestReset is valid here.
      waitTime = to!Duration(earliestReset - parent_.getCurrentTick_());
      return true;
    }

  private:
    this(TResourcePool parent, Resource[] resources) {
      parent_ = parent;
      resources_ = resources;
    }

    /// The pool this range was created from; consulted for the cycle flag,
    /// the fault table and the clock.
    TResourcePool parent_;

    /// All available resources. We keep a copy of it as to not get confused
    /// when resources are added to/removed from the parent pool.
    Resource[] resources_;

    /// After we have determined the next element in empty(), we store it here.
    Resource cache_;

    /// Whether there is currently something in the cache.
    bool cached_;

    /// The index to start searching from at the next call to empty().
    size_t nextIndex_;
  }

  /// Ditto
  Range opSlice() {
    // Always hand the range its own array, so that it really is the copy that
    // Range.resources_ is documented to be. randomCover/array already creates
    // one; in the non-permuted case duplicate explicitly.
    // NOTE(review): removeEqual is defined in another module; if it compacts
    // the array in place, a shared slice would observe the shifted elements.
    auto res = permute ? array(randomCover(resources_, rndGen)) : resources_.dup;
    return Range(this, res);
  }

  /**
   * Records a success for an operation on the given resource, cancelling a
   * fault streak, if any.
   */
  void recordSuccess(Resource resource) {
    // Removing a key which is not in the associative array is a no-op.
    faultInfos_.remove(resource);
  }

  /**
   * Records a fault for the given resource.
   *
   * If faultDisableCount is non-zero and the resource has failed at least
   * faultDisableCount times in a row, it is temporarily disabled (no longer
   * considered) until faultDisableDuration has passed. Every further fault
   * restarts that timeout. If faultDisableCount is zero, the fault is counted,
   * but the resource is never disabled.
   */
  void recordFault(Resource resource) {
    auto fi = resource in faultInfos_;

    if (!fi) {
      faultInfos_[resource] = FaultInfo();
      fi = resource in faultInfos_;
    }

    // Saturate instead of wrapping around to zero, which would silently
    // restart the fault streak.
    if (fi.count < ushort.max) ++fi.count;

    if (faultDisableCount > 0 && fi.count >= faultDisableCount) {
      // If the resource has hit the fault count limit, disable it for
      // specified duration.
      fi.resetTime = getCurrentTick_() + cast(TickDuration)faultDisableDuration;
    }
  }

  /**
   * Whether to randomly permute the order of the resources in the pool when
   * taking a range using opSlice().
   *
   * This can be used e.g. as a simple form of load balancing.
   */
  bool permute = true;

  /**
   * Whether to keep iterating over the pool members after all have been
   * returned/have failed once.
   */
  bool cycle = false;

  /**
   * The number of consecutive faults after which a resource is disabled until
   * faultDisableDuration has passed. Zero to never disable resources.
   *
   * Defaults to zero.
   */
  ushort faultDisableCount = 0;

  /**
   * The duration for which a resource is no longer considered after it has
   * failed too often.
   *
   * Defaults to one second.
   */
  Duration faultDisableDuration = dur!"seconds"(1);

private:
  /// The current members of the pool.
  Resource[] resources_;

  /// Fault bookkeeping for the resources that currently have a fault streak.
  FaultInfo[Resource] faultInfos_;

  /// Function to get the current timestamp from some monotonic system clock.
  ///
  /// This is overridable to be able to write timing-insensitive unit tests.
  /// The extra indirection should not matter much performance-wise compared to
  /// the actual system call, and by its very nature this should not be on a hot
  /// path anyway.
  typeof(&TickDuration.currSystemTick) getCurrentTick_ =
    &TickDuration.currSystemTick;
}
280
/// Per-resource fault bookkeeping stored in TResourcePool.faultInfos_.
private struct FaultInfo {
  /// Number of faults recorded for the resource since its entry was created.
  ushort count;

  /// Tick at which a disabled resource may be tried again. Left at its
  /// default value as long as the resource has not been disabled.
  TickDuration resetTime;
}
287
unittest {
  // A pool without any members yields an empty range which also never
  // becomes non-empty.
  Object ignoredResource;
  Duration ignoredWait;
  auto emptyPool = new TResourcePool!Object([]);

  auto firstRange = emptyPool[];
  enforce(firstRange.empty);

  auto secondRange = emptyPool[];
  immutable everNonempty =
    secondRange.willBecomeNonempty(ignoredResource, ignoredWait);
  enforce(!everNonempty);
}
295
// Scenario test for TResourcePool using three resources and a fake clock.
// The sections below share the pool and the clock, so their order matters;
// each fault section ends by recording a success for every resource to clear
// the fault table for the next one.
unittest {
  import std.datetime;
  import thrift.base;

  auto a = new Object;
  auto b = new Object;
  auto c = new Object;
  auto objs = [a, b, c];
  auto pool = new TResourcePool!Object(objs);
  // Keep the iteration order deterministic (a, b, c).
  pool.permute = false;

  // Replace the system clock by a value the test advances by hand.
  static Duration fakeClock;
  pool.getCurrentTick_ = () => cast(TickDuration)fakeClock;

  // Only used as out parameters, so no initialization is needed.
  Object dummyRes = void;
  Duration dummyDur = void;

  // Non-cycle mode, no faults: every resource is returned exactly once, in
  // order, and the exhausted range stays empty.
  {
    auto r = pool[];

    foreach (i, o; objs) {
      enforce(!r.empty);
      enforce(r.front == o);
      r.popFront();
    }

    enforce(r.empty);
    enforce(!r.willBecomeNonempty(dummyRes, dummyDur));
  }

  // Non-cycle mode, limit of two faults: a single fault, or faults
  // interrupted by a success, keep a at the front; the second consecutive
  // fault makes new ranges skip a.
  {
    pool.faultDisableCount = 2;

    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordSuccess(a);
    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordFault(a);

    auto r = pool[];
    enforce(r.front == b);
    r.popFront();
    enforce(r.front == c);
    r.popFront();
    enforce(r.empty);
    enforce(!r.willBecomeNonempty(dummyRes, dummyDur));

    fakeClock += 2.seconds;
    // Not in cycle mode, has to be still empty after the timeouts expired.
    enforce(r.empty);
    enforce(!r.willBecomeNonempty(dummyRes, dummyDur));

    foreach (o; objs) pool.recordSuccess(o);
  }

  // Non-cycle mode, limit of one fault: with all three resources faulted a
  // fresh range is empty right away and will not become non-empty.
  {
    pool.faultDisableCount = 1;

    pool.recordFault(a);
    pool.recordFault(b);
    pool.recordFault(c);

    auto r = pool[];
    enforce(r.empty);
    enforce(!r.willBecomeNonempty(dummyRes, dummyDur));

    foreach (o; objs) pool.recordSuccess(o);
  }

  // All remaining sections run in cycle mode.
  pool.cycle = true;

  // Cycle mode, no faults: the range wraps around and returns a, b, c twice.
  {
    auto r = pool[];

    foreach (o; objs ~ objs) {
      enforce(!r.empty);
      enforce(r.front == o);
      r.popFront();
    }
  }

  // Cycle mode, limit of two faults: a is skipped while disabled (b, c, b)
  // and is returned again once the fake clock has been advanced by two
  // seconds.
  {
    pool.faultDisableCount = 2;

    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordSuccess(a);
    enforce(pool[].front == a);
    pool.recordFault(a);
    enforce(pool[].front == a);
    pool.recordFault(a);

    auto r = pool[];
    enforce(r.front == b);
    r.popFront();
    enforce(r.front == c);
    r.popFront();
    enforce(r.front == b);

    fakeClock += 2.seconds;

    r.popFront();
    enforce(r.front == c);

    r.popFront();
    enforce(r.front == a);

    enforce(pool[].front == a);

    foreach (o; objs) pool.recordSuccess(o);
  }

  // Cycle mode, limit of one fault: the resources are faulted one
  // millisecond apart, a first. The range is empty, but reports that a will
  // be the next resource to become available, after a positive wait time.
  {
    pool.faultDisableCount = 1;

    pool.recordFault(a);
    fakeClock += 1.msecs;
    pool.recordFault(b);
    fakeClock += 1.msecs;
    pool.recordFault(c);

    auto r = pool[];
    enforce(r.empty);

    // Make sure willBecomeNonempty gets the order right.
    enforce(r.willBecomeNonempty(dummyRes, dummyDur));
    enforce(dummyRes == a);
    enforce(dummyDur > Duration.zero);

    foreach (o; objs) pool.recordSuccess(o);
  }
}
432