1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 package org.apache.thrift.server; 21 22 import java.io.IOException; 23 import java.nio.ByteBuffer; 24 import java.nio.channels.SelectionKey; 25 import java.nio.channels.Selector; 26 import java.nio.channels.spi.SelectorProvider; 27 import java.util.HashSet; 28 import java.util.Set; 29 import java.util.concurrent.atomic.AtomicLong; 30 import org.apache.thrift.TAsyncProcessor; 31 import org.apache.thrift.TByteArrayOutputStream; 32 import org.apache.thrift.TException; 33 import org.apache.thrift.protocol.TProtocol; 34 import org.apache.thrift.transport.TIOStreamTransport; 35 import org.apache.thrift.transport.TMemoryInputTransport; 36 import org.apache.thrift.transport.TNonblockingServerTransport; 37 import org.apache.thrift.transport.TNonblockingTransport; 38 import org.apache.thrift.transport.TTransport; 39 import org.apache.thrift.transport.TTransportException; 40 import org.apache.thrift.transport.layered.TFramedTransport; 41 import org.slf4j.Logger; 42 import org.slf4j.LoggerFactory; 43 44 /** Provides common methods and classes used by nonblocking TServer implementations. */ 45 public abstract class AbstractNonblockingServer extends TServer { 46 protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName()); 47 48 public abstract static class AbstractNonblockingServerArgs< 49 T extends AbstractNonblockingServerArgs<T>> 50 extends AbstractServerArgs<T> { 51 public long maxReadBufferBytes = 256 * 1024 * 1024; 52 AbstractNonblockingServerArgs(TNonblockingServerTransport transport)53 public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) { 54 super(transport); 55 transportFactory(new TFramedTransport.Factory()); 56 } 57 } 58 59 /** 60 * The maximum amount of memory we will allocate to client IO buffers at a time. Without this 61 * limit, the server will gladly allocate client buffers right into an out of memory exception, 62 * rather than waiting. 63 */ 64 final long MAX_READ_BUFFER_BYTES; 65 66 /** How many bytes are currently allocated to read buffers. */ 67 final AtomicLong readBufferBytesAllocated = new AtomicLong(0); 68 AbstractNonblockingServer(AbstractNonblockingServerArgs<?> args)69 public AbstractNonblockingServer(AbstractNonblockingServerArgs<?> args) { 70 super(args); 71 MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes; 72 } 73 74 /** Begin accepting connections and processing invocations. */ serve()75 public void serve() { 76 // start any IO threads 77 if (!startThreads()) { 78 return; 79 } 80 81 // start listening, or exit 82 if (!startListening()) { 83 return; 84 } 85 86 setServing(true); 87 88 // this will block while we serve 89 waitForShutdown(); 90 91 setServing(false); 92 93 // do a little cleanup 94 stopListening(); 95 } 96 97 /** 98 * Starts any threads required for serving. 99 * 100 * @return true if everything went ok, false if threads could not be started. 101 */ startThreads()102 protected abstract boolean startThreads(); 103 104 /** A method that will block until when threads handling the serving have been shut down. */ waitForShutdown()105 protected abstract void waitForShutdown(); 106 107 /** 108 * Have the server transport start accepting connections. 109 * 110 * @return true if we started listening successfully, false if something went wrong. 111 */ startListening()112 protected boolean startListening() { 113 try { 114 serverTransport_.listen(); 115 return true; 116 } catch (TTransportException ttx) { 117 LOGGER.error("Failed to start listening on server socket!", ttx); 118 return false; 119 } 120 } 121 122 /** Stop listening for connections. */ stopListening()123 protected void stopListening() { 124 serverTransport_.close(); 125 } 126 127 /** 128 * Perform an invocation. This method could behave several different ways - invoke immediately 129 * inline, queue for separate execution, etc. 130 * 131 * @return true if invocation was successfully requested, which is not a guarantee that invocation 132 * has completed. False if the request failed. 133 */ requestInvoke(FrameBuffer frameBuffer)134 protected abstract boolean requestInvoke(FrameBuffer frameBuffer); 135 136 /** 137 * An abstract thread that handles selecting on a set of transports and {@link FrameBuffer 138 * FrameBuffers} associated with selected keys corresponding to requests. 139 */ 140 protected abstract class AbstractSelectThread extends Thread { 141 protected Selector selector; 142 143 // List of FrameBuffers that want to change their selection interests. 144 protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>(); 145 AbstractSelectThread()146 public AbstractSelectThread() throws IOException { 147 this.selector = SelectorProvider.provider().openSelector(); 148 } 149 150 /** If the selector is blocked, wake it up. */ wakeupSelector()151 public void wakeupSelector() { 152 selector.wakeup(); 153 } 154 155 /** 156 * Add FrameBuffer to the list of select interest changes and wake up the selector if it's 157 * blocked. When the select() call exits, it'll give the FrameBuffer a chance to change its 158 * interests. 159 */ requestSelectInterestChange(FrameBuffer frameBuffer)160 public void requestSelectInterestChange(FrameBuffer frameBuffer) { 161 synchronized (selectInterestChanges) { 162 selectInterestChanges.add(frameBuffer); 163 } 164 // wakeup the selector, if it's currently blocked. 165 selector.wakeup(); 166 } 167 168 /** 169 * Check to see if there are any FrameBuffers that have switched their interest type from read 170 * to write or vice versa. 171 */ processInterestChanges()172 protected void processInterestChanges() { 173 synchronized (selectInterestChanges) { 174 for (FrameBuffer fb : selectInterestChanges) { 175 fb.changeSelectInterests(); 176 } 177 selectInterestChanges.clear(); 178 } 179 } 180 181 /** 182 * Do the work required to read from a readable client. If the frame is fully read, then invoke 183 * the method call. 184 */ handleRead(SelectionKey key)185 protected void handleRead(SelectionKey key) { 186 FrameBuffer buffer = (FrameBuffer) key.attachment(); 187 if (!buffer.read()) { 188 cleanupSelectionKey(key); 189 return; 190 } 191 192 // if the buffer's frame read is complete, invoke the method. 193 if (buffer.isFrameFullyRead() && !requestInvoke(buffer)) { 194 cleanupSelectionKey(key); 195 } 196 } 197 198 /** Let a writable client get written, if there's data to be written. */ handleWrite(SelectionKey key)199 protected void handleWrite(SelectionKey key) { 200 FrameBuffer buffer = (FrameBuffer) key.attachment(); 201 if (!buffer.write()) { 202 cleanupSelectionKey(key); 203 } 204 } 205 206 /** Do connection-close cleanup on a given SelectionKey. */ cleanupSelectionKey(SelectionKey key)207 protected void cleanupSelectionKey(SelectionKey key) { 208 // remove the records from the two maps 209 FrameBuffer buffer = (FrameBuffer) key.attachment(); 210 if (buffer != null) { 211 // close the buffer 212 buffer.close(); 213 } 214 // cancel the selection key 215 key.cancel(); 216 } 217 } // SelectThread 218 219 /** Possible states for the FrameBuffer state machine. */ 220 private enum FrameBufferState { 221 // in the midst of reading the frame size off the wire 222 READING_FRAME_SIZE, 223 // reading the actual frame data now, but not all the way done yet 224 READING_FRAME, 225 // completely read the frame, so an invocation can now happen 226 READ_FRAME_COMPLETE, 227 // waiting to get switched to listening for write events 228 AWAITING_REGISTER_WRITE, 229 // started writing response data, not fully complete yet 230 WRITING, 231 // another thread wants this framebuffer to go back to reading 232 AWAITING_REGISTER_READ, 233 // we want our transport and selection key invalidated in the selector 234 // thread 235 AWAITING_CLOSE 236 } 237 238 /** 239 * Class that implements a sort of state machine around the interaction with a client and an 240 * invoker. It manages reading the frame size and frame data, getting it handed off as wrapped 241 * transports, and then the writing of response data back to the client. In the process it manages 242 * flipping the read and write bits on the selection key for its client. 243 */ 244 public class FrameBuffer { 245 private final Logger LOGGER = LoggerFactory.getLogger(getClass().getName()); 246 247 // the actual transport hooked up to the client. 248 protected final TNonblockingTransport trans_; 249 250 // the SelectionKey that corresponds to our transport 251 protected final SelectionKey selectionKey_; 252 253 // the SelectThread that owns the registration of our transport 254 protected final AbstractSelectThread selectThread_; 255 256 // where in the process of reading/writing are we? 257 protected FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE; 258 259 // the ByteBuffer we'll be using to write and read, depending on the state 260 protected ByteBuffer buffer_; 261 262 protected final TByteArrayOutputStream response_; 263 264 // the frame that the TTransport should wrap. 265 protected final TMemoryInputTransport frameTrans_; 266 267 // the transport that should be used to connect to clients 268 protected final TTransport inTrans_; 269 270 protected final TTransport outTrans_; 271 272 // the input protocol to use on frames 273 protected final TProtocol inProt_; 274 275 // the output protocol to use on frames 276 protected final TProtocol outProt_; 277 278 // context associated with this connection 279 protected final ServerContext context_; 280 FrameBuffer( final TNonblockingTransport trans, final SelectionKey selectionKey, final AbstractSelectThread selectThread)281 public FrameBuffer( 282 final TNonblockingTransport trans, 283 final SelectionKey selectionKey, 284 final AbstractSelectThread selectThread) 285 throws TTransportException { 286 trans_ = trans; 287 selectionKey_ = selectionKey; 288 selectThread_ = selectThread; 289 buffer_ = ByteBuffer.allocate(4); 290 291 frameTrans_ = new TMemoryInputTransport(); 292 response_ = new TByteArrayOutputStream(); 293 inTrans_ = inputTransportFactory_.getTransport(frameTrans_); 294 outTrans_ = outputTransportFactory_.getTransport(new TIOStreamTransport(response_)); 295 inProt_ = inputProtocolFactory_.getProtocol(inTrans_); 296 outProt_ = outputProtocolFactory_.getProtocol(outTrans_); 297 298 if (eventHandler_ != null) { 299 context_ = eventHandler_.createContext(inProt_, outProt_); 300 } else { 301 context_ = null; 302 } 303 } 304 305 /** 306 * Give this FrameBuffer a chance to read. The selector loop should have received a read event 307 * for this FrameBuffer. 308 * 309 * @return true if the connection should live on, false if it should be closed 310 */ read()311 public boolean read() { 312 if (state_ == FrameBufferState.READING_FRAME_SIZE) { 313 // try to read the frame size completely 314 if (!internalRead()) { 315 return false; 316 } 317 318 // if the frame size has been read completely, then prepare to read the 319 // actual frame. 320 if (buffer_.remaining() == 0) { 321 // pull out the frame size as an integer. 322 int frameSize = buffer_.getInt(0); 323 if (frameSize <= 0) { 324 LOGGER.error( 325 "Read an invalid frame size of " 326 + frameSize 327 + ". Are you using TFramedTransport on the client side?"); 328 return false; 329 } 330 331 // if this frame will always be too large for this server, log the 332 // error and close the connection. 333 if (frameSize > trans_.getMaxFrameSize()) { 334 LOGGER.error( 335 "Read a frame size of " 336 + frameSize 337 + ", which is bigger than the maximum allowable frame size " 338 + trans_.getMaxFrameSize() 339 + " for ALL connections."); 340 return false; 341 } 342 343 // if this frame will push us over the memory limit, then return. 344 // with luck, more memory will free up the next time around. 345 if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) { 346 return true; 347 } 348 349 // increment the amount of memory allocated to read buffers 350 readBufferBytesAllocated.addAndGet(frameSize + 4); 351 352 // reallocate the readbuffer as a frame-sized buffer 353 buffer_ = ByteBuffer.allocate(frameSize + 4); 354 buffer_.putInt(frameSize); 355 356 state_ = FrameBufferState.READING_FRAME; 357 } else { 358 // this skips the check of READING_FRAME state below, since we can't 359 // possibly go on to that state if there's data left to be read at 360 // this one. 361 return true; 362 } 363 } 364 365 // it is possible to fall through from the READING_FRAME_SIZE section 366 // to READING_FRAME if there's already some frame data available once 367 // READING_FRAME_SIZE is complete. 368 369 if (state_ == FrameBufferState.READING_FRAME) { 370 if (!internalRead()) { 371 return false; 372 } 373 374 // since we're already in the select loop here for sure, we can just 375 // modify our selection key directly. 376 if (buffer_.remaining() == 0) { 377 // get rid of the read select interests 378 selectionKey_.interestOps(0); 379 state_ = FrameBufferState.READ_FRAME_COMPLETE; 380 } 381 382 return true; 383 } 384 385 // if we fall through to this point, then the state must be invalid. 386 LOGGER.error("Read was called but state is invalid (" + state_ + ")"); 387 return false; 388 } 389 390 /** Give this FrameBuffer a chance to write its output to the final client. */ write()391 public boolean write() { 392 if (state_ == FrameBufferState.WRITING) { 393 try { 394 if (trans_.write(buffer_) < 0) { 395 return false; 396 } 397 } catch (TTransportException e) { 398 LOGGER.warn("Got an Exception during write", e); 399 return false; 400 } 401 402 // we're done writing. now we need to switch back to reading. 403 if (buffer_.remaining() == 0) { 404 prepareRead(); 405 } 406 return true; 407 } 408 409 LOGGER.error("Write was called, but state is invalid (" + state_ + ")"); 410 return false; 411 } 412 413 /** Give this FrameBuffer a chance to set its interest to write, once data has come in. */ changeSelectInterests()414 public void changeSelectInterests() { 415 switch (state_) { 416 case AWAITING_REGISTER_WRITE: 417 // set the OP_WRITE interest 418 selectionKey_.interestOps(SelectionKey.OP_WRITE); 419 state_ = FrameBufferState.WRITING; 420 break; 421 case AWAITING_REGISTER_READ: 422 prepareRead(); 423 break; 424 case AWAITING_CLOSE: 425 close(); 426 selectionKey_.cancel(); 427 break; 428 default: 429 LOGGER.error("changeSelectInterest was called, but state is invalid ({})", state_); 430 } 431 } 432 433 /** Shut the connection down. */ close()434 public void close() { 435 // if we're being closed due to an error, we might have allocated a 436 // buffer that we need to subtract for our memory accounting. 437 if (state_ == FrameBufferState.READING_FRAME 438 || state_ == FrameBufferState.READ_FRAME_COMPLETE 439 || state_ == FrameBufferState.AWAITING_CLOSE) { 440 readBufferBytesAllocated.addAndGet(-buffer_.array().length); 441 } 442 trans_.close(); 443 if (eventHandler_ != null) { 444 eventHandler_.deleteContext(context_, inProt_, outProt_); 445 } 446 } 447 448 /** Check if this FrameBuffer has a full frame read. */ isFrameFullyRead()449 public boolean isFrameFullyRead() { 450 return state_ == FrameBufferState.READ_FRAME_COMPLETE; 451 } 452 453 /** 454 * After the processor has processed the invocation, whatever thread is managing invocations 455 * should call this method on this FrameBuffer so we know it's time to start trying to write 456 * again. Also, if it turns out that there actually isn't any data in the response buffer, we'll 457 * skip trying to write and instead go back to reading. 458 */ responseReady()459 public void responseReady() { 460 // the read buffer is definitely no longer in use, so we will decrement 461 // our read buffer count. we do this here as well as in close because 462 // we'd like to free this read memory up as quickly as possible for other 463 // clients. 464 readBufferBytesAllocated.addAndGet(-buffer_.array().length); 465 466 if (response_.len() == 0) { 467 // go straight to reading again. this was probably an oneway method 468 state_ = FrameBufferState.AWAITING_REGISTER_READ; 469 buffer_ = null; 470 } else { 471 buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len()); 472 473 // set state that we're waiting to be switched to write. we do this 474 // asynchronously through requestSelectInterestChange() because there is 475 // a possibility that we're not in the main thread, and thus currently 476 // blocked in select(). (this functionality is in place for the sake of 477 // the HsHa server.) 478 state_ = FrameBufferState.AWAITING_REGISTER_WRITE; 479 } 480 requestSelectInterestChange(); 481 } 482 483 /** Actually invoke the method signified by this FrameBuffer. */ invoke()484 public void invoke() { 485 frameTrans_.reset(buffer_.array()); 486 response_.reset(); 487 488 try { 489 if (eventHandler_ != null) { 490 eventHandler_.processContext(context_, inTrans_, outTrans_); 491 } 492 processorFactory_.getProcessor(inTrans_).process(inProt_, outProt_); 493 responseReady(); 494 return; 495 } catch (TException te) { 496 LOGGER.warn("Exception while invoking!", te); 497 } catch (Throwable t) { 498 LOGGER.error("Unexpected throwable while invoking!", t); 499 } 500 // This will only be reached when there is a throwable. 501 state_ = FrameBufferState.AWAITING_CLOSE; 502 requestSelectInterestChange(); 503 } 504 505 /** 506 * Perform a read into buffer. 507 * 508 * @return true if the read succeeded, false if there was an error or the connection closed. 509 */ internalRead()510 private boolean internalRead() { 511 try { 512 return trans_.read(buffer_) >= 0; 513 } catch (TTransportException e) { 514 LOGGER.warn("Got an Exception in internalRead", e); 515 return false; 516 } 517 } 518 519 /** We're done writing, so reset our interest ops and change state accordingly. */ prepareRead()520 private void prepareRead() { 521 // we can set our interest directly without using the queue because 522 // we're in the select thread. 523 selectionKey_.interestOps(SelectionKey.OP_READ); 524 // get ready for another go-around 525 buffer_ = ByteBuffer.allocate(4); 526 state_ = FrameBufferState.READING_FRAME_SIZE; 527 } 528 529 /** 530 * When this FrameBuffer needs to change its select interests and execution might not be in its 531 * select thread, then this method will make sure the interest change gets done when the select 532 * thread wakes back up. When the current thread is this FrameBuffer's select thread, then it 533 * just does the interest change immediately. 534 */ requestSelectInterestChange()535 protected void requestSelectInterestChange() { 536 if (Thread.currentThread() == this.selectThread_) { 537 changeSelectInterests(); 538 } else { 539 this.selectThread_.requestSelectInterestChange(this); 540 } 541 } 542 } // FrameBuffer 543 544 public class AsyncFrameBuffer extends FrameBuffer { AsyncFrameBuffer( TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread)545 public AsyncFrameBuffer( 546 TNonblockingTransport trans, SelectionKey selectionKey, AbstractSelectThread selectThread) 547 throws TTransportException { 548 super(trans, selectionKey, selectThread); 549 } 550 getInputProtocol()551 public TProtocol getInputProtocol() { 552 return inProt_; 553 } 554 getOutputProtocol()555 public TProtocol getOutputProtocol() { 556 return outProt_; 557 } 558 invoke()559 public void invoke() { 560 frameTrans_.reset(buffer_.array()); 561 response_.reset(); 562 563 try { 564 if (eventHandler_ != null) { 565 eventHandler_.processContext(context_, inTrans_, outTrans_); 566 } 567 ((TAsyncProcessor) processorFactory_.getProcessor(inTrans_)).process(this); 568 return; 569 } catch (TException te) { 570 LOGGER.warn("Exception while invoking!", te); 571 } catch (Throwable t) { 572 LOGGER.error("Unexpected throwable while invoking!", t); 573 } 574 // This will only be reached when there is a throwable. 575 state_ = FrameBufferState.AWAITING_CLOSE; 576 requestSelectInterestChange(); 577 } 578 } 579 } 580