1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 package org.apache.thrift.server;
21 
22 import java.util.concurrent.ExecutorService;
23 import java.util.concurrent.LinkedBlockingQueue;
24 import java.util.concurrent.RejectedExecutionException;
25 import java.util.concurrent.ThreadPoolExecutor;
26 import java.util.concurrent.TimeUnit;
27 import org.apache.thrift.transport.TNonblockingServerTransport;
28 
29 /**
30  * An extension of the TNonblockingServer to a Half-Sync/Half-Async server. Like TNonblockingServer,
31  * it relies on the use of TFramedTransport.
32  */
33 public class THsHaServer extends TNonblockingServer {
34 
35   public static class Args extends AbstractNonblockingServerArgs<Args> {
36     public int minWorkerThreads = 5;
37     public int maxWorkerThreads = Integer.MAX_VALUE;
38     private int stopTimeoutVal = 60;
39     private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
40     private ExecutorService executorService = null;
41 
Args(TNonblockingServerTransport transport)42     public Args(TNonblockingServerTransport transport) {
43       super(transport);
44     }
45 
46     /**
47      * Sets the min and max threads.
48      *
49      * @deprecated use {@link #minWorkerThreads(int)} and {@link #maxWorkerThreads(int)} instead.
50      */
51     @Deprecated
workerThreads(int n)52     public Args workerThreads(int n) {
53       minWorkerThreads = n;
54       maxWorkerThreads = n;
55       return this;
56     }
57 
58     /**
59      * @return what the min threads was set to.
60      * @deprecated use {@link #getMinWorkerThreads()} and {@link #getMaxWorkerThreads()} instead.
61      */
62     @Deprecated
getWorkerThreads()63     public int getWorkerThreads() {
64       return minWorkerThreads;
65     }
66 
minWorkerThreads(int n)67     public Args minWorkerThreads(int n) {
68       minWorkerThreads = n;
69       return this;
70     }
71 
maxWorkerThreads(int n)72     public Args maxWorkerThreads(int n) {
73       maxWorkerThreads = n;
74       return this;
75     }
76 
getMinWorkerThreads()77     public int getMinWorkerThreads() {
78       return minWorkerThreads;
79     }
80 
getMaxWorkerThreads()81     public int getMaxWorkerThreads() {
82       return maxWorkerThreads;
83     }
84 
getStopTimeoutVal()85     public int getStopTimeoutVal() {
86       return stopTimeoutVal;
87     }
88 
stopTimeoutVal(int stopTimeoutVal)89     public Args stopTimeoutVal(int stopTimeoutVal) {
90       this.stopTimeoutVal = stopTimeoutVal;
91       return this;
92     }
93 
getStopTimeoutUnit()94     public TimeUnit getStopTimeoutUnit() {
95       return stopTimeoutUnit;
96     }
97 
stopTimeoutUnit(TimeUnit stopTimeoutUnit)98     public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
99       this.stopTimeoutUnit = stopTimeoutUnit;
100       return this;
101     }
102 
getExecutorService()103     public ExecutorService getExecutorService() {
104       return executorService;
105     }
106 
executorService(ExecutorService executorService)107     public Args executorService(ExecutorService executorService) {
108       this.executorService = executorService;
109       return this;
110     }
111   }
112 
113   // This wraps all the functionality of queueing and thread pool management
114   // for the passing of Invocations from the Selector to workers.
115   private final ExecutorService invoker;
116 
117   private final Args args;
118 
119   /** Create the server with the specified Args configuration */
THsHaServer(Args args)120   public THsHaServer(Args args) {
121     super(args);
122 
123     invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
124     this.args = args;
125   }
126 
127   /** {@inheritDoc} */
128   @Override
waitForShutdown()129   protected void waitForShutdown() {
130     joinSelector();
131     gracefullyShutdownInvokerPool();
132   }
133 
134   /** Helper to create an invoker pool */
createInvokerPool(Args options)135   protected static ExecutorService createInvokerPool(Args options) {
136     int minWorkerThreads = options.minWorkerThreads;
137     int maxWorkerThreads = options.maxWorkerThreads;
138     int stopTimeoutVal = options.stopTimeoutVal;
139     TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
140 
141     LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
142     ExecutorService invoker =
143         new ThreadPoolExecutor(
144             minWorkerThreads, maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);
145 
146     return invoker;
147   }
148 
getInvoker()149   protected ExecutorService getInvoker() {
150     return invoker;
151   }
152 
gracefullyShutdownInvokerPool()153   protected void gracefullyShutdownInvokerPool() {
154     // try to gracefully shut down the executor service
155     invoker.shutdown();
156 
157     // Loop until awaitTermination finally does return without a interrupted
158     // exception. If we don't do this, then we'll shut down prematurely. We want
159     // to let the executorService clear it's task queue, closing client sockets
160     // appropriately.
161     long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
162     long now = System.currentTimeMillis();
163     while (timeoutMS >= 0) {
164       try {
165         invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
166         break;
167       } catch (InterruptedException ix) {
168         long newnow = System.currentTimeMillis();
169         timeoutMS -= (newnow - now);
170         now = newnow;
171       }
172     }
173   }
174 
175   /**
176    * We override the standard invoke method here to queue the invocation for invoker service instead
177    * of immediately invoking. The thread pool takes care of the rest.
178    */
179   @Override
requestInvoke(FrameBuffer frameBuffer)180   protected boolean requestInvoke(FrameBuffer frameBuffer) {
181     try {
182       Runnable invocation = getRunnable(frameBuffer);
183       invoker.execute(invocation);
184       return true;
185     } catch (RejectedExecutionException rx) {
186       LOGGER.warn("ExecutorService rejected execution!", rx);
187       return false;
188     }
189   }
190 
getRunnable(FrameBuffer frameBuffer)191   protected Runnable getRunnable(FrameBuffer frameBuffer) {
192     return new Invocation(frameBuffer);
193   }
194 }
195