1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.internal.resource_pool;
20 
21 import core.time : Duration, dur, TickDuration;
22 import std.algorithm : minPos, reduce, remove;
23 import std.array : array, empty;
24 import std.exception : enforce;
25 import std.conv : to;
26 import std.random : randomCover, rndGen;
27 import std.range : zip;
28 import thrift.internal.algorithm : removeEqual;
29 
30 /**
31  * A pool of resources, which can be iterated over, and where resources that
32  * have failed too often can be temporarily disabled.
33  *
34  * This class is oblivious to the actual resource type managed.
35  */
final class TResourcePool(Resource) {
  /**
   * Constructs a new instance.
   *
   * Params:
   *   resources = The initial members of the pool.
   */
  this(Resource[] resources) {
    resources_ = resources;
  }

  /**
   * Adds a resource to the pool.
   *
   * Ranges obtained via opSlice() before this call keep iterating over the
   * member list they were created with.
   */
  void add(Resource resource) {
    resources_ ~= resource;
  }

  /**
   * Removes a resource from the pool.
   *
   * Returns: Whether the resource could be found in the pool.
   */
  bool remove(Resource resource) {
    auto oldLength = resources_.length;
    resources_ = removeEqual(resources_, resource);
    return resources_.length < oldLength;
  }

  /**
   * Returns an »enriched« input range to iterate over the pool members.
   */
  static struct Range {
    /**
     * Whether the range is empty.
     *
     * This is the case if all members of the pool have been popped (or skipped
     * because they were disabled) and TResourcePool.cycle is false, or there
     * is no element to return in cycle mode because all have been temporarily
     * disabled.
     *
     * Note that this is not a pure query: it looks up the next usable
     * resource, caches it for front(), and drops the fault information of
     * resources whose disable timeout has expired.
     */
    bool empty() @property {
      // If no resources are in the pool, the range will never become non-empty.
      if (resources_.empty) return true;

      // If we already got the next resource in the cache, it doesn't matter
      // whether there are more.
      if (cached_) return false;

      size_t examineCount;
      if (parent_.cycle) {
        // We want to check all the resources, but not iterate more than once
        // to avoid spinning in a loop if nothing is available.
        examineCount = resources_.length;
      } else {
        // When not in cycle mode, we just iterate the list exactly once. If all
        // items have been consumed, the interval below is empty. The guard
        // keeps the unsigned subtraction from wrapping around should the
        // index ever lie past the end of the list.
        examineCount = nextIndex_ < resources_.length ?
          resources_.length - nextIndex_ : 0;
      }

      foreach (i; 0 .. examineCount) {
        auto r = resources_[(nextIndex_ + i) % resources_.length];
        auto fi = r in parent_.faultInfos_;

        // A reset time different from the initial value marks a resource that
        // is currently disabled.
        if (fi && fi.resetTime != fi.resetTime.init) {
          if (fi.resetTime < parent_.getCurrentTick_()) {
            // The timeout expired, remove the resource from the list and go
            // ahead trying it.
            parent_.faultInfos_.remove(r);
          } else {
            // The timeout didn't expire yet, try the next resource.
            continue;
          }
        }

        cache_ = r;
        cached_ = true;
        advance(i + 1);
        return false;
      }

      // If we get here, all resources are currently inactive or the non-cycle
      // pool has been exhausted, so there is nothing we can do.
      advance(examineCount);
      return true;
    }

    /**
     * Returns the first resource in the range.
     *
     * Throws: An exception (via enforce) if the range is empty.
     */
    Resource front() @property {
      enforce(!empty);
      return cache_;
    }

    /**
     * Removes the first resource from the range.
     *
     * Usually, this is combined with a call to TResourcePool.recordSuccess()
     * or recordFault().
     *
     * Throws: An exception (via enforce) if the range is empty.
     */
    void popFront() {
      enforce(!empty);
      cached_ = false;
    }

    /**
     * Returns whether the range will become non-empty at some point in the
     * future, and provides additional information when this will happen and
     * what will be the next resource.
     *
     * Makes only sense to call on empty ranges.
     *
     * Only the resources this range iterates over are considered, and among
     * them only those that are currently disabled. If there is no such
     * resource, there is nothing to wait for and false is returned.
     *
     * Params:
     *   next = The next resource that will become available.
     *   waitTime = The duration until that resource will become available.
     */
    bool willBecomeNonempty(out Resource next, out Duration waitTime) {
      // If no resources are in the pool, the range will never become non-empty.
      if (resources_.empty) return false;

      // If cycle mode is not enabled, a range never becomes non-empty after
      // being empty once, because all the elements have already been
      // used/skipped in order to become empty.
      if (!parent_.cycle) return false;

      // Find the disabled resource with the earliest reset time. Fault
      // records without a reset time belong to resources that have failed,
      // but not often enough to be disabled, so they must not be picked.
      bool found = false;
      TickDuration earliest;
      foreach (r; resources_) {
        auto fi = r in parent_.faultInfos_;
        if (!fi || fi.resetTime == fi.resetTime.init) continue;

        if (!found || fi.resetTime < earliest) {
          found = true;
          earliest = fi.resetTime;
          next = r;
        }
      }

      if (!found) return false;

      waitTime = to!Duration(earliest - parent_.getCurrentTick_());
      return true;
    }

  private:
    this(TResourcePool parent, Resource[] resources) {
      parent_ = parent;
      resources_ = resources;
    }

    /// Moves the search start forward by the given number of resources.
    ///
    /// In cycle mode, the index is wrapped so that it neither grows without
    /// bound nor points past the end if cycle mode is switched off later on.
    /// Must only be called when resources_ is not empty.
    void advance(size_t count) {
      nextIndex_ += count;
      if (parent_.cycle) nextIndex_ %= resources_.length;
    }

    TResourcePool parent_;

    /// All available resources. We keep a copy of it as to not get confused
    /// when resources are added to/removed from the parent pool.
    Resource[] resources_;

    /// After we have determined the next element in empty(), we store it here.
    Resource cache_;

    /// Whether there is currently something in the cache.
    bool cached_;

    /// The index to start searching from at the next call to empty().
    size_t nextIndex_;
  }

  /// Ditto
  Range opSlice() {
    auto res = resources_;
    if (permute) {
      res = array(randomCover(res, rndGen));
    }
    return Range(this, res);
  }

  /**
   * Records a success for an operation on the given resource, cancelling a
   * fault streak, if any.
   */
  void recordSuccess(Resource resource) {
    if (resource in faultInfos_) {
      faultInfos_.remove(resource);
    }
  }

  /**
   * Records a fault for the given resource.
   *
   * If a resource fails consecutively at least faultDisableCount times, it is
   * temporarily disabled (no longer considered) until faultDisableDuration
   * has passed. If faultDisableCount is zero, faults are still counted, but
   * the resource is never disabled.
   */
  void recordFault(Resource resource) {
    auto fi = resource in faultInfos_;

    if (!fi) {
      faultInfos_[resource] = FaultInfo();
      fi = resource in faultInfos_;
    }

    // Saturate instead of wrapping around to zero, which would silently
    // restart a very long fault streak.
    if (fi.count < typeof(fi.count).max) ++fi.count;

    // A limit of zero means »never disable«, so it has to be excluded
    // explicitly – any fault count would compare greater or equal to it.
    if (faultDisableCount > 0 && fi.count >= faultDisableCount) {
      // If the resource has hit the fault count limit, disable it for
      // specified duration.
      fi.resetTime = getCurrentTick_() + cast(TickDuration)faultDisableDuration;
    }
  }

  /**
   * Whether to randomly permute the order of the resources in the pool when
   * taking a range using opSlice().
   *
   * This can be used e.g. as a simple form of load balancing.
   */
  bool permute = true;

  /**
   * Whether to keep iterating over the pool members after all have been
   * returned/have failed once.
   */
  bool cycle = false;

  /**
   * The number of consecutive faults after which a resource is disabled until
   * faultDisableDuration has passed. Zero to never disable resources.
   *
   * Defaults to zero.
   */
  ushort faultDisableCount = 0;

  /**
   * The duration for which a resource is no longer considered after it has
   * failed too often.
   *
   * Defaults to one second.
   */
  Duration faultDisableDuration = dur!"seconds"(1);

private:
  /// The current members of the pool.
  Resource[] resources_;

  /// Fault bookkeeping, only present for resources with an active fault
  /// streak.
  FaultInfo[Resource] faultInfos_;

  /// Function to get the current timestamp from some monotonic system clock.
  ///
  /// This is overridable to be able to write timing-insensitive unit tests.
  /// The extra indirection should not matter much performance-wise compared to
  /// the actual system call, and by its very nature this should not be on a hot
  /// path anyway.
  typeof(&TickDuration.currSystemTick) getCurrentTick_ =
    &TickDuration.currSystemTick;
}
280 
/// Fault bookkeeping kept per resource while it has an active fault streak.
private struct FaultInfo {
  /// Number of faults recorded for the resource since the record was created.
  ushort count;

  /// Tick at which a disabled resource may be tried again. Stays at its
  /// initial value as long as the resource has not been disabled.
  TickDuration resetTime;
}
287 
unittest {
  // A pool without any members yields an empty range which also reports that
  // it will never become non-empty.
  Object nextResource;
  Duration waitTime;

  auto emptyPool = new TResourcePool!Object([]);
  enforce(emptyPool[].empty);
  enforce(!emptyPool[].willBecomeNonempty(nextResource, waitTime));
}
295 
unittest {
  import std.datetime;
  import thrift.base;

  auto first = new Object;
  auto second = new Object;
  auto third = new Object;
  auto members = [first, second, third];
  auto pool = new TResourcePool!Object(members);
  pool.permute = false;

  // Replace the system clock by a manually advanced one so that the test does
  // not depend on real timing.
  static Duration fakeClock;
  pool.getCurrentTick_ = () => cast(TickDuration)fakeClock;

  Object nextRes = void;
  Duration waitTime = void;

  // Clears the fault streaks of all pool members.
  void clearFaults() {
    foreach (m; members) pool.recordSuccess(m);
  }

  // With a limit of two faults, checks that a success interrupts a fault
  // streak and then disables the first member by two faults in a row.
  void disableFirstMember() {
    pool.faultDisableCount = 2;

    enforce(pool[].front == first);
    pool.recordFault(first);
    enforce(pool[].front == first);
    pool.recordSuccess(first);
    enforce(pool[].front == first);
    pool.recordFault(first);
    enforce(pool[].front == first);
    pool.recordFault(first);
  }

  // Non-cycle mode: plain iteration visits every member exactly once.
  {
    auto range = pool[];

    foreach (expected; members) {
      enforce(!range.empty);
      enforce(range.front == expected);
      range.popFront();
    }

    enforce(range.empty);
    enforce(!range.willBecomeNonempty(nextRes, waitTime));
  }

  // Non-cycle mode: a disabled member is skipped.
  {
    disableFirstMember();

    auto range = pool[];
    enforce(range.front == second);
    range.popFront();
    enforce(range.front == third);
    range.popFront();
    enforce(range.empty);
    enforce(!range.willBecomeNonempty(nextRes, waitTime));

    fakeClock += 2.seconds;
    // Not in cycle mode, has to be still empty after the timeouts expired.
    enforce(range.empty);
    enforce(!range.willBecomeNonempty(nextRes, waitTime));

    clearFaults();
  }

  // Non-cycle mode: all members disabled.
  {
    pool.faultDisableCount = 1;

    pool.recordFault(first);
    pool.recordFault(second);
    pool.recordFault(third);

    auto range = pool[];
    enforce(range.empty);
    enforce(!range.willBecomeNonempty(nextRes, waitTime));

    clearFaults();
  }

  pool.cycle = true;

  // Cycle mode: iteration wraps around after the last member.
  {
    auto range = pool[];

    foreach (expected; members ~ members) {
      enforce(!range.empty);
      enforce(range.front == expected);
      range.popFront();
    }
  }

  // Cycle mode: a disabled member is skipped until its timeout has expired.
  {
    disableFirstMember();

    auto range = pool[];
    enforce(range.front == second);
    range.popFront();
    enforce(range.front == third);
    range.popFront();
    enforce(range.front == second);

    fakeClock += 2.seconds;

    range.popFront();
    enforce(range.front == third);

    range.popFront();
    enforce(range.front == first);

    enforce(pool[].front == first);

    clearFaults();
  }

  // Cycle mode: all members disabled, one after the other.
  {
    pool.faultDisableCount = 1;

    pool.recordFault(first);
    fakeClock += 1.msecs;
    pool.recordFault(second);
    fakeClock += 1.msecs;
    pool.recordFault(third);

    auto range = pool[];
    enforce(range.empty);

    // Make sure willBecomeNonempty gets the order right.
    enforce(range.willBecomeNonempty(nextRes, waitTime));
    enforce(nextRes == first);
    enforce(waitTime > Duration.zero);

    clearFaults();
  }
}
432