1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 package org.apache.thrift.server;
21 
22 import java.io.IOException;
23 import java.nio.channels.SelectionKey;
24 import java.util.Iterator;
25 import org.apache.thrift.transport.TNonblockingServerTransport;
26 import org.apache.thrift.transport.TNonblockingTransport;
27 import org.apache.thrift.transport.TTransportException;
28 
29 /**
30  * A nonblocking TServer implementation. This allows for fairness amongst all connected clients in
31  * terms of invocations.
32  *
33  * <p>This server is inherently single-threaded. If you want a limited thread pool coupled with
34  * invocation-fairness, see THsHaServer.
35  *
36  * <p>To use this server, you MUST use a TFramedTransport at the outermost transport, otherwise this
37  * server will be unable to determine when a whole method call has been read off the wire. Clients
38  * must also use TFramedTransport.
39  */
40 public class TNonblockingServer extends AbstractNonblockingServer {
41 
42   public static class Args extends AbstractNonblockingServerArgs<Args> {
Args(TNonblockingServerTransport transport)43     public Args(TNonblockingServerTransport transport) {
44       super(transport);
45     }
46   }
47 
48   private SelectAcceptThread selectAcceptThread_;
49 
TNonblockingServer(AbstractNonblockingServerArgs<?> args)50   public TNonblockingServer(AbstractNonblockingServerArgs<?> args) {
51     super(args);
52   }
53 
54   /**
55    * Start the selector thread to deal with accepts and client messages.
56    *
57    * @return true if everything went ok, false if we couldn't start for some reason.
58    */
59   @Override
startThreads()60   protected boolean startThreads() {
61     // start the selector
62     try {
63       selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport) serverTransport_);
64       selectAcceptThread_.start();
65       return true;
66     } catch (IOException e) {
67       LOGGER.error("Failed to start selector thread!", e);
68       return false;
69     }
70   }
71 
72   @Override
waitForShutdown()73   protected void waitForShutdown() {
74     joinSelector();
75   }
76 
77   /** Block until the selector thread exits. */
joinSelector()78   protected void joinSelector() {
79     // wait until the selector thread exits
80     try {
81       selectAcceptThread_.join();
82     } catch (InterruptedException e) {
83       LOGGER.debug("Interrupted while waiting for accept thread", e);
84       Thread.currentThread().interrupt();
85     }
86   }
87 
88   /** Stop serving and shut everything down. */
89   @Override
stop()90   public void stop() {
91     stopped_ = true;
92     if (selectAcceptThread_ != null) {
93       selectAcceptThread_.wakeupSelector();
94     }
95   }
96 
97   /**
98    * Perform an invocation. This method could behave several different ways - invoke immediately
99    * inline, queue for separate execution, etc.
100    */
101   @Override
requestInvoke(FrameBuffer frameBuffer)102   protected boolean requestInvoke(FrameBuffer frameBuffer) {
103     frameBuffer.invoke();
104     return true;
105   }
106 
isStopped()107   public boolean isStopped() {
108     return selectAcceptThread_.isStopped();
109   }
110 
111   /**
112    * The thread that will be doing all the selecting, managing new connections and those that still
113    * need to be read.
114    */
115   protected class SelectAcceptThread extends AbstractSelectThread {
116 
117     // The server transport on which new client transports will be accepted
118     private final TNonblockingServerTransport serverTransport;
119 
120     /** Set up the thread that will handle the non-blocking accepts, reads, and writes. */
SelectAcceptThread(final TNonblockingServerTransport serverTransport)121     public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
122         throws IOException {
123       this.serverTransport = serverTransport;
124       serverTransport.registerSelector(selector);
125     }
126 
isStopped()127     public boolean isStopped() {
128       return stopped_;
129     }
130 
131     /**
132      * The work loop. Handles both selecting (all IO operations) and managing the selection
133      * preferences of all existing connections.
134      */
135     @Override
run()136     public void run() {
137       try {
138         if (eventHandler_ != null) {
139           eventHandler_.preServe();
140         }
141 
142         while (!stopped_) {
143           select();
144           processInterestChanges();
145         }
146         for (SelectionKey selectionKey : selector.keys()) {
147           cleanupSelectionKey(selectionKey);
148         }
149       } catch (Throwable t) {
150         LOGGER.error("run() exiting due to uncaught error", t);
151       } finally {
152         try {
153           selector.close();
154         } catch (IOException e) {
155           LOGGER.error("Got an IOException while closing selector!", e);
156         }
157         stopped_ = true;
158       }
159     }
160 
161     /**
162      * Select and process IO events appropriately: If there are connections to be accepted, accept
163      * them. If there are existing connections with data waiting to be read, read it, buffering
164      * until a whole frame has been read. If there are any pending responses, buffer them until
165      * their target client is available, and then send the data.
166      */
select()167     private void select() {
168       try {
169         // wait for io events.
170         selector.select();
171 
172         // process the io events we received
173         Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
174         while (!stopped_ && selectedKeys.hasNext()) {
175           SelectionKey key = selectedKeys.next();
176           selectedKeys.remove();
177 
178           // skip if not valid
179           if (!key.isValid()) {
180             cleanupSelectionKey(key);
181             continue;
182           }
183 
184           // if the key is marked Accept, then it has to be the server
185           // transport.
186           if (key.isAcceptable()) {
187             handleAccept();
188           } else if (key.isReadable()) {
189             // deal with reads
190             handleRead(key);
191           } else if (key.isWritable()) {
192             // deal with writes
193             handleWrite(key);
194           } else {
195             LOGGER.warn("Unexpected state in select! " + key.interestOps());
196           }
197         }
198       } catch (IOException e) {
199         LOGGER.warn("Got an IOException while selecting!", e);
200       }
201     }
202 
createFrameBuffer( final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread)203     protected FrameBuffer createFrameBuffer(
204         final TNonblockingTransport trans,
205         final SelectionKey selectionKey,
206         final AbstractSelectThread selectThread)
207         throws TTransportException {
208       return processorFactory_.isAsyncProcessor()
209           ? new AsyncFrameBuffer(trans, selectionKey, selectThread)
210           : new FrameBuffer(trans, selectionKey, selectThread);
211     }
212 
213     /** Accept a new connection. */
handleAccept()214     private void handleAccept() throws IOException {
215       SelectionKey clientKey = null;
216       TNonblockingTransport client = null;
217       try {
218         // accept the connection
219         client = serverTransport.accept();
220         clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
221 
222         // add this key to the map
223         FrameBuffer frameBuffer = createFrameBuffer(client, clientKey, SelectAcceptThread.this);
224 
225         clientKey.attach(frameBuffer);
226       } catch (TTransportException tte) {
227         // something went wrong accepting.
228         LOGGER.warn("Exception trying to accept!", tte);
229         if (clientKey != null) cleanupSelectionKey(clientKey);
230         if (client != null) client.close();
231       }
232     }
233   } // SelectAcceptThread
234 }
235