1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 
22 #include <thrift/server/TNonblockingServer.h>
23 #include <thrift/concurrency/Exception.h>
24 #include <thrift/transport/TSocket.h>
25 #include <thrift/concurrency/ThreadFactory.h>
26 #include <thrift/transport/PlatformSocket.h>
27 
28 #include <algorithm>
29 #include <iostream>
30 
31 #ifdef HAVE_POLL_H
32 #include <poll.h>
33 #elif HAVE_SYS_POLL_H
34 #include <sys/poll.h>
35 #elif HAVE_SYS_SELECT_H
36 #include <sys/select.h>
37 #endif
38 
39 #ifdef HAVE_SYS_SOCKET_H
40 #include <sys/socket.h>
41 #endif
42 
43 #ifdef HAVE_NETINET_IN_H
44 #include <netinet/in.h>
45 #include <netinet/tcp.h>
46 #endif
47 
48 #ifdef HAVE_ARPA_INET_H
49 #include <arpa/inet.h>
50 #endif
51 
52 #ifdef HAVE_NETDB_H
53 #include <netdb.h>
54 #endif
55 
56 #ifdef HAVE_FCNTL_H
57 #include <fcntl.h>
58 #endif
59 
60 #include <assert.h>
61 
62 #ifdef HAVE_SCHED_H
63 #include <sched.h>
64 #endif
65 
66 #ifndef AF_LOCAL
67 #define AF_LOCAL AF_UNIX
68 #endif
69 
70 #ifdef HAVE_INTTYPES_H
71 #include <inttypes.h>
72 #endif
73 
74 #ifdef HAVE_STDINT_H
75 #include <stdint.h>
76 #endif
77 
78 namespace apache {
79 namespace thrift {
80 namespace server {
81 
82 using namespace apache::thrift::protocol;
83 using namespace apache::thrift::transport;
84 using namespace apache::thrift::concurrency;
85 using apache::thrift::transport::TSocket;
86 using apache::thrift::transport::TTransportException;
87 using std::shared_ptr;
88 
89 /// Three states for sockets: recv frame size, recv data, and send mode
90 enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };
91 
/**
 * Six states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) wait for a worker task to complete (thread-pool mode only)
 *  5) send back data (if any)
 *  6) force immediate connection close
 */
100 enum TAppState {
101   APP_INIT,
102   APP_READ_FRAME_SIZE,
103   APP_READ_REQUEST,
104   APP_WAIT_TASK,
105   APP_SEND_RESULT,
106   APP_CLOSE_CONNECTION
107 };
108 
109 /**
110  * Represents a connection that is handled via libevent. This connection
111  * essentially encapsulates a socket that has some associated libevent state.
112  */
113 class TNonblockingServer::TConnection {
114 private:
115   /// Server IO Thread handling this connection
116   TNonblockingIOThread* ioThread_;
117 
118   /// Server handle
119   TNonblockingServer* server_;
120 
121   /// TProcessor
122   std::shared_ptr<TProcessor> processor_;
123 
124   /// Object wrapping network socket
125   std::shared_ptr<TSocket> tSocket_;
126 
127   /// Libevent object
128   struct event event_;
129 
130   /// Libevent flags
131   short eventFlags_;
132 
133   /// Socket mode
134   TSocketState socketState_;
135 
136   /// Application state
137   TAppState appState_;
138 
  /// How much data needs to be read
140   uint32_t readWant_;
141 
142   /// Where in the read buffer are we
143   uint32_t readBufferPos_;
144 
145   /// Read buffer
146   uint8_t* readBuffer_;
147 
148   /// Read buffer size
149   uint32_t readBufferSize_;
150 
151   /// Write buffer
152   uint8_t* writeBuffer_;
153 
154   /// Write buffer size
155   uint32_t writeBufferSize_;
156 
157   /// How far through writing are we?
158   uint32_t writeBufferPos_;
159 
160   /// Largest size of write buffer seen since buffer was constructed
161   size_t largestWriteBufferSize_;
162 
163   /// Count of the number of calls for use with getResizeBufferEveryN().
164   int32_t callsForResize_;
165 
166   /// Transport to read from
167   std::shared_ptr<TMemoryBuffer> inputTransport_;
168 
169   /// Transport that processor writes to
170   std::shared_ptr<TMemoryBuffer> outputTransport_;
171 
172   /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
173   std::shared_ptr<TTransport> factoryInputTransport_;
174   std::shared_ptr<TTransport> factoryOutputTransport_;
175 
176   /// Protocol decoder
177   std::shared_ptr<TProtocol> inputProtocol_;
178 
179   /// Protocol encoder
180   std::shared_ptr<TProtocol> outputProtocol_;
181 
182   /// Server event handler, if any
183   std::shared_ptr<TServerEventHandler> serverEventHandler_;
184 
185   /// Thrift call context, if any
186   void* connectionContext_;
187 
  /// Go into read mode
  void setRead() { setFlags(EV_READ | EV_PERSIST); }

  /// Go into write mode
  void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }

  /// Set socket idle
  void setIdle() { setFlags(0); }
196 
197   /**
198    * Set event flags for this connection.
199    *
200    * @param eventFlags flags we pass to libevent for the connection.
201    */
202   void setFlags(short eventFlags);
203 
204   /**
205    * Libevent handler called (via our static wrapper) when the connection
206    * socket had something happen.  Rather than use the flags libevent passed,
207    * we use the connection state to determine whether we need to read or
208    * write the socket.
209    */
210   void workSocket();
211 
212 public:
213   class Task;
214 
215   /// Constructor
  TConnection(std::shared_ptr<TSocket> socket,
              TNonblockingIOThread* ioThread) {
    readBuffer_ = nullptr;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports; these only need to be allocated
    // once per TConnection (they don't need to be reallocated on init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(
        new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));

    tSocket_ = socket;

    init(ioThread);
  }

  ~TConnection() { std::free(readBuffer_); }
236 
237   /// Close this connection and free or reset its resources.
238   void close();
239 
  /**
   * Check buffers against any size limits and shrink them if exceeded.
   *
   * @param readLimit we reduce read buffer size to this (if nonzero).
   * @param writeLimit if nonzero and write buffer is larger, replace it.
   */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
247 
248   /// Initialize
249   void init(TNonblockingIOThread* ioThread);
250 
251   /// set socket for connection
252   void setSocket(std::shared_ptr<TSocket> socket);
253 
254   /**
255    * This is called when the application transitions from one state into
256    * another. This means that it has finished writing the data that it needed
257    * to, or finished receiving the data that it needed to.
258    */
259   void transition();
260 
261   /**
262    * C-callable event handler for connection events.  Provides a callback
263    * that libevent can understand which invokes connection_->workSocket().
264    *
265    * @param fd the descriptor the event occurred on.
266    * @param which the flags associated with the event.
267    * @param v void* callback arg where we placed TConnection's "this".
268    */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
270     assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
271     ((TConnection*)v)->workSocket();
272   }
273 
274   /**
275    * Notification to server that processing has ended on this request.
276    * Can be called either when processing is completed or when a waiting
277    * task has been preemptively terminated (on overload).
278    *
279    * Don't call this from the IO thread itself.
280    *
281    * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
282    */
  bool notifyIOThread() { return ioThread_->notify(this); }
284 
  /*
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }
290 
291   /// Force connection shutdown for this connection.
  void forceClose() {
293     appState_ = APP_CLOSE_CONNECTION;
294     if (!notifyIOThread()) {
295       server_->decrementActiveProcessors();
296       close();
297       throw TException("TConnection::forceClose: failed write on notify pipe");
298     }
299   }
300 
  /// return the server this connection was initialized for.
  TNonblockingServer* getServer() const { return server_; }

  /// get state of connection.
  TAppState getState() const { return appState_; }

  /// return the TSocket transport wrapping this network connection
  std::shared_ptr<TSocket> getTSocket() const { return tSocket_; }

  /// return the server event handler if any
  std::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }

  /// return the Thrift connection context if any
  void* getConnectionContext() { return connectionContext_; }
315 };
316 
317 class TNonblockingServer::TConnection::Task : public Runnable {
318 public:
  Task(std::shared_ptr<TProcessor> processor,
320        std::shared_ptr<TProtocol> input,
321        std::shared_ptr<TProtocol> output,
322        TConnection* connection)
323     : processor_(processor),
324       input_(input),
325       output_(output),
326       connection_(connection),
327       serverEventHandler_(connection_->getServerEventHandler()),
328       connectionContext_(connection_->getConnectionContext()) {}
329 
  void run() override {
331     try {
332       for (;;) {
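        // Keep handling requests on this connection until process() reports
        // failure or the input transport has no more buffered data to peek() at.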
333         if (serverEventHandler_) {
334           serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
335         }
336         if (!processor_->process(input_, output_, connectionContext_)
337             || !input_->getTransport()->peek()) {
338           break;
339         }
340       }
341     } catch (const TTransportException& ttx) {
342       GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
343     } catch (const std::bad_alloc&) {
344       GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
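      // Out of memory: there is no reasonable way to recover, so terminate the process.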
345       exit(1);
346     } catch (const std::exception& x) {
347       GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
348                           typeid(x).name(),
349                           x.what());
350     } catch (...) {
351       GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
352     }
353 
354     // Signal completion back to the libevent thread via a pipe
355     if (!connection_->notifyIOThread()) {
356       GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
357       connection_->server_->decrementActiveProcessors();
358       connection_->close();
359       throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
360     }
361   }
362 
  TConnection* getTConnection() { return connection_; }
364 
365 private:
366   std::shared_ptr<TProcessor> processor_;
367   std::shared_ptr<TProtocol> input_;
368   std::shared_ptr<TProtocol> output_;
369   TConnection* connection_;
370   std::shared_ptr<TServerEventHandler> serverEventHandler_;
371   void* connectionContext_;
372 };
373 
void TNonblockingServer::TConnection::init(TNonblockingIOThread* ioThread) {
375   ioThread_ = ioThread;
376   server_ = ioThread->getServer();
377   appState_ = APP_INIT;
378   eventFlags_ = 0;
379 
380   readBufferPos_ = 0;
381   readWant_ = 0;
382 
383   writeBuffer_ = nullptr;
384   writeBufferSize_ = 0;
385   writeBufferPos_ = 0;
386   largestWriteBufferSize_ = 0;
387 
388   socketState_ = SOCKET_RECV_FRAMING;
389   callsForResize_ = 0;
390 
  // get input/output transports
392   factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
393   factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);
394 
395   // Create protocol
396   if (server_->getHeaderTransport()) {
397     inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
398                                                                      factoryOutputTransport_);
399     outputProtocol_ = inputProtocol_;
400   } else {
401     inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
402     outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
403   }
404 
405   // Set up for any server event handler
406   serverEventHandler_ = server_->getEventHandler();
407   if (serverEventHandler_) {
408     connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
409   } else {
410     connectionContext_ = nullptr;
411   }
412 
413   // Get the processor
414   processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
415 }
416 
void TNonblockingServer::TConnection::setSocket(std::shared_ptr<TSocket> socket) {
418   tSocket_ = socket;
419 }
420 
void TNonblockingServer::TConnection::workSocket() {
422   while (true) {
423     int got = 0, left = 0, sent = 0;
424     uint32_t fetch = 0;
425 
426     switch (socketState_) {
427     case SOCKET_RECV_FRAMING:
428       union {
429         uint8_t buf[sizeof(uint32_t)];
430         uint32_t size;
431       } framing;
432 
433       // if we've already received some bytes we kept them here
434       framing.size = readWant_;
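      // (readWant_ doubles as scratch storage for a partially received frame
      //  header; it is saved back below if fewer than 4 bytes have arrived.)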
435       // determine size of this frame
436       try {
437         // Read from the socket
438         fetch = tSocket_->read(&framing.buf[readBufferPos_],
439                                uint32_t(sizeof(framing.size) - readBufferPos_));
440         if (fetch == 0) {
441           // Whenever we get here it means a remote disconnect
442           close();
443           return;
444         }
445         readBufferPos_ += fetch;
446       } catch (TTransportException& te) {
        // In nonblocking SSL sockets some operations need to be retried.
        // The current approach is to parse the exception message, but a better
        // solution should be investigated.
449         if(!strstr(te.what(), "retry")) {
450           GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
451           close();
452 
453           return;
454         }
455       }
456 
457       if (readBufferPos_ < sizeof(framing.size)) {
458         // more needed before frame size is known -- save what we have so far
459         readWant_ = framing.size;
460         return;
461       }
462 
463       readWant_ = ntohl(framing.size);
464       if (readWant_ > server_->getMaxFrameSize()) {
465         // Don't allow giant frame sizes.  This prevents bad clients from
466         // causing us to try and allocate a giant buffer.
467         GlobalOutput.printf(
468             "TNonblockingServer: frame size too large "
469             "(%" PRIu32 " > %" PRIu64
470             ") from client %s. "
471             "Remote side not using TFramedTransport?",
472             readWant_,
473             (uint64_t)server_->getMaxFrameSize(),
474             tSocket_->getSocketInfo().c_str());
475         close();
476         return;
477       }
478       // size known; now get the rest of the frame
479       transition();
480 
481       // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for
482       // regular sockets, because if there is more data, libevent will fire the event handler registered for read
483       // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the
484       // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In
485       // that case, not trying another processing cycle here would result in a hang as we will never get to work the socket,
486       // despite having more data.
487       if (tSocket_->hasPendingDataToRead())
488       {
489           continue;
490       }
491 
492       return;
493 
494     case SOCKET_RECV:
495       // It is an error to be in this state if we already have all the data
496       if (!(readBufferPos_ < readWant_)) {
497         GlobalOutput.printf("TNonblockingServer: frame size too short");
498         close();
499         return;
500       }
501 
502       try {
503         // Read from the socket
504         fetch = readWant_ - readBufferPos_;
505         got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
506       } catch (TTransportException& te) {
        // In nonblocking SSL sockets some operations need to be retried.
        // The current approach is to parse the exception message, but a better
        // solution should be investigated.
509         if(!strstr(te.what(), "retry")) {
510           GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
511           close();
512         }
513 
514         return;
515       }
516 
517       if (got > 0) {
518         // Move along in the buffer
519         readBufferPos_ += got;
520 
521         // Check that we did not overdo it
522         assert(readBufferPos_ <= readWant_);
523 
524         // We are done reading, move onto the next state
525         if (readBufferPos_ == readWant_) {
526           transition();
527           if (socketState_ == SOCKET_RECV_FRAMING && tSocket_->hasPendingDataToRead())
528           {
529               continue;
530           }
531         }
532         return;
533       }
534 
535       // Whenever we get down here it means a remote disconnect
536       close();
537 
538       return;
539 
540     case SOCKET_SEND:
541       // Should never have position past size
542       assert(writeBufferPos_ <= writeBufferSize_);
543 
544       // If there is no data to send, then let us move on
545       if (writeBufferPos_ == writeBufferSize_) {
546         GlobalOutput("WARNING: Send state with no data to send");
547         transition();
548         return;
549       }
550 
551       try {
552         left = writeBufferSize_ - writeBufferPos_;
553         sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
554       } catch (TTransportException& te) {
555         GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
556         close();
557         return;
558       }
559 
560       writeBufferPos_ += sent;
561 
562       // Did we overdo it?
563       assert(writeBufferPos_ <= writeBufferSize_);
564 
565       // We are done!
566       if (writeBufferPos_ == writeBufferSize_) {
567         transition();
568       }
569 
570       return;
571 
572     default:
573       GlobalOutput.printf("Unexpected Socket State %d", socketState_);
574       assert(0);
575       return;
576     }
577   }
578 }
579 
bool TNonblockingServer::getHeaderTransport() {
581   // Currently if there is no output protocol factory,
582   // we assume header transport (without having to create
583   // a new transport and check)
584   return getOutputProtocolFactory() == nullptr;
585 }
586 
587 /**
588  * This is called when the application transitions from one state into
589  * another. This means that it has finished writing the data that it needed
590  * to, or finished receiving the data that it needed to.
591  */
void TNonblockingServer::TConnection::transition() {
593   // ensure this connection is active right now
594   assert(ioThread_);
595   assert(server_);
596 
597   // Switch upon the state that we are currently in and move to a new state
598   switch (appState_) {
599 
600   case APP_READ_REQUEST:
601     // We are done reading the request, package the read buffer into transport
602     // and get back some data from the dispatch function
603     if (server_->getHeaderTransport()) {
604       inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
605       outputTransport_->resetBuffer();
606     } else {
607       // We saved room for the framing size in case header transport needed it,
608       // but just skip it for the non-header case
609       inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
610       outputTransport_->resetBuffer();
611 
612       // Prepend four bytes of blank space to the buffer so we can
613       // write the frame size there later.
614       outputTransport_->getWritePtr(4);
615       outputTransport_->wroteBytes(4);
616     }
617 
618     server_->incrementActiveProcessors();
619 
620     if (server_->isThreadPoolProcessing()) {
621       // We are setting up a Task to do this work and we will wait on it
622 
623       // Create task and dispatch to the thread manager
624       std::shared_ptr<Runnable> task = std::shared_ptr<Runnable>(
625           new Task(processor_, inputProtocol_, outputProtocol_, this));
626       // The application is now waiting on the task to finish
627       appState_ = APP_WAIT_TASK;
628 
629       // Set this connection idle so that libevent doesn't process more
630       // data on it while we're still waiting for the threadmanager to
631       // finish this task
632       setIdle();
633 
634       try {
635         server_->addTask(task);
636       } catch (IllegalStateException& ise) {
637         // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
638         GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
639         server_->decrementActiveProcessors();
640         close();
641       } catch (TimedOutException& to) {
642         GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
643         server_->decrementActiveProcessors();
644         close();
645       }
646 
647       return;
648     } else {
649       try {
650         if (serverEventHandler_) {
651           serverEventHandler_->processContext(connectionContext_, getTSocket());
652         }
653         // Invoke the processor
654         processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
655       } catch (const TTransportException& ttx) {
656         GlobalOutput.printf(
657             "TNonblockingServer transport error in "
658             "process(): %s",
659             ttx.what());
660         server_->decrementActiveProcessors();
661         close();
662         return;
663       } catch (const std::exception& x) {
664         GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
665                             typeid(x).name(),
666                             x.what());
667         server_->decrementActiveProcessors();
668         close();
669         return;
670       } catch (...) {
671         GlobalOutput.printf("Server::process() unknown exception");
672         server_->decrementActiveProcessors();
673         close();
674         return;
675       }
676     }
677     // fallthrough
678 
679   // Intentionally fall through here, the call to process has written into
680   // the writeBuffer_
681 
682   case APP_WAIT_TASK:
683     // We have now finished processing a task and the result has been written
684     // into the outputTransport_, so we grab its contents and place them into
685     // the writeBuffer_ for actual writing by the libevent thread
686 
687     server_->decrementActiveProcessors();
688     // Get the result of the operation
689     outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
690 
691     // If the function call generated return data, then move into the send
692     // state and get going
693     // 4 bytes were reserved for frame size
694     if (writeBufferSize_ > 4) {
695 
696       // Move into write state
697       writeBufferPos_ = 0;
698       socketState_ = SOCKET_SEND;
699 
700       // Put the frame size into the write buffer
701       auto frameSize = (int32_t)htonl(writeBufferSize_ - 4);
702       memcpy(writeBuffer_, &frameSize, 4);
703 
704       // Socket into write mode
705       appState_ = APP_SEND_RESULT;
706       setWrite();
707 
708       return;
709     }
710 
711     // In this case, the request was oneway and we should fall through
712     // right back into the read frame header state
713     goto LABEL_APP_INIT;
714 
715   case APP_SEND_RESULT:
716     // it's now safe to perform buffer size housekeeping.
717     if (writeBufferSize_ > largestWriteBufferSize_) {
718       largestWriteBufferSize_ = writeBufferSize_;
719     }
720     if (server_->getResizeBufferEveryN() > 0
721         && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
722       checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
723                               server_->getIdleWriteBufferLimit());
724       callsForResize_ = 0;
725     }
726     // fallthrough
727 
728   // N.B.: We also intentionally fall through here into the INIT state!
729 
730   LABEL_APP_INIT:
731   case APP_INIT:
732 
733     // Clear write buffer variables
734     writeBuffer_ = nullptr;
735     writeBufferPos_ = 0;
736     writeBufferSize_ = 0;
737 
738     // Into read4 state we go
739     socketState_ = SOCKET_RECV_FRAMING;
740     appState_ = APP_READ_FRAME_SIZE;
741 
742     readBufferPos_ = 0;
743 
744     // Register read event
745     setRead();
746 
747     return;
748 
749   case APP_READ_FRAME_SIZE:
750     readWant_ += 4;
751 
752     // We just read the request length
753     // Double the buffer size until it is big enough
754     if (readWant_ > readBufferSize_) {
755       if (readBufferSize_ == 0) {
756         readBufferSize_ = 1;
757       }
758       uint32_t newSize = readBufferSize_;
759       while (readWant_ > newSize) {
760         newSize *= 2;
761       }
762 
763       auto* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
764       if (newBuffer == nullptr) {
765         // nothing else to be done...
766         throw std::bad_alloc();
767       }
768       readBuffer_ = newBuffer;
769       readBufferSize_ = newSize;
770     }
771 
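    // Write the frame length back into the first 4 bytes of the buffer (in
    // network byte order) so a header transport can see the complete frame;
    // readBufferPos_ starts at 4 because those header bytes are already "read".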
772     readBufferPos_ = 4;
773     *((uint32_t*)readBuffer_) = htonl(readWant_ - 4);
774 
775     // Move into read request state
776     socketState_ = SOCKET_RECV;
777     appState_ = APP_READ_REQUEST;
778 
779     return;
780 
781   case APP_CLOSE_CONNECTION:
782     server_->decrementActiveProcessors();
783     close();
784     return;
785 
786   default:
787     GlobalOutput.printf("Unexpected Application State %d", appState_);
788     assert(0);
789   }
790 }
791 
void TNonblockingServer::TConnection::setFlags(short eventFlags) {
793   // Catch the do nothing case
794   if (eventFlags_ == eventFlags) {
795     return;
796   }
797 
798   // Delete a previously existing event
799   if (eventFlags_ && event_del(&event_) == -1) {
800     GlobalOutput.perror("TConnection::setFlags() event_del", THRIFT_GET_SOCKET_ERROR);
801     return;
802   }
803 
804   // Update in memory structure
805   eventFlags_ = eventFlags;
806 
807   // Do not call event_set if there are no flags
808   if (!eventFlags_) {
809     return;
810   }
811 
812   /*
813    * event_set:
814    *
815    * Prepares the event structure &event to be used in future calls to
816    * event_add() and event_del().  The event will be prepared to call the
817    * eventHandler using the 'sock' file descriptor to monitor events.
818    *
819    * The events can be either EV_READ, EV_WRITE, or both, indicating
820    * that an application can read or write from the file respectively without
821    * blocking.
822    *
823    * The eventHandler will be called with the file descriptor that triggered
824    * the event and the type of event which will be one of: EV_TIMEOUT,
825    * EV_SIGNAL, EV_READ, EV_WRITE.
826    *
827    * The additional flag EV_PERSIST makes an event_add() persistent until
828    * event_del() has been called.
829    *
830    * Once initialized, the &event struct can be used repeatedly with
831    * event_add() and event_del() and does not need to be reinitialized unless
832    * the eventHandler and/or the argument to it are to be changed.  However,
833    * when an ev structure has been added to libevent using event_add() the
834    * structure must persist until the event occurs (assuming EV_PERSIST
835    * is not set) or is removed using event_del().  You may not reuse the same
836    * ev structure for multiple monitored descriptors; each descriptor needs
837    * its own ev.
838    */
839   event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
840   event_base_set(ioThread_->getEventBase(), &event_);
841 
842   // Add the event
843   if (event_add(&event_, nullptr) == -1) {
844     GlobalOutput.perror("TConnection::setFlags(): could not event_add", THRIFT_GET_SOCKET_ERROR);
845   }
846 }
847 
848 /**
849  * Closes a connection
850  */
void TNonblockingServer::TConnection::close() {
852   setIdle();
853 
854   if (serverEventHandler_) {
855     serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
856   }
857   ioThread_ = nullptr;
858 
859   // Close the socket
860   tSocket_->close();
861 
862   // close any factory produced transports
863   factoryInputTransport_->close();
864   factoryOutputTransport_->close();
865 
866   // release processor and handler
867   processor_.reset();
868 
869   // Give this object back to the server that owns it
870   server_->returnConnection(this);
871 }
872 
void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
874   if (readLimit > 0 && readBufferSize_ > readLimit) {
875     free(readBuffer_);
876     readBuffer_ = nullptr;
877     readBufferSize_ = 0;
878   }
879 
880   if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
881     // just start over
882     outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
883     largestWriteBufferSize_ = 0;
884   }
885 }
886 
TNonblockingServer::~TNonblockingServer() {
888   // Close any active connections (moves them to the idle connection stack)
889   while (activeConnections_.size()) {
890     activeConnections_.front()->close();
891   }
892   // Clean up unused TConnection objects in connectionStack_
893   while (!connectionStack_.empty()) {
894     TConnection* connection = connectionStack_.top();
895     connectionStack_.pop();
896     delete connection;
897   }
898   // The TNonblockingIOThread objects have shared_ptrs to the Thread
899   // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
900   // objects (as runnable) so these objects will never deallocate without help.
901   while (!ioThreads_.empty()) {
902     std::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
903     ioThreads_.pop_back();
904     iot->setThread(std::shared_ptr<Thread>());
905   }
906 }
907 
908 /**
909  * Creates a new connection either by reusing an object off the stack or
910  * by allocating a new one entirely
911  */
TNonblockingServer::TConnection* TNonblockingServer::createConnection(std::shared_ptr<TSocket> socket) {
913   // Check the stack
914   Guard g(connMutex_);
915 
916   // pick an IO thread to handle this connection -- currently round robin
917   assert(nextIOThread_ < ioThreads_.size());
918   int selectedThreadIdx = nextIOThread_;
919   nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size());
920 
921   TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
922 
923   // Check the connection stack to see if we can re-use
924   TConnection* result = nullptr;
925   if (connectionStack_.empty()) {
926     result = new TConnection(socket, ioThread);
927     ++numTConnections_;
928   } else {
929     result = connectionStack_.top();
930     connectionStack_.pop();
931     result->setSocket(socket);
932     result->init(ioThread);
933   }
934   activeConnections_.push_back(result);
935   return result;
936 }
937 
938 /**
939  * Returns a connection to the stack
940  */
void TNonblockingServer::returnConnection(TConnection* connection) {
942   Guard g(connMutex_);
943 
944   activeConnections_.erase(std::remove(activeConnections_.begin(),
945                                        activeConnections_.end(),
946                                        connection),
947                            activeConnections_.end());
948 
949   if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
950     delete connection;
951     --numTConnections_;
952   } else {
953     connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
954     connectionStack_.push(connection);
955   }
956 }
957 
958 /**
959  * Server socket had something happen.  We accept all waiting client
960  * connections on fd and assign TConnection objects to handle those requests.
961  */
void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
963   (void)which;
964   // Make sure that libevent didn't mess up the socket handles
965   assert(fd == serverSocket_);
966 
967   // Going to accept a new client socket
968   std::shared_ptr<TSocket> clientSocket;
969 
970   clientSocket = serverTransport_->accept();
971   if (clientSocket) {
972     // If we're overloaded, take action here
973     if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
974       Guard g(connMutex_);
975       nConnectionsDropped_++;
976       nTotalConnectionsDropped_++;
977       if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
978         clientSocket->close();
979         return;
980       } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
981         if (!drainPendingTask()) {
982           // Nothing left to discard, so we drop connection instead.
983           clientSocket->close();
984           return;
985         }
986       }
987     }
988 
989     // Create a new TConnection for this client socket.
990     TConnection* clientConnection = createConnection(clientSocket);
991 
992     // Fail fast if we could not create a TConnection object
993     if (clientConnection == nullptr) {
994       GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
995       clientSocket->close();
996       return;
997     }
998 
999     /*
1000      * Either notify the ioThread that is assigned this connection to
1001      * start processing, or if it is us, we'll just ask this
1002      * connection to do its initial state change here.
1003      *
1004      * (We need to avoid writing to our own notification pipe, to
1005      * avoid possible deadlocks if the pipe is full.)
1006      *
1007      * The IO thread #0 is the only one that handles these listen
1008      * events, so unless the connection has been assigned to thread #0
1009      * we know it's not on our thread.
1010      */
1011     if (clientConnection->getIOThreadNumber() == 0) {
1012       clientConnection->transition();
1013     } else {
1014       if (!clientConnection->notifyIOThread()) {
1015         GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
1016         clientConnection->close();
1017       }
1018     }
1019   }
1020 }
1021 
1022 /**
1023  * Creates a socket to listen on and binds it to the local port.
1024  */
void TNonblockingServer::createAndListenOnSocket() {
1026   serverTransport_->listen();
1027   serverSocket_ = serverTransport_->getSocketFD();
1028 }
1029 
1030 
void TNonblockingServer::setThreadManager(std::shared_ptr<ThreadManager> threadManager) {
1032   threadManager_ = threadManager;
1033   if (threadManager) {
1034     threadManager->setExpireCallback(
1035         std::bind(&TNonblockingServer::expireClose,
1036                                      this,
1037                                      std::placeholders::_1));
1038     threadPoolProcessing_ = true;
1039   } else {
1040     threadPoolProcessing_ = false;
1041   }
1042 }
1043 
bool TNonblockingServer::serverOverloaded() {
1045   size_t activeConnections = numTConnections_ - connectionStack_.size();
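  // Hysteresis: once overloaded, the condition is only cleared after load falls
  // back below overloadHysteresis_ (a fraction of the configured limits),
  // which avoids rapid flapping between overloaded and non-overloaded states.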
1046   if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
1047     if (!overloaded_) {
1048       GlobalOutput.printf("TNonblockingServer: overload condition begun.");
1049       overloaded_ = true;
1050     }
1051   } else {
1052     if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
1053         && (activeConnections <= overloadHysteresis_ * maxConnections_)) {
1054       GlobalOutput.printf(
1055           "TNonblockingServer: overload ended; "
1056           "%u dropped (%llu total)",
1057           nConnectionsDropped_,
1058           nTotalConnectionsDropped_);
1059       nConnectionsDropped_ = 0;
1060       overloaded_ = false;
1061     }
1062   }
1063 
1064   return overloaded_;
1065 }
1066 
bool TNonblockingServer::drainPendingTask() {
1068   if (threadManager_) {
1069     std::shared_ptr<Runnable> task = threadManager_->removeNextPending();
1070     if (task) {
1071       TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
1072       assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
1073       connection->forceClose();
1074       return true;
1075     }
1076   }
1077   return false;
1078 }
1079 
void TNonblockingServer::expireClose(std::shared_ptr<Runnable> task) {
1081   TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
1082   assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
1083   connection->forceClose();
1084 }
1085 
void TNonblockingServer::stop() {
1087   // Breaks the event loop in all threads so that they end ASAP.
1088   for (auto & ioThread : ioThreads_) {
1089     ioThread->stop();
1090   }
1091 }
1092 
void TNonblockingServer::registerEvents(event_base* user_event_base) {
1094   userEventBase_ = user_event_base;
1095 
1096   // init listen socket
1097   if (serverSocket_ == THRIFT_INVALID_SOCKET)
1098     createAndListenOnSocket();
1099 
1100   // set up the IO threads
1101   assert(ioThreads_.empty());
1102   if (!numIOThreads_) {
1103     numIOThreads_ = DEFAULT_IO_THREADS;
1104   }
  // A user-provided event base does not work for multi-threaded servers
1106   assert(numIOThreads_ == 1 || !userEventBase_);
1107 
1108   for (uint32_t id = 0; id < numIOThreads_; ++id) {
1109     // the first IO thread also does the listening on server socket
1110     THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
1111 
1112     shared_ptr<TNonblockingIOThread> thread(
1113         new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
1114     ioThreads_.push_back(thread);
1115   }
1116 
1117   // Notify handler of the preServe event
1118   if (eventHandler_) {
1119     eventHandler_->preServe();
1120   }
1121 
1122   // Start all of our helper IO threads. Note that the threads run forever,
1123   // only terminating if stop() is called.
1124   assert(ioThreads_.size() == numIOThreads_);
1125   assert(ioThreads_.size() > 0);
1126 
1127   GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
1128                       ioThreads_.size());
1129 
1130   // Launch all the secondary IO threads in separate threads
1131   if (ioThreads_.size() > 1) {
1132     ioThreadFactory_.reset(new ThreadFactory(
1133         false // detached
1134         ));
1135 
1136     assert(ioThreadFactory_.get());
1137 
1138     // intentionally starting at thread 1, not 0
1139     for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
1140       shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
1141       ioThreads_[i]->setThread(thread);
1142       thread->start();
1143     }
1144   }
1145 
1146   // Register the events for the primary (listener) IO thread
1147   ioThreads_[0]->registerEvents();
1148 }
1149 
1150 /**
1151  * Main workhorse function, starts up the server listening on a port and
1152  * loops over the libevent handler.
1153  */
void TNonblockingServer::serve() {
1155 
1156   if (ioThreads_.empty())
1157     registerEvents(nullptr);
1158 
1159   // Run the primary (listener) IO thread loop in our main thread; this will
1160   // only return when the server is shutting down.
1161   ioThreads_[0]->run();
1162 
1163   // Ensure all threads are finished before exiting serve()
1164   for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
1165     ioThreads_[i]->join();
1166     GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
1167   }
1168 }
1169 
TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
1171                                            int number,
1172                                            THRIFT_SOCKET listenSocket,
1173                                            bool useHighPriority)
1174   : server_(server),
1175     number_(number),
1176     threadId_{},
1177     listenSocket_(listenSocket),
1178     useHighPriority_(useHighPriority),
1179     eventBase_(nullptr),
1180     ownEventBase_(false),
1181     serverEvent_{},
1182     notificationEvent_{} {
1183   notificationPipeFDs_[0] = -1;
1184   notificationPipeFDs_[1] = -1;
1185 }
1186 
TNonblockingIOThread::~TNonblockingIOThread() {
1188   // make sure our associated thread is fully finished
1189   join();
1190 
1191   if (eventBase_ && ownEventBase_) {
1192     event_base_free(eventBase_);
1193     ownEventBase_ = false;
1194   }
1195 
1196   if (listenSocket_ != THRIFT_INVALID_SOCKET) {
1197     if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
1198       GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
1199     }
1200     listenSocket_ = THRIFT_INVALID_SOCKET;
1201   }
1202 
1203   for (auto notificationPipeFD : notificationPipeFDs_) {
1204     if (notificationPipeFD >= 0) {
1205       if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFD)) {
1206         GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
1207                             THRIFT_GET_SOCKET_ERROR);
1208       }
1209       notificationPipeFD = THRIFT_INVALID_SOCKET;
1210     }
1211   }
1212 }
1213 
void TNonblockingIOThread::createNotificationPipe() {
1215   if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
1216     GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
1217     throw TException("can't create notification pipe");
1218   }
1219   if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
1220       || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
1221     ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
1222     ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
1223     throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
1224   }
1225   for (auto notificationPipeFD : notificationPipeFDs_) {
1226 #if LIBEVENT_VERSION_NUMBER < 0x02000000
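    // Older libevent (< 2.0) has no evutil_make_socket_closeonexec(), so set
    // FD_CLOEXEC directly via fcntl().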
1227     int flags;
1228     if ((flags = THRIFT_FCNTL(notificationPipeFD, F_GETFD, 0)) < 0
1229         || THRIFT_FCNTL(notificationPipeFD, F_SETFD, flags | FD_CLOEXEC) < 0) {
1230 #else
1231     if (evutil_make_socket_closeonexec(notificationPipeFD) < 0) {
1232 #endif
1233       ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
1234       ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
1235       throw TException(
1236           "TNonblockingServer::createNotificationPipe() "
1237           "FD_CLOEXEC");
1238     }
1239   }
1240 }
1241 
1242 /**
1243  * Register the core libevent events onto the proper base.
1244  */
1245 void TNonblockingIOThread::registerEvents() {
1246   threadId_ = Thread::get_current();
1247 
1248   assert(eventBase_ == nullptr);
1249   eventBase_ = getServer()->getUserEventBase();
1250   if (eventBase_ == nullptr) {
1251     eventBase_ = event_base_new();
1252     ownEventBase_ = true;
1253   }
1254 
1255   // Print some libevent stats
1256   if (number_ == 0) {
1257     GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
1258                         event_get_version(),
1259                         event_base_get_method(eventBase_));
1260   }
1261 
1262   if (listenSocket_ != THRIFT_INVALID_SOCKET) {
1263     // Register the server event
1264     event_set(&serverEvent_,
1265               listenSocket_,
1266               EV_READ | EV_PERSIST,
1267               TNonblockingIOThread::listenHandler,
1268               server_);
1269     event_base_set(eventBase_, &serverEvent_);
1270 
1271     // Add the event and start up the server
1272     if (-1 == event_add(&serverEvent_, nullptr)) {
1273       throw TException(
1274           "TNonblockingServer::serve(): "
1275           "event_add() failed on server listen event");
1276     }
1277     GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
1278   }
1279 
1280   createNotificationPipe();
1281 
1282   // Create an event to be notified when a task finishes
1283   event_set(&notificationEvent_,
1284             getNotificationRecvFD(),
1285             EV_READ | EV_PERSIST,
1286             TNonblockingIOThread::notifyHandler,
1287             this);
1288 
1289   // Attach to the base
1290   event_base_set(eventBase_, &notificationEvent_);
1291 
1292   // Add the event and start up the server
1293   if (-1 == event_add(&notificationEvent_, nullptr)) {
1294     throw TException(
1295         "TNonblockingServer::serve(): "
1296         "event_add() failed on task-done notification event");
1297   }
1298   GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
1299 }
1300 
1301 bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
1302   auto fd = getNotificationSendFD();
1303   if (fd < 0) {
1304     return false;
1305   }
1306 
1307   int ret = -1;
1308   long kSize = sizeof(conn);
1309   const char * pos = (const char *)const_cast_sockopt(&conn);
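  // The notification protocol simply writes the raw TConnection pointer value
  // over the notification socketpair; notifyHandler() on the IO thread reads
  // exactly sizeof(pointer) bytes and treats a null pointer as a stop command.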
1310 
1311 #if defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H)
1312   struct pollfd pfd = {fd, POLLOUT, 0};
1313 
1314   while (kSize > 0) {
1315     pfd.revents = 0;
1316     ret = poll(&pfd, 1, -1);
1317     if (ret < 0) {
1318       return false;
1319     } else if (ret == 0) {
1320       continue;
1321     }
1322 
1323     if (pfd.revents & POLLHUP || pfd.revents & POLLERR) {
1324       ::THRIFT_CLOSESOCKET(fd);
1325       return false;
1326     }
1327 
1328     if (pfd.revents & POLLOUT) {
1329       ret = send(fd, pos, kSize, 0);
1330       if (ret < 0) {
1331         if (errno == EAGAIN) {
1332           continue;
1333         }
1334 
1335         ::THRIFT_CLOSESOCKET(fd);
1336         return false;
1337       }
1338 
1339       kSize -= ret;
1340       pos += ret;
1341     }
1342   }
1343 #else
1344   fd_set wfds, efds;
1345 
1346   while (kSize > 0) {
1347     FD_ZERO(&wfds);
1348     FD_ZERO(&efds);
1349     FD_SET(fd, &wfds);
1350     FD_SET(fd, &efds);
1351     ret = select(static_cast<int>(fd + 1), nullptr, &wfds, &efds, nullptr);
1352     if (ret < 0) {
1353       return false;
1354     } else if (ret == 0) {
1355       continue;
1356     }
1357 
1358     if (FD_ISSET(fd, &efds)) {
1359       ::THRIFT_CLOSESOCKET(fd);
1360       return false;
1361     }
1362 
1363     if (FD_ISSET(fd, &wfds)) {
1364       ret = send(fd, pos, kSize, 0);
1365       if (ret < 0) {
1366         if (errno == EAGAIN) {
1367           continue;
1368         }
1369 
1370         ::THRIFT_CLOSESOCKET(fd);
1371         return false;
1372       }
1373 
1374       kSize -= ret;
1375       pos += ret;
1376     }
1377   }
1378 #endif
1379 
1380   return true;
1381 }
1382 
1383 /* static */
1384 void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
1385   auto* ioThread = (TNonblockingIOThread*)v;
1386   assert(ioThread);
1387   (void)which;
1388 
1389   while (true) {
1390     TNonblockingServer::TConnection* connection = nullptr;
1391     const int kSize = sizeof(connection);
1392     long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
1393     if (nBytes == kSize) {
1394       if (connection == nullptr) {
1395         // this is the command to stop our thread, exit the handler!
1396         ioThread->breakLoop(false);
1397         return;
1398       }
1399       connection->transition();
1400     } else if (nBytes > 0) {
1401       // throw away these bytes and hope that next time we get a solid read
      GlobalOutput.printf("notifyHandler: Bad read of %ld bytes, wanted %d", nBytes, kSize);
1403       ioThread->breakLoop(true);
1404       return;
1405     } else if (nBytes == 0) {
1406       GlobalOutput.printf("notifyHandler: Notify socket closed!");
1407       ioThread->breakLoop(false);
1408       // exit the loop
1409       break;
1410     } else { // nBytes < 0
1411       if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
1412           && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
1413         GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
1414         ioThread->breakLoop(true);
1415         return;
1416       }
1417       // exit the loop
1418       break;
1419     }
1420   }
1421 }
1422 
1423 void TNonblockingIOThread::breakLoop(bool error) {
1424   if (error) {
1425     GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
1426     // TODO: figure out something better to do here, but for now kill the
1427     // whole process.
1428     GlobalOutput.printf("TNonblockingServer: aborting process.");
1429     ::abort();
1430   }
1431 
1432   // If we're running in the same thread, we can't use the notify(0)
1433   // mechanism to stop the thread, but happily if we're running in the
1434   // same thread, this means the thread can't be blocking in the event
1435   // loop either.
1436   if (!Thread::is_current(threadId_)) {
1437     notify(nullptr);
1438   } else {
1439     // cause the loop to stop ASAP - even if it has things to do in it
1440     event_base_loopbreak(eventBase_);
1441   }
1442 }
1443 
1444 void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
1445 #ifdef HAVE_SCHED_H
1446   // Start out with a standard, low-priority setup for the sched params.
1447   struct sched_param sp;
1448   memset(static_cast<void*>(&sp), 0, sizeof(sp));
1449   int policy = SCHED_OTHER;
1450 
1451   // If desired, set up high-priority sched params structure.
1452   if (value) {
1453     // FIFO scheduler, ranked above default SCHED_OTHER queue
1454     policy = SCHED_FIFO;
1455     // The priority only compares us to other SCHED_FIFO threads, so we
1456     // just pick a random priority halfway between min & max.
1457     const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;
1458 
1459     sp.sched_priority = priority;
1460   }
1461 
1462   // Actually set the sched params for the current thread.
1463   if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
1464     GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
1465   } else {
1466     GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
1467   }
1468 #else
1469   THRIFT_UNUSED_VARIABLE(value);
1470 #endif
1471 }
1472 
1473 void TNonblockingIOThread::run() {
1474   if (eventBase_ == nullptr) {
1475     registerEvents();
1476   }
1477   if (useHighPriority_) {
1478     setCurrentThreadHighPriority(true);
1479   }
1480 
1481   if (eventBase_ != nullptr)
1482   {
1483     GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
    // Run the libevent engine; this returns only once the event loop is broken
    // (e.g. via stop()), invoking eventHandler callbacks while it runs.
1485     event_base_loop(eventBase_, 0);
1486 
1487     if (useHighPriority_) {
1488       setCurrentThreadHighPriority(false);
1489     }
1490 
1491     // cleans up our registered events
1492     cleanupEvents();
1493   }
1494 
1495   GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
1496 }
1497 
1498 void TNonblockingIOThread::cleanupEvents() {
1499   // stop the listen socket, if any
1500   if (listenSocket_ != THRIFT_INVALID_SOCKET) {
1501     if (event_del(&serverEvent_) == -1) {
1502       GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
1503     }
1504   }
1505 
1506   event_del(&notificationEvent_);
1507 }
1508 
1509 void TNonblockingIOThread::stop() {
1510   // This should cause the thread to fall out of its event loop ASAP.
1511   breakLoop(false);
1512 }
1513 
1514 void TNonblockingIOThread::join() {
1515   // If this was a thread created by a factory (not the thread that called
1516   // serve()), we join() it to make sure we shut down fully.
1517   if (thread_) {
1518     try {
      // Note that it is safe both to join() the same thread twice and to join
      // the current thread, as the pthread implementation checks for deadlock.
1521       thread_->join();
1522     } catch (...) {
1523       // swallow everything
1524     }
1525   }
1526 }
1527 }
1528 }
1529 } // apache::thrift::server
1530