1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 module client_pool_test;
20
21 import core.sync.semaphore : Semaphore;
22 import core.time : Duration, dur;
23 import core.thread : Thread;
24 import std.algorithm;
25 import std.array;
26 import std.conv;
27 import std.exception;
28 import std.getopt;
29 import std.range;
30 import std.stdio;
31 import std.typecons;
32 import std.variant : Variant;
33 import thrift.base;
34 import thrift.async.libevent;
35 import thrift.async.socket;
36 import thrift.codegen.base;
37 import thrift.codegen.async_client;
38 import thrift.codegen.async_client_pool;
39 import thrift.codegen.client;
40 import thrift.codegen.client_pool;
41 import thrift.codegen.processor;
42 import thrift.protocol.base;
43 import thrift.protocol.binary;
44 import thrift.server.base;
45 import thrift.server.simple;
46 import thrift.server.transport.socket;
47 import thrift.transport.base;
48 import thrift.transport.buffered;
49 import thrift.transport.socket;
50 import thrift.util.cancellation;
51 import thrift.util.future;
52
// We use this as our RPC-layer exception here to make sure socket/… problems
// (that would usually be considered to be RPC layer faults) cause the tests to
// fail, even though we are testing the RPC exception handling.
/// Exception type thrown by the test handlers; `port` is filled in with the
/// port of the handler that raised it so tests can tell the servers apart.
class TestServiceException : TException
{
  /// Port of the failing handler (set by ExTestHandler before throwing).
  int port;
}
59
/// Base service used by the pool tests: a single RPC method plus the
/// metadata the Thrift code generation templates read.
interface TestService
{
  /// The one RPC method; ExTestHandler implements it by returning its port.
  int getPort();

  // Re-export the module-level exception type as a member of the interface
  // under the same name that the method metadata below refers to.
  alias .TestServiceException TestServiceException;

  // getPort takes no parameters and declares one exception field
  // ("a", id 1) of type TestServiceException.
  enum methodMeta = [
    TMethodMeta(
      "getPort",
      [],
      [TExceptionMeta("a", 1, "TestServiceException")]
    )
  ];
}
66
67 // Use some derived service, just to check that the pools handle inheritance
68 // correctly.
/// Derived service adding a second method on top of TestService.
interface ExTestService : TestService
{
  /// Array-returning RPC method; ExTestHandler implements it by wrapping
  /// the result of getPort() in a one-element array.
  int[] getPortInArray();

  // getPortInArray takes no parameters and declares the same exception
  // field ("a", id 1, TestServiceException) as getPort.
  enum methodMeta = [
    TMethodMeta(
      "getPortInArray",
      [],
      [TExceptionMeta("a", 1, "TestServiceException")]
    )
  ];
}
74
/**
 * Service implementation used by every test server.
 *
 * Each call optionally logs itself to stderr, sleeps for `delay`, and then
 * either throws a TestServiceException carrying `port` (when `failing` is
 * set) or returns `port`.
 */
class ExTestHandler : ExTestService {
  /**
   * Params:
   *   port    = value returned by getPort() and stored in thrown exceptions
   *   delay   = time to sleep before answering each call
   *   failing = whether calls should throw instead of returning
   *   trace   = whether each call is logged to stderr
   */
  this(ushort port, Duration delay, bool failing, bool trace) {
    this.trace = trace;
    this.failing = failing;
    this.delay = delay;
    this.port = port;
  }

  /// Logs (if tracing), sleeps, then throws or returns `port`.
  override int getPort() {
    if (trace) {
      stderr.writefln("getPort() called on %s (delay: %s, failing: %s)", port,
        delay, failing);
    }

    sleep();
    failIfEnabled();
    return port;
  }

  /// Same as getPort(), with the result wrapped in a one-element array.
  override int[] getPortInArray() {
    auto result = getPort();
    return [result];
  }

  // Public on purpose: the tests flip `failing` on live handlers and the
  // server thread reads `port`.
  ushort port;
  Duration delay;
  bool failing;
  bool trace;

private:
  /// Blocks the calling thread for `delay`, unless the delay is not positive.
  void sleep() {
    if (delay <= dur!"hnsecs"(0)) return;
    Thread.sleep(delay);
  }

  /// Throws a TestServiceException tagged with `port` when `failing` is set.
  void failIfEnabled() {
    if (failing) {
      auto failure = new TestServiceException;
      failure.port = port;
      throw failure;
    }
  }
}
115
/**
 * Server event handler whose only job is to notify a semaphore from
 * preServe(); the remaining callbacks are no-ops.
 */
class ServerPreServeHandler : TServerEventHandler {
  /// Params: sem = semaphore notified once from preServe().
  this(Semaphore sem) {
    this.sem_ = sem;
  }

  /// Signals the semaphore handed to the constructor.
  override void preServe() {
    sem_.notify();
  }

  /// No per-connection context is kept; returns an empty Variant.
  Variant createContext(TProtocol input, TProtocol output) {
    Variant noContext;
    return noContext;
  }

  /// Nothing to release.
  void deleteContext(Variant serverContext, TProtocol input, TProtocol output) {
  }

  /// Nothing to do before a request is processed.
  void preProcess(Variant serverContext, TTransport transport) {
  }

private:
  Semaphore sem_;
}
132
/**
 * Thread that runs a TSimpleServer for one ExTestHandler on the handler's
 * port until the given cancellation is triggered.
 */
class ServerThread : Thread {
  /**
   * Params:
   *   handler       = service implementation; its `port` is the listen port
   *   serverHandler = event handler installed on the server
   *   cancellation  = passed to serve() to stop the server
   */
  this(ExTestHandler handler, ServerPreServeHandler serverHandler, TCancellation cancellation) {
    super(&run);
    serverHandler_ = serverHandler;
    cancellation_ = cancellation;
    handler_ = handler;
  }

private:
  /// Thread body: builds the server stack and serves. Any Exception is
  /// reported on stdout instead of propagating out of the thread.
  void run() {
    try {
      // Listening socket on the handler's port, with a 3 second receive timeout.
      auto listenSocket = new TServerSocket(handler_.port);
      listenSocket.recvTimeout = dur!"seconds"(3);

      auto server = new TSimpleServer(
        new TServiceProcessor!ExTestService(handler_),
        listenSocket,
        new TBufferedTransportFactory,
        new TBinaryProtocolFactory!()
      );
      server.eventHandler = serverHandler_;
      server.serve(cancellation_);
    } catch (Exception e) {
      writefln("Server thread on port %s failed: %s", handler_.port, e);
    }
  }

  ExTestHandler handler_;
  ServerPreServeHandler serverHandler_;
  TCancellation cancellation_;
}
161
/**
 * Test entry point.
 *
 * Parses `--port` (base port, default 9090) and `--trace`, starts six test
 * servers on consecutive ports (three succeeding and three failing, with
 * delays of 1/10/100 ms each), waits until every server has signalled that
 * it is about to serve, and then runs the four pool/aggregator tests.
 *
 * Throws: Exception if the base port leaves no room for six consecutive
 *   ports, if a server does not become ready in time, or if a test fails.
 */
void main(string[] args) {
  bool trace;
  ushort port = 9090;
  getopt(args, "port", &port, "trace", &trace);

  // Six consecutive ports are needed. Without this check the cast(ushort)
  // expressions below would silently wrap around for a base port close to
  // ushort.max and the servers would end up on unrelated ports.
  enum serverCount = 6;
  enforce(port <= ushort.max - (serverCount - 1),
    "Base port must be at most " ~ to!string(ushort.max - (serverCount - 1)) ~
    " to leave room for " ~ to!string(serverCount) ~ " consecutive ports.");

  auto serverCancellation = new TCancellationOrigin;
  scope (exit) serverCancellation.trigger();

  immutable ports = cast(immutable)array(map!"cast(ushort)a"(iota(port, port + serverCount)));

  // semaphore that will be incremented whenever each server thread has bound and started listening
  Semaphore sem = new Semaphore(0);

  version (none) {
    // Cannot use this due to multiple DMD @@BUG@@s:
    // 1. »function D main is a nested function and cannot be accessed from array«
    //    when calling array() on the result of the outer map() – would have to
    //    manually do the eager evaluation/array conversion.
    // 2. »Zip.opSlice cannot get frame pointer to map« for the delay argument,
    //    can be worked around by calling array() on the map result first.
    // 3. Even when using the workarounds for the last two points, the DMD-built
    //    executable crashes when building without (sic!) inlining enabled,
    //    the backtrace points into the first delegate literal.
    auto handlers = array(map!((args){
      return new ExTestHandler(args._0, args._1, args._2, trace);
    })(zip(
      ports,
      map!((a){ return dur!`msecs`(a); })([1, 10, 100, 1, 10, 100]),
      [false, false, false, true, true, true]
    )));
  } else {
    auto handlers = [
      new ExTestHandler(cast(ushort)(port + 0), dur!"msecs"(1), false, trace),
      new ExTestHandler(cast(ushort)(port + 1), dur!"msecs"(10), false, trace),
      new ExTestHandler(cast(ushort)(port + 2), dur!"msecs"(100), false, trace),
      new ExTestHandler(cast(ushort)(port + 3), dur!"msecs"(1), true, trace),
      new ExTestHandler(cast(ushort)(port + 4), dur!"msecs"(10), true, trace),
      new ExTestHandler(cast(ushort)(port + 5), dur!"msecs"(100), true, trace)
    ];
  }

  // Fire up the server threads.
  foreach (h; handlers) {
    (new ServerThread(h, new ServerPreServeHandler(sem), serverCancellation)).start();
  }

  // Wait until all the handlers signal that they're ready to serve. One
  // notification is expected per server; Semaphore.wait returns false on
  // timeout, in which case running the tests would only produce confusing
  // connection failures, so bail out with a clear message instead.
  foreach (h; handlers) {
    enforce(sem.wait(dur!"seconds"(1)),
      "Timed out waiting for all test servers to start serving.");
  }

  syncClientPoolTest(ports, handlers);
  asyncClientPoolTest(ports, handlers);
  asyncFastestClientPoolTest(ports, handlers);
  asyncAggregatorTest(ports, handlers);
}
214
215
/**
 * Runs the synchronous client pool (see makePool()) against the test
 * servers.
 *
 * Params:
 *   ports    = server ports in start-up order; main() makes the first three
 *              servers succeed and the last three fail
 *   handlers = the handlers behind those ports; handlers[0].failing is
 *              toggled temporarily by the last scenario
 */
void syncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  // One blocking client per server port, in port order.
  TClientBase!ExTestService[] clients;
  foreach (serverPort; ports) {
    clients ~= cast(TClientBase!ExTestService)tClient!ExTestService(
      tBinaryProtocol(new TSocket("127.0.0.1", serverPort))
    );
  }

  scope (exit) {
    foreach (client; clients) client.outputProtocol.transport.close();
  }

  // Scenario 1: the very first client answers successfully.
  {
    auto pool = makePool(clients);
    enforce(pool.getPort() == ports[0]);
  }

  // Scenario 2: only failing clients are in the pool; the call must end in
  // a compound exception listing one TestServiceException per client.
  {
    auto failingPool = makePool(clients[3 .. $]);
    auto thrown = collectException(failingPool.getPort());
    auto compound = cast(TCompoundOperationException)thrown;
    enforce(compound !is null);

    auto failedPorts = map!"a.port"(cast(TestServiceException[])compound.exceptions);
    enforce(equal(failedPorts, ports[3 .. $]));
  }

  // Scenario 3: failing clients come first, working ones afterwards.
  {
    auto mixedPool = makePool(clients[3 .. $] ~ clients[0 .. 3]);
    enforce(mixedPool.getPortInArray() == [ports[0]]);
  }

  // Scenario 4: a client that failed once is skipped for 50 ms and used
  // again afterwards.
  {
    auto pool = makePool(clients);
    pool.faultDisableCount = 1;
    pool.faultDisableDuration = dur!"msecs"(50);

    // First server fails, so the second one answers.
    handlers[0].failing = true;
    enforce(pool.getPort() == ports[1]);

    // First server works again but is still disabled in the pool.
    handlers[0].failing = false;
    enforce(pool.getPort() == ports[1]);

    // After the disable duration has passed, the first server is used again.
    Thread.sleep(dur!"msecs"(50));
    enforce(pool.getPort() == ports[0]);
  }
}
261
/// Builds a synchronous client pool over `clients` with client permutation
/// switched off (the tests depend on a fixed client order) and with
/// TestServiceException as the only exception accepted by the RPC fault filter.
auto makePool(TClientBase!ExTestService[] clients) {
  auto pool = tClientPool(clients);
  pool.permuteClients = false;
  pool.rpcFaultFilter = (Exception e) {
    auto serviceFault = cast(TestServiceException)e;
    return serviceFault !is null;
  };
  return pool;
}
270
271
/**
 * Runs the asynchronous client pool (see makeAsyncPool()) through the same
 * scenarios as the synchronous test.
 *
 * Params:
 *   ports    = server ports in start-up order; main() makes the first three
 *              servers succeed and the last three fail
 *   handlers = the handlers behind those ports; handlers[0].failing is
 *              toggled temporarily by the last scenario
 */
void asyncClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  auto manager = new TLibeventAsyncManager;
  scope (exit) manager.stop(dur!"hnsecs"(0));

  auto clients = makeAsyncClients(manager, ports);
  scope (exit) {
    foreach (client; clients) client.transport.close();
  }

  // Scenario 1: the very first client answers successfully.
  {
    auto pool = makeAsyncPool(clients);
    enforce(pool.getPort() == ports[0]);
  }

  // Scenario 2: only failing clients are in the pool; waiting for the result
  // must throw a compound exception with one TestServiceException per client.
  {
    auto failingPool = makeAsyncPool(clients[3 .. $]);
    auto thrown = collectException(failingPool.getPort().waitGet());
    auto compound = cast(TCompoundOperationException)thrown;
    enforce(compound !is null);

    auto failedPorts = map!"a.port"(cast(TestServiceException[])compound.exceptions);
    enforce(equal(failedPorts, ports[3 .. $]));
  }

  // Scenario 3: failing clients come first, working ones afterwards.
  {
    auto mixedPool = makeAsyncPool(clients[3 .. $] ~ clients[0 .. 3]);
    enforce(mixedPool.getPortInArray() == [ports[0]]);
  }

  // Scenario 4: a client that failed once is skipped for 50 ms and used
  // again afterwards.
  {
    auto pool = makeAsyncPool(clients);
    pool.faultDisableCount = 1;
    pool.faultDisableDuration = dur!"msecs"(50);

    // First server fails, so the second one answers.
    handlers[0].failing = true;
    enforce(pool.getPort() == ports[1]);

    // First server works again but is still disabled in the pool.
    handlers[0].failing = false;
    enforce(pool.getPort() == ports[1]);

    // After the disable duration has passed, the first server is used again.
    Thread.sleep(dur!"msecs"(50));
    enforce(pool.getPort() == ports[0]);
  }
}
315
/// Builds an asynchronous client pool over `clients` with client permutation
/// switched off (the tests depend on a fixed client order) and with
/// TestServiceException as the only exception accepted by the RPC fault filter.
auto makeAsyncPool(TAsyncClientBase!ExTestService[] clients) {
  auto pool = tAsyncClientPool(clients);
  pool.permuteClients = false;
  pool.rpcFaultFilter = (Exception e) {
    auto serviceFault = cast(TestServiceException)e;
    return serviceFault !is null;
  };
  return pool;
}
324
/**
 * Creates one asynchronous client per port, all driven by `manager` and
 * connecting to 127.0.0.1 with buffered transport and binary protocol.
 *
 * The array is filled with a plain loop rather than array(map!(...)),
 * because the latter runs into a DMD @@BUG@@ (»function D main is a nested
 * function and cannot be accessed from array«).
 *
 * Returns: the clients, in the same order as `ports`.
 */
auto makeAsyncClients(TLibeventAsyncManager manager, in ushort[] ports) {
  TAsyncClientBase!ExTestService[] clients;
  foreach (serverPort; ports) {
    auto socket = new TAsyncSocket(manager, "127.0.0.1", serverPort);
    clients ~= new TAsyncClient!ExTestService(
      socket,
      new TBufferedTransportFactory,
      new TBinaryProtocolFactory!(TBufferedTransport)
    );
  }
  return clients;
}
340
341
/**
 * Runs the "fastest client wins" asynchronous pool (see
 * makeAsyncFastestPool()) against the test servers.
 *
 * Params:
 *   ports    = server ports in start-up order; main() gives the servers
 *              delays of 1/10/100 ms, first for the three succeeding and
 *              then for the three failing ones
 *   handlers = the handlers behind those ports (not used here)
 */
void asyncFastestClientPoolTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  auto manager = new TLibeventAsyncManager;
  scope (exit) manager.stop(dur!"hnsecs"(0));

  auto clients = makeAsyncClients(manager, ports);
  scope (exit) {
    foreach (client; clients) client.transport.close();
  }

  // The client list is reversed on purpose: the server with the shortest
  // delay must win no matter where it sits in the pool.
  {
    auto reversedClients = array(retro(clients));
    auto pool = makeAsyncFastestPool(reversedClients);
    auto winner = pool.getPort().waitGet();
    enforce(winner == ports[0]);
  }

  // Only failing clients: the result is a compound exception listing one
  // TestServiceException per client.
  {
    auto failingPool = makeAsyncFastestPool(clients[3 .. $]);
    auto thrown = collectException(failingPool.getPort().waitGet());
    auto compound = cast(TCompoundOperationException)thrown;
    enforce(compound !is null);

    auto failedPorts = map!"a.port"(cast(TestServiceException[])compound.exceptions);
    enforce(equal(failedPorts, ports[3 .. $]));
  }

  // Without the quickest succeeding server, a failing server (1 ms delay)
  // answers before the quickest remaining succeeding one (10 ms delay); the
  // pool must still deliver the successful result.
  {
    auto pool = makeAsyncFastestPool(clients[1 .. $]);
    enforce(pool.getPortInArray() == [ports[1]]);
  }
}
371
/// Builds a fastest-result asynchronous pool over `clients`, with
/// TestServiceException as the only exception accepted by the RPC fault filter.
auto makeAsyncFastestPool(TAsyncClientBase!ExTestService[] clients) {
  auto pool = tAsyncFastestClientPool(clients);
  pool.rpcFaultFilter = (Exception e) {
    auto serviceFault = cast(TestServiceException)e;
    return serviceFault !is null;
  };
  return pool;
}
379
380
/**
 * Exercises the asynchronous aggregator: its range interface, the default
 * accumulators for scalar and array results, and custom accumulators.
 *
 * The expected values rely on the server setup in main(): delays of
 * 1/10/100 ms for the three succeeding servers, followed by the same delays
 * for the three failing ones.
 *
 * Params:
 *   ports    = server ports in start-up order
 *   handlers = the handlers behind those ports (not used here)
 */
void asyncAggregatorTest(const(ushort)[] ports, ExTestHandler[] handlers) {
  auto manager = new TLibeventAsyncManager;
  scope (exit) manager.stop(dur!"hnsecs"(0));

  auto clients = makeAsyncClients(manager, ports);
  scope (exit) {
    foreach (client; clients) client.transport.close();
  }

  auto aggregator = tAsyncAggregator(
    cast(TAsyncClientBase!ExTestService[])clients);

  // Range interface with a 50 ms timeout: the two quickest servers of each
  // kind are expected to have completed, i.e. two results and two exceptions.
  {
    auto results = aggregator.getPort().range(dur!"msecs"(50));
    // The results have to be consumed before the exceptions are inspected.
    enforce(equal(results, ports[0 .. 2][]));

    auto failedPorts = map!"a.port"(cast(TestServiceException[])results.exceptions);
    enforce(equal(failedPorts, ports[3 .. $ - 1]));
    enforce(results.completedCount == 4);
  }

  // Default accumulator, scalar return type.
  {
    auto complete = aggregator.getPort().accumulate();
    enforce(complete.waitGet() == ports[0 .. 3]);

    // Finishing after 20 ms yields only the two quicker successful results.
    auto partial = aggregator.getPort().accumulate();
    Thread.sleep(dur!"msecs"(20));
    enforce(partial.finishGet() == ports[0 .. 2]);
  }

  // Default accumulator, array return type.
  {
    auto complete = aggregator.getPortInArray().accumulate();
    enforce(complete.waitGet() == ports[0 .. 3]);

    auto partial = aggregator.getPortInArray().accumulate();
    Thread.sleep(dur!"msecs"(20));
    enforce(partial.finishGet() == ports[0 .. 2]);
  }

  // Custom accumulators.
  {
    // Results only: sum up the ports of all succeeding servers.
    auto portSum = aggregator.getPort().accumulate!(function(int[] results){
      return reduce!"a + b"(results);
    })();
    enforce(portSum.waitGet() == ports[0] + ports[1] + ports[2]);

    auto partial = aggregator.getPort().accumulate!(
      function(int[] results, Exception[] exceptions) {
        // Hand both parameters back as a tuple and verify them outside of
        // this function: checking the values requires access to »ports«, but
        // due to DMD @@BUG5710@@, we can't use a delegate literal here.
        return tuple(results, exceptions);
      }
    )();
    Thread.sleep(dur!"msecs"(20));
    auto collected = partial.finishGet();
    enforce(collected[0] == ports[0 .. 2]);

    auto failedPorts = map!"a.port"(cast(TestServiceException[])collected[1]);
    enforce(equal(failedPorts, ports[3 .. $ - 1]));
  }
}
443