/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.thrift.server;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.transport.TNonblockingServerTransport;

/**
 * An extension of the TNonblockingServer to a Half-Sync/Half-Async server. Like TNonblockingServer,
 * it relies on the use of TFramedTransport.
 *
 * <p>Invocations are handed to an {@link ExecutorService} (the "invoker") instead of being run
 * directly by {@link #requestInvoke(FrameBuffer)}.
 */
public class THsHaServer extends TNonblockingServer {

  /**
   * Configuration for a {@link THsHaServer}.
   *
   * <p>All setters return {@code this} so that calls can be chained.
   */
  public static class Args extends AbstractNonblockingServerArgs<Args> {
    /** Core size of the default invoker pool; see {@link THsHaServer#createInvokerPool(Args)}. */
    public int minWorkerThreads = 5;

    /** Maximum size of the default invoker pool; see {@link THsHaServer#createInvokerPool(Args)}. */
    public int maxWorkerThreads = Integer.MAX_VALUE;

    // Used both as the bound on how long shutdown waits for the invoker to terminate and as the
    // keep-alive time of the default invoker pool.
    private int stopTimeoutVal = 60;
    private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;

    // When null, the server builds its own pool via createInvokerPool(Args).
    private ExecutorService executorService = null;

    /**
     * Creates the arguments for a server listening on the given transport.
     *
     * @param transport the non-blocking server transport, passed to the superclass constructor
     */
    public Args(TNonblockingServerTransport transport) {
      super(transport);
    }

    /**
     * Sets the min and max threads.
     *
     * @param n the value assigned to both the minimum and the maximum worker thread count
     * @return this instance, for chaining
     * @deprecated use {@link #minWorkerThreads(int)} and {@link #maxWorkerThreads(int)} instead.
     */
    @Deprecated
    public Args workerThreads(int n) {
      minWorkerThreads = n;
      maxWorkerThreads = n;
      return this;
    }

    /**
     * @return what the min threads was set to.
     * @deprecated use {@link #getMinWorkerThreads()} and {@link #getMaxWorkerThreads()} instead.
     */
    @Deprecated
    public int getWorkerThreads() {
      return minWorkerThreads;
    }

    /**
     * Sets the minimum number of worker threads.
     *
     * @param n the minimum worker thread count
     * @return this instance, for chaining
     */
    public Args minWorkerThreads(int n) {
      minWorkerThreads = n;
      return this;
    }

    /**
     * Sets the maximum number of worker threads.
     *
     * @param n the maximum worker thread count
     * @return this instance, for chaining
     */
    public Args maxWorkerThreads(int n) {
      maxWorkerThreads = n;
      return this;
    }

    /** @return the configured minimum number of worker threads */
    public int getMinWorkerThreads() {
      return minWorkerThreads;
    }

    /** @return the configured maximum number of worker threads */
    public int getMaxWorkerThreads() {
      return maxWorkerThreads;
    }

    /** @return the stop timeout value, expressed in {@link #getStopTimeoutUnit()} units */
    public int getStopTimeoutVal() {
      return stopTimeoutVal;
    }

    /**
     * Sets the stop timeout value.
     *
     * @param stopTimeoutVal the timeout, expressed in {@link #getStopTimeoutUnit()} units
     * @return this instance, for chaining
     */
    public Args stopTimeoutVal(int stopTimeoutVal) {
      this.stopTimeoutVal = stopTimeoutVal;
      return this;
    }

    /** @return the unit in which {@link #getStopTimeoutVal()} is expressed */
    public TimeUnit getStopTimeoutUnit() {
      return stopTimeoutUnit;
    }

    /**
     * Sets the unit of the stop timeout.
     *
     * @param stopTimeoutUnit the unit in which the stop timeout value is expressed
     * @return this instance, for chaining
     */
    public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
      this.stopTimeoutUnit = stopTimeoutUnit;
      return this;
    }

    /** @return the caller-supplied executor service, or {@code null} if none was set */
    public ExecutorService getExecutorService() {
      return executorService;
    }

    /**
     * Supplies the executor service the server should use to run invocations.
     *
     * @param executorService the executor to use, or {@code null} to let the server create one
     * @return this instance, for chaining
     */
    public Args executorService(ExecutorService executorService) {
      this.executorService = executorService;
      return this;
    }
  }

  // This wraps all the functionality of queueing and thread pool management
  // for the passing of Invocations from the Selector to workers.
  private final ExecutorService invoker;

  private final Args args;

  /**
   * Create the server with the specified Args configuration.
   *
   * @param args the server configuration; its executor service is used when set, otherwise a pool
   *     is built with {@link #createInvokerPool(Args)}
   */
  public THsHaServer(Args args) {
    super(args);

    invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
    this.args = args;
  }

  /** {@inheritDoc} */
  @Override
  protected void waitForShutdown() {
    joinSelector();
    gracefullyShutdownInvokerPool();
  }

  /**
   * Helper to create an invoker pool.
   *
   * <p>The pool is a {@link ThreadPoolExecutor} backed by an unbounded {@link
   * LinkedBlockingQueue}. The configured stop timeout is passed as the executor's keep-alive time.
   * Per the {@link ThreadPoolExecutor} documentation, an executor with an unbounded queue does not
   * create more than its core number of threads.
   *
   * @param options the configuration supplying the thread counts and the stop timeout
   * @return the newly created executor service
   */
  protected static ExecutorService createInvokerPool(Args options) {
    int minWorkerThreads = options.minWorkerThreads;
    int maxWorkerThreads = options.maxWorkerThreads;
    int stopTimeoutVal = options.stopTimeoutVal;
    TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;

    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
    ExecutorService invoker =
        new ThreadPoolExecutor(
            minWorkerThreads, maxWorkerThreads, stopTimeoutVal, stopTimeoutUnit, queue);

    return invoker;
  }

  /** @return the executor service that runs the invocations */
  protected ExecutorService getInvoker() {
    return invoker;
  }

  /**
   * Shuts the invoker down and waits, for at most the configured stop timeout, until it has
   * terminated.
   *
   * <p>An interrupt received while waiting does not cut the wait short; the remaining time is
   * waited out. The interrupt is remembered and the calling thread's interrupt status is restored
   * before this method returns.
   */
  protected void gracefullyShutdownInvokerPool() {
    // try to gracefully shut down the executor service
    invoker.shutdown();

    // Loop until awaitTermination finally does return without an interrupted
    // exception. If we don't do this, then we'll shut down prematurely. We want
    // to let the executorService clear its task queue, closing client sockets
    // appropriately.
    long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
    long now = System.currentTimeMillis();
    boolean interrupted = false;
    try {
      while (timeoutMS >= 0) {
        try {
          invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
          break;
        } catch (InterruptedException ix) {
          // Only record the interrupt here: re-asserting it inside the loop would make the next
          // awaitTermination call throw immediately and turn the wait into a busy spin.
          interrupted = true;
          long newnow = System.currentTimeMillis();
          timeoutMS -= (newnow - now);
          now = newnow;
        }
      }
    } finally {
      if (interrupted) {
        // Hand the interrupt back so that code further up the stack can observe it.
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * We override the standard invoke method here to queue the invocation for invoker service instead
   * of immediately invoking. The thread pool takes care of the rest.
   *
   * @param frameBuffer the frame buffer to build the invocation from
   * @return {@code true} if the invocation was handed to the invoker, {@code false} if the invoker
   *     rejected it
   */
  @Override
  protected boolean requestInvoke(FrameBuffer frameBuffer) {
    try {
      Runnable invocation = getRunnable(frameBuffer);
      invoker.execute(invocation);
      return true;
    } catch (RejectedExecutionException rx) {
      LOGGER.warn("ExecutorService rejected execution!", rx);
      return false;
    }
  }

  /**
   * Wraps a frame buffer in the task that is submitted to the invoker.
   *
   * @param frameBuffer the frame buffer to wrap
   * @return a new {@code Invocation} for the given frame buffer
   */
  protected Runnable getRunnable(FrameBuffer frameBuffer) {
    return new Invocation(frameBuffer);
  }
}