1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 package org.apache.thrift.server;
21 
22 import java.io.IOException;
23 import java.nio.ByteBuffer;
24 import java.nio.channels.SelectionKey;
25 import java.nio.channels.Selector;
26 import java.nio.channels.spi.SelectorProvider;
27 import java.util.HashSet;
28 import java.util.Set;
29 import java.util.concurrent.atomic.AtomicLong;
30 import org.apache.thrift.TAsyncProcessor;
31 import org.apache.thrift.TByteArrayOutputStream;
32 import org.apache.thrift.TException;
33 import org.apache.thrift.protocol.TProtocol;
34 import org.apache.thrift.transport.TIOStreamTransport;
35 import org.apache.thrift.transport.TMemoryInputTransport;
36 import org.apache.thrift.transport.TNonblockingServerTransport;
37 import org.apache.thrift.transport.TNonblockingTransport;
38 import org.apache.thrift.transport.TTransport;
39 import org.apache.thrift.transport.TTransportException;
40 import org.apache.thrift.transport.layered.TFramedTransport;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43 
44 /** Provides common methods and classes used by nonblocking TServer implementations. */
45 public abstract class AbstractNonblockingServer extends TServer {
46   protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
47 
48   public abstract static class AbstractNonblockingServerArgs<
49           T extends AbstractNonblockingServerArgs<T>>
50       extends AbstractServerArgs<T> {
51     public long maxReadBufferBytes = 256 * 1024 * 1024;
52 
AbstractNonblockingServerArgs(TNonblockingServerTransport transport)53     public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
54       super(transport);
55       transportFactory(new TFramedTransport.Factory());
56     }
57   }
58 
59   /**
60    * The maximum amount of memory we will allocate to client IO buffers at a time. Without this
61    * limit, the server will gladly allocate client buffers right into an out of memory exception,
62    * rather than waiting.
63    */
64   final long MAX_READ_BUFFER_BYTES;
65 
66   /** How many bytes are currently allocated to read buffers. */
67   final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
68 
AbstractNonblockingServer(AbstractNonblockingServerArgs<?> args)69   public AbstractNonblockingServer(AbstractNonblockingServerArgs<?> args) {
70     super(args);
71     MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
72   }
73 
74   /** Begin accepting connections and processing invocations. */
serve()75   public void serve() {
76     // start any IO threads
77     if (!startThreads()) {
78       return;
79     }
80 
81     // start listening, or exit
82     if (!startListening()) {
83       return;
84     }
85 
86     setServing(true);
87 
88     // this will block while we serve
89     waitForShutdown();
90 
91     setServing(false);
92 
93     // do a little cleanup
94     stopListening();
95   }
96 
  /**
   * Starts any threads required for serving. Called by {@link #serve()} before the server
   * transport starts listening; when this returns false, {@code serve()} returns immediately.
   *
   * @return true if everything went ok, false if threads could not be started.
   */
  protected abstract boolean startThreads();
103 
  /**
   * Blocks until the threads handling the serving have been shut down. {@link #serve()} calls this
   * after marking the server as serving and performs its cleanup once this method returns.
   */
  protected abstract void waitForShutdown();
106 
107   /**
108    * Have the server transport start accepting connections.
109    *
110    * @return true if we started listening successfully, false if something went wrong.
111    */
startListening()112   protected boolean startListening() {
113     try {
114       serverTransport_.listen();
115       return true;
116     } catch (TTransportException ttx) {
117       LOGGER.error("Failed to start listening on server socket!", ttx);
118       return false;
119     }
120   }
121 
122   /** Stop listening for connections. */
stopListening()123   protected void stopListening() {
124     serverTransport_.close();
125   }
126 
  /**
   * Perform an invocation. This method could behave several different ways - invoke immediately
   * inline, queue for separate execution, etc.
   *
   * <p>The select thread calls this from {@code handleRead} once a FrameBuffer reports that its
   * frame is fully read; when this returns false, the select thread closes that FrameBuffer and
   * cancels its selection key.
   *
   * @param frameBuffer the FrameBuffer holding the fully read frame to invoke
   * @return true if invocation was successfully requested, which is not a guarantee that invocation
   *     has completed. False if the request failed.
   */
  protected abstract boolean requestInvoke(FrameBuffer frameBuffer);
135 
136   /**
137    * An abstract thread that handles selecting on a set of transports and {@link FrameBuffer
138    * FrameBuffers} associated with selected keys corresponding to requests.
139    */
140   protected abstract class AbstractSelectThread extends Thread {
141     protected Selector selector;
142 
143     // List of FrameBuffers that want to change their selection interests.
144     protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
145 
AbstractSelectThread()146     public AbstractSelectThread() throws IOException {
147       this.selector = SelectorProvider.provider().openSelector();
148     }
149 
150     /** If the selector is blocked, wake it up. */
wakeupSelector()151     public void wakeupSelector() {
152       selector.wakeup();
153     }
154 
155     /**
156      * Add FrameBuffer to the list of select interest changes and wake up the selector if it's
157      * blocked. When the select() call exits, it'll give the FrameBuffer a chance to change its
158      * interests.
159      */
requestSelectInterestChange(FrameBuffer frameBuffer)160     public void requestSelectInterestChange(FrameBuffer frameBuffer) {
161       synchronized (selectInterestChanges) {
162         selectInterestChanges.add(frameBuffer);
163       }
164       // wakeup the selector, if it's currently blocked.
165       selector.wakeup();
166     }
167 
168     /**
169      * Check to see if there are any FrameBuffers that have switched their interest type from read
170      * to write or vice versa.
171      */
processInterestChanges()172     protected void processInterestChanges() {
173       synchronized (selectInterestChanges) {
174         for (FrameBuffer fb : selectInterestChanges) {
175           fb.changeSelectInterests();
176         }
177         selectInterestChanges.clear();
178       }
179     }
180 
181     /**
182      * Do the work required to read from a readable client. If the frame is fully read, then invoke
183      * the method call.
184      */
handleRead(SelectionKey key)185     protected void handleRead(SelectionKey key) {
186       FrameBuffer buffer = (FrameBuffer) key.attachment();
187       if (!buffer.read()) {
188         cleanupSelectionKey(key);
189         return;
190       }
191 
192       // if the buffer's frame read is complete, invoke the method.
193       if (buffer.isFrameFullyRead() && !requestInvoke(buffer)) {
194         cleanupSelectionKey(key);
195       }
196     }
197 
198     /** Let a writable client get written, if there's data to be written. */
handleWrite(SelectionKey key)199     protected void handleWrite(SelectionKey key) {
200       FrameBuffer buffer = (FrameBuffer) key.attachment();
201       if (!buffer.write()) {
202         cleanupSelectionKey(key);
203       }
204     }
205 
206     /** Do connection-close cleanup on a given SelectionKey. */
cleanupSelectionKey(SelectionKey key)207     protected void cleanupSelectionKey(SelectionKey key) {
208       // remove the records from the two maps
209       FrameBuffer buffer = (FrameBuffer) key.attachment();
210       if (buffer != null) {
211         // close the buffer
212         buffer.close();
213       }
214       // cancel the selection key
215       key.cancel();
216     }
217   } // SelectThread
218 
  /**
   * Possible states for the FrameBuffer state machine. A FrameBuffer starts in {@code
   * READING_FRAME_SIZE} and returns to it each time it is prepared for the next request.
   */
  private enum FrameBufferState {
    // in the midst of reading the 4-byte frame size off the wire
    READING_FRAME_SIZE,
    // the frame-sized buffer has been allocated and the actual frame data is being read,
    // but not all the way done yet
    READING_FRAME,
    // completely read the frame, so an invocation can now happen
    READ_FRAME_COMPLETE,
    // a non-empty response is ready; waiting to get switched to listening for write events
    AWAITING_REGISTER_WRITE,
    // started writing response data, not fully complete yet
    WRITING,
    // the response was empty; another thread wants this framebuffer to go back to reading
    AWAITING_REGISTER_READ,
    // the invocation failed; we want our transport and selection key invalidated in the
    // selector thread
    AWAITING_CLOSE
  }
237 
238   /**
239    * Class that implements a sort of state machine around the interaction with a client and an
240    * invoker. It manages reading the frame size and frame data, getting it handed off as wrapped
241    * transports, and then the writing of response data back to the client. In the process it manages
242    * flipping the read and write bits on the selection key for its client.
243    */
244   public class FrameBuffer {
245     private final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
246 
247     // the actual transport hooked up to the client.
248     protected final TNonblockingTransport trans_;
249 
250     // the SelectionKey that corresponds to our transport
251     protected final SelectionKey selectionKey_;
252 
253     // the SelectThread that owns the registration of our transport
254     protected final AbstractSelectThread selectThread_;
255 
256     // where in the process of reading/writing are we?
257     protected FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE;
258 
259     // the ByteBuffer we'll be using to write and read, depending on the state
260     protected ByteBuffer buffer_;
261 
262     protected final TByteArrayOutputStream response_;
263 
264     // the frame that the TTransport should wrap.
265     protected final TMemoryInputTransport frameTrans_;
266 
267     // the transport that should be used to connect to clients
268     protected final TTransport inTrans_;
269 
270     protected final TTransport outTrans_;
271 
272     // the input protocol to use on frames
273     protected final TProtocol inProt_;
274 
275     // the output protocol to use on frames
276     protected final TProtocol outProt_;
277 
278     // context associated with this connection
279     protected final ServerContext context_;
280 
FrameBuffer( final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread)281     public FrameBuffer(
282         final TNonblockingTransport trans,
283         final SelectionKey selectionKey,
284         final AbstractSelectThread selectThread)
285         throws TTransportException {
286       trans_ = trans;
287       selectionKey_ = selectionKey;
288       selectThread_ = selectThread;
289       buffer_ = ByteBuffer.allocate(4);
290 
291       frameTrans_ = new TMemoryInputTransport();
292       response_ = new TByteArrayOutputStream();
293       inTrans_ = inputTransportFactory_.getTransport(frameTrans_);
294       outTrans_ = outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
295       inProt_ = inputProtocolFactory_.getProtocol(inTrans_);
296       outProt_ = outputProtocolFactory_.getProtocol(outTrans_);
297 
298       if (eventHandler_ != null) {
299         context_ = eventHandler_.createContext(inProt_, outProt_);
300       } else {
301         context_ = null;
302       }
303     }
304 
305     /**
306      * Give this FrameBuffer a chance to read. The selector loop should have received a read event
307      * for this FrameBuffer.
308      *
309      * @return true if the connection should live on, false if it should be closed
310      */
read()311     public boolean read() {
312       if (state_ == FrameBufferState.READING_FRAME_SIZE) {
313         // try to read the frame size completely
314         if (!internalRead()) {
315           return false;
316         }
317 
318         // if the frame size has been read completely, then prepare to read the
319         // actual frame.
320         if (buffer_.remaining() == 0) {
321           // pull out the frame size as an integer.
322           int frameSize = buffer_.getInt(0);
323           if (frameSize <= 0) {
324             LOGGER.error(
325                 "Read an invalid frame size of "
326                     + frameSize
327                     + ". Are you using TFramedTransport on the client side?");
328             return false;
329           }
330 
331           // if this frame will always be too large for this server, log the
332           // error and close the connection.
333           if (frameSize > trans_.getMaxFrameSize()) {
334             LOGGER.error(
335                 "Read a frame size of "
336                     + frameSize
337                     + ", which is bigger than the maximum allowable frame size "
338                     + trans_.getMaxFrameSize()
339                     + " for ALL connections.");
340             return false;
341           }
342 
343           // if this frame will push us over the memory limit, then return.
344           // with luck, more memory will free up the next time around.
345           if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
346             return true;
347           }
348 
349           // increment the amount of memory allocated to read buffers
350           readBufferBytesAllocated.addAndGet(frameSize + 4);
351 
352           // reallocate the readbuffer as a frame-sized buffer
353           buffer_ = ByteBuffer.allocate(frameSize + 4);
354           buffer_.putInt(frameSize);
355 
356           state_ = FrameBufferState.READING_FRAME;
357         } else {
358           // this skips the check of READING_FRAME state below, since we can't
359           // possibly go on to that state if there's data left to be read at
360           // this one.
361           return true;
362         }
363       }
364 
365       // it is possible to fall through from the READING_FRAME_SIZE section
366       // to READING_FRAME if there's already some frame data available once
367       // READING_FRAME_SIZE is complete.
368 
369       if (state_ == FrameBufferState.READING_FRAME) {
370         if (!internalRead()) {
371           return false;
372         }
373 
374         // since we're already in the select loop here for sure, we can just
375         // modify our selection key directly.
376         if (buffer_.remaining() == 0) {
377           // get rid of the read select interests
378           selectionKey_.interestOps(0);
379           state_ = FrameBufferState.READ_FRAME_COMPLETE;
380         }
381 
382         return true;
383       }
384 
385       // if we fall through to this point, then the state must be invalid.
386       LOGGER.error("Read was called but state is invalid (" + state_ + ")");
387       return false;
388     }
389 
390     /** Give this FrameBuffer a chance to write its output to the final client. */
write()391     public boolean write() {
392       if (state_ == FrameBufferState.WRITING) {
393         try {
394           if (trans_.write(buffer_) < 0) {
395             return false;
396           }
397         } catch (TTransportException e) {
398           LOGGER.warn("Got an Exception during write", e);
399           return false;
400         }
401 
402         // we're done writing. now we need to switch back to reading.
403         if (buffer_.remaining() == 0) {
404           prepareRead();
405         }
406         return true;
407       }
408 
409       LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
410       return false;
411     }
412 
413     /** Give this FrameBuffer a chance to set its interest to write, once data has come in. */
changeSelectInterests()414     public void changeSelectInterests() {
415       switch (state_) {
416         case AWAITING_REGISTER_WRITE:
417           // set the OP_WRITE interest
418           selectionKey_.interestOps(SelectionKey.OP_WRITE);
419           state_ = FrameBufferState.WRITING;
420           break;
421         case AWAITING_REGISTER_READ:
422           prepareRead();
423           break;
424         case AWAITING_CLOSE:
425           close();
426           selectionKey_.cancel();
427           break;
428         default:
429           LOGGER.error("changeSelectInterest was called, but state is invalid ({})", state_);
430       }
431     }
432 
433     /** Shut the connection down. */
close()434     public void close() {
435       // if we're being closed due to an error, we might have allocated a
436       // buffer that we need to subtract for our memory accounting.
437       if (state_ == FrameBufferState.READING_FRAME
438           || state_ == FrameBufferState.READ_FRAME_COMPLETE
439           || state_ == FrameBufferState.AWAITING_CLOSE) {
440         readBufferBytesAllocated.addAndGet(-buffer_.array().length);
441       }
442       trans_.close();
443       if (eventHandler_ != null) {
444         eventHandler_.deleteContext(context_, inProt_, outProt_);
445       }
446     }
447 
448     /** Check if this FrameBuffer has a full frame read. */
isFrameFullyRead()449     public boolean isFrameFullyRead() {
450       return state_ == FrameBufferState.READ_FRAME_COMPLETE;
451     }
452 
453     /**
454      * After the processor has processed the invocation, whatever thread is managing invocations
455      * should call this method on this FrameBuffer so we know it's time to start trying to write
456      * again. Also, if it turns out that there actually isn't any data in the response buffer, we'll
457      * skip trying to write and instead go back to reading.
458      */
responseReady()459     public void responseReady() {
460       // the read buffer is definitely no longer in use, so we will decrement
461       // our read buffer count. we do this here as well as in close because
462       // we'd like to free this read memory up as quickly as possible for other
463       // clients.
464       readBufferBytesAllocated.addAndGet(-buffer_.array().length);
465 
466       if (response_.len() == 0) {
467         // go straight to reading again. this was probably an oneway method
468         state_ = FrameBufferState.AWAITING_REGISTER_READ;
469         buffer_ = null;
470       } else {
471         buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());
472 
473         // set state that we're waiting to be switched to write. we do this
474         // asynchronously through requestSelectInterestChange() because there is
475         // a possibility that we're not in the main thread, and thus currently
476         // blocked in select(). (this functionality is in place for the sake of
477         // the HsHa server.)
478         state_ = FrameBufferState.AWAITING_REGISTER_WRITE;
479       }
480       requestSelectInterestChange();
481     }
482 
483     /** Actually invoke the method signified by this FrameBuffer. */
invoke()484     public void invoke() {
485       frameTrans_.reset(buffer_.array());
486       response_.reset();
487 
488       try {
489         if (eventHandler_ != null) {
490           eventHandler_.processContext(context_, inTrans_, outTrans_);
491         }
492         processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_);
493         responseReady();
494         return;
495       } catch (TException te) {
496         LOGGER.warn("Exception while invoking!", te);
497       } catch (Throwable t) {
498         LOGGER.error("Unexpected throwable while invoking!", t);
499       }
500       // This will only be reached when there is a throwable.
501       state_ = FrameBufferState.AWAITING_CLOSE;
502       requestSelectInterestChange();
503     }
504 
505     /**
506      * Perform a read into buffer.
507      *
508      * @return true if the read succeeded, false if there was an error or the connection closed.
509      */
internalRead()510     private boolean internalRead() {
511       try {
512         return trans_.read(buffer_) >= 0;
513       } catch (TTransportException e) {
514         LOGGER.warn("Got an Exception in internalRead", e);
515         return false;
516       }
517     }
518 
    /**
     * We're done writing, so reset our interest ops and change state accordingly: the selection
     * key is set to read interest, the buffer is replaced by a fresh 4-byte buffer for the next
     * frame size, and the state returns to {@code READING_FRAME_SIZE}.
     */
    private void prepareRead() {
      // we can set our interest directly without using the queue because
      // we're in the select thread.
      selectionKey_.interestOps(SelectionKey.OP_READ);
      // get ready for another go-around
      buffer_ = ByteBuffer.allocate(4);
      state_ = FrameBufferState.READING_FRAME_SIZE;
    }
528 
529     /**
530      * When this FrameBuffer needs to change its select interests and execution might not be in its
531      * select thread, then this method will make sure the interest change gets done when the select
532      * thread wakes back up. When the current thread is this FrameBuffer's select thread, then it
533      * just does the interest change immediately.
534      */
requestSelectInterestChange()535     protected void requestSelectInterestChange() {
536       if (Thread.currentThread() == this.selectThread_) {
537         changeSelectInterests();
538       } else {
539         this.selectThread_.requestSelectInterestChange(this);
540       }
541     }
542   } // FrameBuffer
543 
544   public class AsyncFrameBuffer extends FrameBuffer {
AsyncFrameBuffer( TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread)545     public AsyncFrameBuffer(
546         TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread)
547         throws TTransportException {
548       super(trans, selectionKey, selectThread);
549     }
550 
getInputProtocol()551     public TProtocol getInputProtocol() {
552       return inProt_;
553     }
554 
getOutputProtocol()555     public TProtocol getOutputProtocol() {
556       return outProt_;
557     }
558 
invoke()559     public void invoke() {
560       frameTrans_.reset(buffer_.array());
561       response_.reset();
562 
563       try {
564         if (eventHandler_ != null) {
565           eventHandler_.processContext(context_, inTrans_, outTrans_);
566         }
567         ((TAsyncProcessor) processorFactory_.getProcessor(inTrans_)).process(this);
568         return;
569       } catch (TException te) {
570         LOGGER.warn("Exception while invoking!", te);
571       } catch (Throwable t) {
572         LOGGER.error("Unexpected throwable while invoking!", t);
573       }
574       // This will only be reached when there is a throwable.
575       state_ = FrameBufferState.AWAITING_CLOSE;
576       requestSelectInterestChange();
577     }
578   }
579 }
580