1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 package org.apache.thrift.server; 21 22 import java.io.IOException; 23 import java.nio.channels.SelectionKey; 24 import java.util.Iterator; 25 import org.apache.thrift.transport.TNonblockingServerTransport; 26 import org.apache.thrift.transport.TNonblockingTransport; 27 import org.apache.thrift.transport.TTransportException; 28 29 /** 30 * A nonblocking TServer implementation. This allows for fairness amongst all connected clients in 31 * terms of invocations. 32 * 33 * <p>This server is inherently single-threaded. If you want a limited thread pool coupled with 34 * invocation-fairness, see THsHaServer. 35 * 36 * <p>To use this server, you MUST use a TFramedTransport at the outermost transport, otherwise this 37 * server will be unable to determine when a whole method call has been read off the wire. Clients 38 * must also use TFramedTransport. 39 */ 40 public class TNonblockingServer extends AbstractNonblockingServer { 41 42 public static class Args extends AbstractNonblockingServerArgs<Args> { Args(TNonblockingServerTransport transport)43 public Args(TNonblockingServerTransport transport) { 44 super(transport); 45 } 46 } 47 48 private SelectAcceptThread selectAcceptThread_; 49 TNonblockingServer(AbstractNonblockingServerArgs<?> args)50 public TNonblockingServer(AbstractNonblockingServerArgs<?> args) { 51 super(args); 52 } 53 54 /** 55 * Start the selector thread to deal with accepts and client messages. 56 * 57 * @return true if everything went ok, false if we couldn't start for some reason. 58 */ 59 @Override startThreads()60 protected boolean startThreads() { 61 // start the selector 62 try { 63 selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport) serverTransport_); 64 selectAcceptThread_.start(); 65 return true; 66 } catch (IOException e) { 67 LOGGER.error("Failed to start selector thread!", e); 68 return false; 69 } 70 } 71 72 @Override waitForShutdown()73 protected void waitForShutdown() { 74 joinSelector(); 75 } 76 77 /** Block until the selector thread exits. */ joinSelector()78 protected void joinSelector() { 79 // wait until the selector thread exits 80 try { 81 selectAcceptThread_.join(); 82 } catch (InterruptedException e) { 83 LOGGER.debug("Interrupted while waiting for accept thread", e); 84 Thread.currentThread().interrupt(); 85 } 86 } 87 88 /** Stop serving and shut everything down. */ 89 @Override stop()90 public void stop() { 91 stopped_ = true; 92 if (selectAcceptThread_ != null) { 93 selectAcceptThread_.wakeupSelector(); 94 } 95 } 96 97 /** 98 * Perform an invocation. This method could behave several different ways - invoke immediately 99 * inline, queue for separate execution, etc. 100 */ 101 @Override requestInvoke(FrameBuffer frameBuffer)102 protected boolean requestInvoke(FrameBuffer frameBuffer) { 103 frameBuffer.invoke(); 104 return true; 105 } 106 isStopped()107 public boolean isStopped() { 108 return selectAcceptThread_.isStopped(); 109 } 110 111 /** 112 * The thread that will be doing all the selecting, managing new connections and those that still 113 * need to be read. 114 */ 115 protected class SelectAcceptThread extends AbstractSelectThread { 116 117 // The server transport on which new client transports will be accepted 118 private final TNonblockingServerTransport serverTransport; 119 120 /** Set up the thread that will handle the non-blocking accepts, reads, and writes. */ SelectAcceptThread(final TNonblockingServerTransport serverTransport)121 public SelectAcceptThread(final TNonblockingServerTransport serverTransport) 122 throws IOException { 123 this.serverTransport = serverTransport; 124 serverTransport.registerSelector(selector); 125 } 126 isStopped()127 public boolean isStopped() { 128 return stopped_; 129 } 130 131 /** 132 * The work loop. Handles both selecting (all IO operations) and managing the selection 133 * preferences of all existing connections. 134 */ 135 @Override run()136 public void run() { 137 try { 138 if (eventHandler_ != null) { 139 eventHandler_.preServe(); 140 } 141 142 while (!stopped_) { 143 select(); 144 processInterestChanges(); 145 } 146 for (SelectionKey selectionKey : selector.keys()) { 147 cleanupSelectionKey(selectionKey); 148 } 149 } catch (Throwable t) { 150 LOGGER.error("run() exiting due to uncaught error", t); 151 } finally { 152 try { 153 selector.close(); 154 } catch (IOException e) { 155 LOGGER.error("Got an IOException while closing selector!", e); 156 } 157 stopped_ = true; 158 } 159 } 160 161 /** 162 * Select and process IO events appropriately: If there are connections to be accepted, accept 163 * them. If there are existing connections with data waiting to be read, read it, buffering 164 * until a whole frame has been read. If there are any pending responses, buffer them until 165 * their target client is available, and then send the data. 166 */ select()167 private void select() { 168 try { 169 // wait for io events. 170 selector.select(); 171 172 // process the io events we received 173 Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator(); 174 while (!stopped_ && selectedKeys.hasNext()) { 175 SelectionKey key = selectedKeys.next(); 176 selectedKeys.remove(); 177 178 // skip if not valid 179 if (!key.isValid()) { 180 cleanupSelectionKey(key); 181 continue; 182 } 183 184 // if the key is marked Accept, then it has to be the server 185 // transport. 186 if (key.isAcceptable()) { 187 handleAccept(); 188 } else if (key.isReadable()) { 189 // deal with reads 190 handleRead(key); 191 } else if (key.isWritable()) { 192 // deal with writes 193 handleWrite(key); 194 } else { 195 LOGGER.warn("Unexpected state in select! " + key.interestOps()); 196 } 197 } 198 } catch (IOException e) { 199 LOGGER.warn("Got an IOException while selecting!", e); 200 } 201 } 202 createFrameBuffer( final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread)203 protected FrameBuffer createFrameBuffer( 204 final TNonblockingTransport trans, 205 final SelectionKey selectionKey, 206 final AbstractSelectThread selectThread) 207 throws TTransportException { 208 return processorFactory_.isAsyncProcessor() 209 ? new AsyncFrameBuffer(trans, selectionKey, selectThread) 210 : new FrameBuffer(trans, selectionKey, selectThread); 211 } 212 213 /** Accept a new connection. */ handleAccept()214 private void handleAccept() throws IOException { 215 SelectionKey clientKey = null; 216 TNonblockingTransport client = null; 217 try { 218 // accept the connection 219 client = serverTransport.accept(); 220 clientKey = client.registerSelector(selector, SelectionKey.OP_READ); 221 222 // add this key to the map 223 FrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this); 224 225 clientKey.attach(frameBuffer); 226 } catch (TTransportException tte) { 227 // something went wrong accepting. 228 LOGGER.warn("Exception trying to accept!", tte); 229 if (clientKey != null) cleanupSelectionKey(clientKey); 230 if (client != null) client.close(); 231 } 232 } 233 } // SelectAcceptThread 234 } 235