1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.codegen.client_pool;
20 
21 import core.time : dur, Duration, TickDuration;
22 import std.traits : ParameterTypeTuple, ReturnType;
23 import thrift.base;
24 import thrift.codegen.base;
25 import thrift.codegen.client;
26 import thrift.internal.codegen;
27 import thrift.internal.resource_pool;
28 
29 /**
30  * Manages a pool of TClients for the given interface, forwarding RPC calls to
31  * members of the pool.
32  *
33  * If a request fails, another client from the pool is tried, and optionally,
34  * a client is disabled for a configurable amount of time if it fails too
35  * often. If all clients fail (and keepTrying is false), a
36  * TCompoundOperationException is thrown, containing all the collected RPC
37  * exceptions.
38  */
class TClientPool(Interface) if (isService!Interface) : Interface {
  /// Shorthand for TClientBase!Interface, the client type this instance
  /// operates on.
  alias TClientBase!Interface Client;

  /**
   * Creates a new instance and adds the given clients to the pool.
   *
   * Also installs the default rpcFaultFilter, which classifies
   * TTransportException and TApplicationException instances as faults of
   * the client that was used.
   *
   * Params:
   *   clients = The clients the pool is initially populated with.
   */
  this(Client[] clients) {
    pool_ = new TResourcePool!Client(clients);

    rpcFaultFilter = (Exception e) {
      import thrift.protocol.base;
      import thrift.transport.base;
      return (
        (cast(TTransportException)e !is null) ||
        (cast(TApplicationException)e !is null)
      );
    };
  }

  /**
   * Executes an operation on the first currently active client.
   *
   * If the operation fails (throws an exception for which rpcFaultFilter is
   * true), the failure is recorded and the next client in the pool is tried.
   *
   * Params:
   *   work = The operation to run; it receives the client to use and its
   *     result is passed back to the caller.
   *
   * Returns: Whatever work returned for the first client that did not fail
   *   with an rpc exception.
   *
   * Throws: Any non-rpc exception that occurs, a TCompoundOperationException
   *   if all clients failed with an rpc exception (if keepTrying is false).
   *
   * Example:
   * ---
   * interface Foo { string bar(); }
   * auto poolClient = tClientPool([tClient!Foo(someProtocol)]);
   * auto result = poolClient.execute((c){ return c.bar(); });
   * ---
   */
  ResultType execute(ResultType)(scope ResultType delegate(Client) work) {
    // Forward the caller's result type. Instantiating executeOnPool with
    // Client instead would only compile for delegates returning a Client.
    return executeOnPool!ResultType(work);
  }

  /**
   * Adds a client to the pool.
   *
   * Params:
   *   client = The client to add.
   */
  void addClient(Client client) {
    pool_.add(client);
  }

  /**
   * Removes a client from the pool.
   *
   * Params:
   *   client = The client to remove.
   *
   * Returns: Whether the client was found in the pool.
   */
  bool removeClient(Client client) {
    return pool_.remove(client);
  }

  // Generates one forwarding method per method of Interface; each of them
  // routes the call through executeOnPool.
  mixin(poolForwardCode!Interface());

  /// Whether to open the underlying transports of a client before trying to
  /// execute a method if they are not open. This is usually desirable
  /// because it allows e.g. to automatically reconnect to a remote server
  /// if the network connection is dropped.
  ///
  /// Defaults to true.
  bool reopenTransports = true;

  /// Called to determine whether an exception comes from a client from the
  /// pool not working properly, or if it is an exception thrown at the
  /// application level.
  ///
  /// If the delegate returns true, the server/connection is considered to be
  /// at fault, if it returns false, the exception is just passed on to the
  /// caller. If the delegate is null, every exception is passed on to the
  /// caller.
  ///
  /// By default, returns true for instances of TTransportException and
  /// TApplicationException, false otherwise.
  bool delegate(Exception) rpcFaultFilter;

  /**
   * Whether to keep trying to find a working client if all have failed in a
   * row.
   *
   * Defaults to false.
   */
  bool keepTrying() const @property {
    return pool_.cycle;
  }

  /// Ditto
  void keepTrying(bool value) @property {
    pool_.cycle = value;
  }

  /**
   * Whether to use a random permutation of the client pool on every call to
   * execute(). This can be used e.g. as a simple form of load balancing.
   *
   * Defaults to true.
   */
  bool permuteClients() const @property {
    return pool_.permute;
  }

  /// Ditto
  void permuteClients(bool value) @property {
    pool_.permute = value;
  }

  /**
   * The number of consecutive faults after which a client is disabled until
   * faultDisableDuration has passed. 0 to never disable clients.
   *
   * Defaults to 0.
   */
  ushort faultDisableCount() @property {
    return pool_.faultDisableCount;
  }

  /// Ditto
  void faultDisableCount(ushort value) @property {
    pool_.faultDisableCount = value;
  }

  /**
   * The duration for which a client is no longer considered after it has
   * failed too often.
   *
   * Defaults to one second.
   */
  Duration faultDisableDuration() @property {
    return pool_.faultDisableDuration;
  }

  /// Ditto
  void faultDisableDuration(Duration value) @property {
    pool_.faultDisableDuration = value;
  }

protected:
  /**
   * Runs work against the clients handed out by the pool, one after another,
   * until one call does not fail with an rpc exception.
   *
   * Params:
   *   work = The operation to run on a client.
   *
   * Returns: The result of the first call to work that completed.
   *
   * Throws: TException if the pool hands out no clients at all, any
   *   exception not accepted by rpcFaultFilter, or a
   *   TCompoundOperationException carrying the rpc exceptions of the last
   *   pass if no client worked and the pool does not offer another pass.
   */
  ResultType executeOnPool(ResultType)(scope ResultType delegate(Client) work) {
    auto clients = pool_[];
    if (clients.empty) {
      throw new TException("No clients available to try.");
    }

    while (true) {
      // Collected per pass, so only the failures of the final pass are
      // reported.
      Exception[] rpcExceptions;
      while (!clients.empty) {
        auto c = clients.front;
        clients.popFront();
        try {
          // Only runs if the try block is left without an exception, i.e.
          // when work(c) returned normally.
          scope (success) {
            pool_.recordSuccess(c);
          }

          if (reopenTransports) {
            c.inputProtocol.transport.open();
            c.outputProtocol.transport.open();
          }

          return work(c);
        } catch (Exception e) {
          if (rpcFaultFilter && rpcFaultFilter(e)) {
            // The client itself is considered broken; remember the error
            // and move on to the next one.
            pool_.recordFault(c);
            rpcExceptions ~= e;
          } else {
            // We are dealing with a normal exception thrown by the
            // server-side method, just pass it on. As far as we are
            // concerned, the method call succeeded.
            pool_.recordSuccess(c);
            throw e;
          }
        }
      }

      // If we get here, no client succeeded during the current iteration.
      Duration waitTime;
      // Required by the willBecomeNonempty signature; its value is unused.
      Client dummy;
      if (clients.willBecomeNonempty(dummy, waitTime)) {
        if (waitTime > dur!"hnsecs"(0)) {
          import core.thread;
          Thread.sleep(waitTime);
        }
      } else {
        throw new TCompoundOperationException("All clients failed.",
          rpcExceptions);
      }
    }
  }

private:
  /// The underlying resource pool holding the clients and their settings.
  TResourcePool!Client pool_;
}
233 
private {
  /**
   * Builds, at compile time, the source of the forwarding methods mixed into
   * TClientPool: one method per member method name of Interface, each
   * delegating to executeOnPool.
   *
   * This is a named function rather than an anonymous delegate literal
   * because the latter are not allowed in class scope.
   *
   * Returns: The D source code of all forwarding methods.
   */
  string poolForwardCode(Interface)() {
    string generated = "";

    foreach (name; AllMemberMethodNames!Interface) {
      // Qualified name used to look up the return and parameter types.
      enum target = "Interface." ~ name;

      enum header = "ReturnType!(" ~ target ~ ") " ~ name ~
        "(ParameterTypeTuple!(" ~ target ~ ") args) {\n";
      enum forward = "return executeOnPool((Client c){ return c." ~
        name ~ "(args); });\n";

      generated ~= header ~ forward ~ "}\n";
    }

    return generated;
  }
}
252 
253 /**
254  * TClientPool construction helper to avoid having to explicitly specify
255  * the interface type, i.e. to allow the constructor being called using IFTI
 * (see $(DMDBUG 6082, D Bugzilla enhancement request 6082)).
257  */
TClientPool!Interface tClientPool(Interface)(
  TClientBase!Interface[] clients
) if (isService!Interface) {
  // Construct the pool with the explicitly spelled-out type and hand it back.
  auto pool = new TClientPool!Interface(clients);
  return pool;
}
263