1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/thrift-config.h>
21
22 #include <thrift/server/TNonblockingServer.h>
23 #include <thrift/concurrency/Exception.h>
24 #include <thrift/transport/TSocket.h>
25 #include <thrift/concurrency/ThreadFactory.h>
26 #include <thrift/transport/PlatformSocket.h>
27
28 #include <algorithm>
29 #include <iostream>
30
31 #ifdef HAVE_POLL_H
32 #include <poll.h>
33 #elif HAVE_SYS_POLL_H
34 #include <sys/poll.h>
35 #elif HAVE_SYS_SELECT_H
36 #include <sys/select.h>
37 #endif
38
39 #ifdef HAVE_SYS_SOCKET_H
40 #include <sys/socket.h>
41 #endif
42
43 #ifdef HAVE_NETINET_IN_H
44 #include <netinet/in.h>
45 #include <netinet/tcp.h>
46 #endif
47
48 #ifdef HAVE_ARPA_INET_H
49 #include <arpa/inet.h>
50 #endif
51
52 #ifdef HAVE_NETDB_H
53 #include <netdb.h>
54 #endif
55
56 #ifdef HAVE_FCNTL_H
57 #include <fcntl.h>
58 #endif
59
60 #include <assert.h>
61
62 #ifdef HAVE_SCHED_H
63 #include <sched.h>
64 #endif
65
66 #ifndef AF_LOCAL
67 #define AF_LOCAL AF_UNIX
68 #endif
69
70 #ifdef HAVE_INTTYPES_H
71 #include <inttypes.h>
72 #endif
73
74 #ifdef HAVE_STDINT_H
75 #include <stdint.h>
76 #endif
77
78 namespace apache {
79 namespace thrift {
80 namespace server {
81
82 using namespace apache::thrift::protocol;
83 using namespace apache::thrift::transport;
84 using namespace apache::thrift::concurrency;
85 using apache::thrift::transport::TSocket;
86 using apache::thrift::transport::TTransportException;
87 using std::shared_ptr;
88
89 /// Three states for sockets: recv frame size, recv data, and send mode
90 enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };
91
92 /**
93 * Six states for the nonblocking server: 1) initialize
94 * 2) read 4 byte frame size
95 * 3) read frame of data
96 * 4) wait for a dispatched task to complete (when using a thread manager)
97 * 5) send back data (if any)
98 * 6) force immediate connection close
99 */
100 enum TAppState {
101 APP_INIT,
102 APP_READ_FRAME_SIZE,
103 APP_READ_REQUEST,
104 APP_WAIT_TASK,
105 APP_SEND_RESULT,
106 APP_CLOSE_CONNECTION
107 };
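// In the common case a connection cycles APP_INIT -> APP_READ_FRAME_SIZE ->
// APP_READ_REQUEST -> APP_WAIT_TASK (only while a thread manager runs the
// task) -> APP_SEND_RESULT -> back to APP_INIT; APP_CLOSE_CONNECTION tears
// the connection down immediately from whatever state it is in. See
// TConnection::transition() below for the exact transitions.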
108
109 /**
110 * Represents a connection that is handled via libevent. This connection
111 * essentially encapsulates a socket that has some associated libevent state.
112 */
113 class TNonblockingServer::TConnection {
114 private:
115 /// Server IO Thread handling this connection
116 TNonblockingIOThread* ioThread_;
117
118 /// Server handle
119 TNonblockingServer* server_;
120
121 /// TProcessor
122 std::shared_ptr<TProcessor> processor_;
123
124 /// Object wrapping network socket
125 std::shared_ptr<TSocket> tSocket_;
126
127 /// Libevent object
128 struct event event_;
129
130 /// Libevent flags
131 short eventFlags_;
132
133 /// Socket mode
134 TSocketState socketState_;
135
136 /// Application state
137 TAppState appState_;
138
139 /// How much data we need to read
140 uint32_t readWant_;
141
142 /// Where in the read buffer are we
143 uint32_t readBufferPos_;
144
145 /// Read buffer
146 uint8_t* readBuffer_;
147
148 /// Read buffer size
149 uint32_t readBufferSize_;
150
151 /// Write buffer
152 uint8_t* writeBuffer_;
153
154 /// Write buffer size
155 uint32_t writeBufferSize_;
156
157 /// How far through writing are we?
158 uint32_t writeBufferPos_;
159
160 /// Largest size of write buffer seen since buffer was constructed
161 size_t largestWriteBufferSize_;
162
163 /// Count of the number of calls for use with getResizeBufferEveryN().
164 int32_t callsForResize_;
165
166 /// Transport to read from
167 std::shared_ptr<TMemoryBuffer> inputTransport_;
168
169 /// Transport that processor writes to
170 std::shared_ptr<TMemoryBuffer> outputTransport_;
171
172 /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
173 std::shared_ptr<TTransport> factoryInputTransport_;
174 std::shared_ptr<TTransport> factoryOutputTransport_;
175
176 /// Protocol decoder
177 std::shared_ptr<TProtocol> inputProtocol_;
178
179 /// Protocol encoder
180 std::shared_ptr<TProtocol> outputProtocol_;
181
182 /// Server event handler, if any
183 std::shared_ptr<TServerEventHandler> serverEventHandler_;
184
185 /// Thrift call context, if any
186 void* connectionContext_;
187
188 /// Go into read mode
189 void setRead() { setFlags(EV_READ | EV_PERSIST); }
190
191 /// Go into write mode
192 void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }
193
194 /// Set socket idle
195 void setIdle() { setFlags(0); }
196
197 /**
198 * Set event flags for this connection.
199 *
200 * @param eventFlags flags we pass to libevent for the connection.
201 */
202 void setFlags(short eventFlags);
203
204 /**
205 * Libevent handler called (via our static wrapper) when something happens
206 * on the connection socket. Rather than use the flags libevent passed,
207 * we use the connection state to determine whether we need to read or
208 * write the socket.
209 */
210 void workSocket();
211
212 public:
213 class Task;
214
215 /// Constructor
216 TConnection(std::shared_ptr<TSocket> socket,
217 TNonblockingIOThread* ioThread) {
218 readBuffer_ = nullptr;
219 readBufferSize_ = 0;
220
221 ioThread_ = ioThread;
222 server_ = ioThread->getServer();
223
224 // Allocate input and output transports; these only need to be allocated
225 // once per TConnection (they don't need to be reallocated on each init() call)
226 inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
227 outputTransport_.reset(
228 new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
229
230 tSocket_ = socket;
231
232 init(ioThread);
233 }
234
235 ~TConnection() { std::free(readBuffer_); }
236
237 /// Close this connection and free or reset its resources.
238 void close();
239
240 /**
241 * Check buffers against any size limits and shrink them if exceeded.
242 *
243 * @param readLimit we reduce read buffer size to this (if nonzero).
244 * @param writeLimit if nonzero and write buffer is larger, replace it.
245 */
246 void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
247
248 /// Initialize
249 void init(TNonblockingIOThread* ioThread);
250
251 /// set socket for connection
252 void setSocket(std::shared_ptr<TSocket> socket);
253
254 /**
255 * This is called when the application transitions from one state into
256 * another. This means that it has finished writing the data that it needed
257 * to, or finished receiving the data that it needed to.
258 */
259 void transition();
260
261 /**
262 * C-callable event handler for connection events. Provides a callback
263 * that libevent can understand which invokes connection_->workSocket().
264 *
265 * @param fd the descriptor the event occurred on.
266 * @param which the flags associated with the event.
267 * @param v void* callback arg where we placed TConnection's "this".
268 */
269 static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
270 assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
271 ((TConnection*)v)->workSocket();
272 }
273
274 /**
275 * Notification to server that processing has ended on this request.
276 * Can be called either when processing is completed or when a waiting
277 * task has been preemptively terminated (on overload).
278 *
279 * Don't call this from the IO thread itself.
280 *
281 * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
282 */
283 bool notifyIOThread() { return ioThread_->notify(this); }
284
285 /*
286 * Returns the number of this connection's currently assigned IO
287 * thread.
288 */
289 int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }
290
291 /// Force connection shutdown for this connection.
292 void forceClose() {
293 appState_ = APP_CLOSE_CONNECTION;
294 if (!notifyIOThread()) {
295 server_->decrementActiveProcessors();
296 close();
297 throw TException("TConnection::forceClose: failed write on notify pipe");
298 }
299 }
300
301 /// return the server this connection was initialized for.
302 TNonblockingServer* getServer() const { return server_; }
303
304 /// get state of connection.
305 TAppState getState() const { return appState_; }
306
307 /// return the TSocket transport wrapping this network connection
308 std::shared_ptr<TSocket> getTSocket() const { return tSocket_; }
309
310 /// return the server event handler if any
311 std::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }
312
313 /// return the Thrift connection context if any
314 void* getConnectionContext() { return connectionContext_; }
315 };
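// TConnection objects are pooled by the server: createConnection() either
// reuses an object from connectionStack_ or allocates a new one, and close()
// hands the object back through returnConnection(), which trims idle buffers
// and pushes it onto the stack (or deletes it once connectionStackLimit_ is
// reached). Both functions are defined further below.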
316
317 class TNonblockingServer::TConnection::Task : public Runnable {
318 public:
319 Task(std::shared_ptr<TProcessor> processor,
320 std::shared_ptr<TProtocol> input,
321 std::shared_ptr<TProtocol> output,
322 TConnection* connection)
323 : processor_(processor),
324 input_(input),
325 output_(output),
326 connection_(connection),
327 serverEventHandler_(connection_->getServerEventHandler()),
328 connectionContext_(connection_->getConnectionContext()) {}
329
330 void run() override {
331 try {
332 for (;;) {
333 if (serverEventHandler_) {
334 serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
335 }
336 if (!processor_->process(input_, output_, connectionContext_)
337 || !input_->getTransport()->peek()) {
338 break;
339 }
340 }
341 } catch (const TTransportException& ttx) {
342 GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
343 } catch (const std::bad_alloc&) {
344 GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
345 exit(1);
346 } catch (const std::exception& x) {
347 GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
348 typeid(x).name(),
349 x.what());
350 } catch (...) {
351 GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
352 }
353
354 // Signal completion back to the libevent thread via a pipe
355 if (!connection_->notifyIOThread()) {
356 GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
357 connection_->server_->decrementActiveProcessors();
358 connection_->close();
359 throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
360 }
361 }
362
363 TConnection* getTConnection() { return connection_; }
364
365 private:
366 std::shared_ptr<TProcessor> processor_;
367 std::shared_ptr<TProtocol> input_;
368 std::shared_ptr<TProtocol> output_;
369 TConnection* connection_;
370 std::shared_ptr<TServerEventHandler> serverEventHandler_;
371 void* connectionContext_;
372 };
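// A Task is created in transition() once a complete frame has been read and
// a thread manager is configured. It runs the processor on a worker thread
// and, when finished, signals the owning IO thread over the notification
// pipe (notifyIOThread) so the reply is written back from the libevent loop
// rather than from the worker thread.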
373
374 void TNonblockingServer::TConnection::init(TNonblockingIOThread* ioThread) {
375 ioThread_ = ioThread;
376 server_ = ioThread->getServer();
377 appState_ = APP_INIT;
378 eventFlags_ = 0;
379
380 readBufferPos_ = 0;
381 readWant_ = 0;
382
383 writeBuffer_ = nullptr;
384 writeBufferSize_ = 0;
385 writeBufferPos_ = 0;
386 largestWriteBufferSize_ = 0;
387
388 socketState_ = SOCKET_RECV_FRAMING;
389 callsForResize_ = 0;
390
391 // get input/output transports
392 factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
393 factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);
394
395 // Create protocol
396 if (server_->getHeaderTransport()) {
397 inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
398 factoryOutputTransport_);
399 outputProtocol_ = inputProtocol_;
400 } else {
401 inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
402 outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
403 }
404
405 // Set up for any server event handler
406 serverEventHandler_ = server_->getEventHandler();
407 if (serverEventHandler_) {
408 connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
409 } else {
410 connectionContext_ = nullptr;
411 }
412
413 // Get the processor
414 processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
415 }
416
417 void TNonblockingServer::TConnection::setSocket(std::shared_ptr<TSocket> socket) {
418 tSocket_ = socket;
419 }
420
421 void TNonblockingServer::TConnection::workSocket() {
422 while (true) {
423 int got = 0, left = 0, sent = 0;
424 uint32_t fetch = 0;
425
426 switch (socketState_) {
427 case SOCKET_RECV_FRAMING:
428 union {
429 uint8_t buf[sizeof(uint32_t)];
430 uint32_t size;
431 } framing;
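// While the 4-byte frame header is still incomplete, the bytes received so
// far live in readWant_ (copied back into framing.size just below) and
// readBufferPos_ counts how many header bytes we already have; only once
// all four bytes are present is readWant_ reinterpreted as the frame
// length via ntohl().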
432
433 // if we've already received some bytes we kept them here
434 framing.size = readWant_;
435 // determine size of this frame
436 try {
437 // Read from the socket
438 fetch = tSocket_->read(&framing.buf[readBufferPos_],
439 uint32_t(sizeof(framing.size) - readBufferPos_));
440 if (fetch == 0) {
441 // Whenever we get here it means a remote disconnect
442 close();
443 return;
444 }
445 readBufferPos_ += fetch;
446 } catch (TTransportException& te) {
447 // In nonblocking SSL sockets some operations need to be retried.
448 // The current approach is to parse the exception message, but a better solution needs to be investigated.
449 if(!strstr(te.what(), "retry")) {
450 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
451 close();
452
453 return;
454 }
455 }
456
457 if (readBufferPos_ < sizeof(framing.size)) {
458 // more needed before frame size is known -- save what we have so far
459 readWant_ = framing.size;
460 return;
461 }
462
463 readWant_ = ntohl(framing.size);
464 if (readWant_ > server_->getMaxFrameSize()) {
465 // Don't allow giant frame sizes. This prevents bad clients from
466 // causing us to try to allocate a giant buffer.
467 GlobalOutput.printf(
468 "TNonblockingServer: frame size too large "
469 "(%" PRIu32 " > %" PRIu64
470 ") from client %s. "
471 "Remote side not using TFramedTransport?",
472 readWant_,
473 (uint64_t)server_->getMaxFrameSize(),
474 tSocket_->getSocketInfo().c_str());
475 close();
476 return;
477 }
478 // size known; now get the rest of the frame
479 transition();
480
481 // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for
482 // regular sockets, because if there is more data, libevent will fire the event handler registered for read
483 // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the
484 // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In
485 // that case, not trying another processing cycle here would result in a hang as we will never get to work the socket,
486 // despite having more data.
487 if (tSocket_->hasPendingDataToRead())
488 {
489 continue;
490 }
491
492 return;
493
494 case SOCKET_RECV:
495 // It is an error to be in this state if we already have all the data
496 if (!(readBufferPos_ < readWant_)) {
497 GlobalOutput.printf("TNonblockingServer: frame size too short");
498 close();
499 return;
500 }
501
502 try {
503 // Read from the socket
504 fetch = readWant_ - readBufferPos_;
505 got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
506 } catch (TTransportException& te) {
507 // In nonblocking SSL sockets some operations need to be retried.
508 // The current approach is to parse the exception message, but a better solution needs to be investigated.
509 if(!strstr(te.what(), "retry")) {
510 GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
511 close();
512 }
513
514 return;
515 }
516
517 if (got > 0) {
518 // Move along in the buffer
519 readBufferPos_ += got;
520
521 // Check that we did not overdo it
522 assert(readBufferPos_ <= readWant_);
523
524 // We are done reading, move onto the next state
525 if (readBufferPos_ == readWant_) {
526 transition();
527 if (socketState_ == SOCKET_RECV_FRAMING && tSocket_->hasPendingDataToRead())
528 {
529 continue;
530 }
531 }
532 return;
533 }
534
535 // Whenever we get down here it means a remote disconnect
536 close();
537
538 return;
539
540 case SOCKET_SEND:
541 // Should never have position past size
542 assert(writeBufferPos_ <= writeBufferSize_);
543
544 // If there is no data to send, then let us move on
545 if (writeBufferPos_ == writeBufferSize_) {
546 GlobalOutput("WARNING: Send state with no data to send");
547 transition();
548 return;
549 }
550
551 try {
552 left = writeBufferSize_ - writeBufferPos_;
553 sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
554 } catch (TTransportException& te) {
555 GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
556 close();
557 return;
558 }
559
560 writeBufferPos_ += sent;
561
562 // Did we overdo it?
563 assert(writeBufferPos_ <= writeBufferSize_);
564
565 // We are done!
566 if (writeBufferPos_ == writeBufferSize_) {
567 transition();
568 }
569
570 return;
571
572 default:
573 GlobalOutput.printf("Unexpected Socket State %d", socketState_);
574 assert(0);
575 return;
576 }
577 }
578 }
579
580 bool TNonblockingServer::getHeaderTransport() {
581 // Currently if there is no output protocol factory,
582 // we assume header transport (without having to create
583 // a new transport and check)
584 return getOutputProtocolFactory() == nullptr;
585 }
586
587 /**
588 * This is called when the application transitions from one state into
589 * another. This means that it has finished writing the data that it needed
590 * to, or finished receiving the data that it needed to.
591 */
592 void TNonblockingServer::TConnection::transition() {
593 // ensure this connection is active right now
594 assert(ioThread_);
595 assert(server_);
596
597 // Switch upon the state that we are currently in and move to a new state
598 switch (appState_) {
599
600 case APP_READ_REQUEST:
601 // We are done reading the request, package the read buffer into transport
602 // and get back some data from the dispatch function
603 if (server_->getHeaderTransport()) {
604 inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
605 outputTransport_->resetBuffer();
606 } else {
607 // We saved room for the framing size in case header transport needed it,
608 // but just skip it for the non-header case
609 inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
610 outputTransport_->resetBuffer();
611
612 // Prepend four bytes of blank space to the buffer so we can
613 // write the frame size there later.
614 outputTransport_->getWritePtr(4);
615 outputTransport_->wroteBytes(4);
616 }
617
618 server_->incrementActiveProcessors();
619
620 if (server_->isThreadPoolProcessing()) {
621 // We are setting up a Task to do this work and we will wait on it
622
623 // Create task and dispatch to the thread manager
624 std::shared_ptr<Runnable> task = std::shared_ptr<Runnable>(
625 new Task(processor_, inputProtocol_, outputProtocol_, this));
626 // The application is now waiting on the task to finish
627 appState_ = APP_WAIT_TASK;
628
629 // Set this connection idle so that libevent doesn't process more
630 // data on it while we're still waiting for the threadmanager to
631 // finish this task
632 setIdle();
633
634 try {
635 server_->addTask(task);
636 } catch (IllegalStateException& ise) {
637 // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
638 GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
639 server_->decrementActiveProcessors();
640 close();
641 } catch (TimedOutException& to) {
642 GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what());
643 server_->decrementActiveProcessors();
644 close();
645 }
646
647 return;
648 } else {
649 try {
650 if (serverEventHandler_) {
651 serverEventHandler_->processContext(connectionContext_, getTSocket());
652 }
653 // Invoke the processor
654 processor_->process(inputProtocol_, outputProtocol_, connectionContext_);
655 } catch (const TTransportException& ttx) {
656 GlobalOutput.printf(
657 "TNonblockingServer transport error in "
658 "process(): %s",
659 ttx.what());
660 server_->decrementActiveProcessors();
661 close();
662 return;
663 } catch (const std::exception& x) {
664 GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
665 typeid(x).name(),
666 x.what());
667 server_->decrementActiveProcessors();
668 close();
669 return;
670 } catch (...) {
671 GlobalOutput.printf("Server::process() unknown exception");
672 server_->decrementActiveProcessors();
673 close();
674 return;
675 }
676 }
677 // fallthrough
678
679 // Intentionally fall through here, the call to process has written into
680 // the writeBuffer_
681
682 case APP_WAIT_TASK:
683 // We have now finished processing a task and the result has been written
684 // into the outputTransport_, so we grab its contents and place them into
685 // the writeBuffer_ for actual writing by the libevent thread
686
687 server_->decrementActiveProcessors();
688 // Get the result of the operation
689 outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
690
691 // If the function call generated return data, then move into the send
692 // state and get going
693 // 4 bytes were reserved for frame size
694 if (writeBufferSize_ > 4) {
695
696 // Move into write state
697 writeBufferPos_ = 0;
698 socketState_ = SOCKET_SEND;
699
700 // Put the frame size into the write buffer
701 auto frameSize = (int32_t)htonl(writeBufferSize_ - 4);
702 memcpy(writeBuffer_, &frameSize, 4);
703
704 // Socket into write mode
705 appState_ = APP_SEND_RESULT;
706 setWrite();
707
708 return;
709 }
710
711 // In this case, the request was oneway and we should fall through
712 // right back into the read frame header state
713 goto LABEL_APP_INIT;
714
715 case APP_SEND_RESULT:
716 // it's now safe to perform buffer size housekeeping.
717 if (writeBufferSize_ > largestWriteBufferSize_) {
718 largestWriteBufferSize_ = writeBufferSize_;
719 }
720 if (server_->getResizeBufferEveryN() > 0
721 && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
722 checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
723 server_->getIdleWriteBufferLimit());
724 callsForResize_ = 0;
725 }
726 // fallthrough
727
728 // N.B.: We also intentionally fall through here into the INIT state!
729
730 LABEL_APP_INIT:
731 case APP_INIT:
732
733 // Clear write buffer variables
734 writeBuffer_ = nullptr;
735 writeBufferPos_ = 0;
736 writeBufferSize_ = 0;
737
738 // Into read4 state we go
739 socketState_ = SOCKET_RECV_FRAMING;
740 appState_ = APP_READ_FRAME_SIZE;
741
742 readBufferPos_ = 0;
743
744 // Register read event
745 setRead();
746
747 return;
748
749 case APP_READ_FRAME_SIZE:
750 readWant_ += 4;
751
752 // We just read the request length
753 // Double the buffer size until it is big enough
754 if (readWant_ > readBufferSize_) {
755 if (readBufferSize_ == 0) {
756 readBufferSize_ = 1;
757 }
758 uint32_t newSize = readBufferSize_;
759 while (readWant_ > newSize) {
760 newSize *= 2;
761 }
762
763 auto* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
764 if (newBuffer == nullptr) {
765 // nothing else to be done...
766 throw std::bad_alloc();
767 }
768 readBuffer_ = newBuffer;
769 readBufferSize_ = newSize;
770 }
771
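// Reserve the first four bytes of the read buffer and write the frame
// length back into them (network byte order) so that a header transport
// can see a complete frame; the non-header path skips these four bytes
// again when the request is handed to inputTransport_ (see the
// APP_READ_REQUEST case above).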
772 readBufferPos_ = 4;
773 *((uint32_t*)readBuffer_) = htonl(readWant_ - 4);
774
775 // Move into read request state
776 socketState_ = SOCKET_RECV;
777 appState_ = APP_READ_REQUEST;
778
779 return;
780
781 case APP_CLOSE_CONNECTION:
782 server_->decrementActiveProcessors();
783 close();
784 return;
785
786 default:
787 GlobalOutput.printf("Unexpected Application State %d", appState_);
788 assert(0);
789 }
790 }
791
792 void TNonblockingServer::TConnection::setFlags(short eventFlags) {
793 // Catch the do nothing case
794 if (eventFlags_ == eventFlags) {
795 return;
796 }
797
798 // Delete a previously existing event
799 if (eventFlags_ && event_del(&event_) == -1) {
800 GlobalOutput.perror("TConnection::setFlags() event_del", THRIFT_GET_SOCKET_ERROR);
801 return;
802 }
803
804 // Update in memory structure
805 eventFlags_ = eventFlags;
806
807 // Do not call event_set if there are no flags
808 if (!eventFlags_) {
809 return;
810 }
811
812 /*
813 * event_set:
814 *
815 * Prepares the event structure &event to be used in future calls to
816 * event_add() and event_del(). The event will be prepared to call the
817 * eventHandler using the 'sock' file descriptor to monitor events.
818 *
819 * The events can be either EV_READ, EV_WRITE, or both, indicating
820 * that an application can read or write from the file respectively without
821 * blocking.
822 *
823 * The eventHandler will be called with the file descriptor that triggered
824 * the event and the type of event which will be one of: EV_TIMEOUT,
825 * EV_SIGNAL, EV_READ, EV_WRITE.
826 *
827 * The additional flag EV_PERSIST makes an event_add() persistent until
828 * event_del() has been called.
829 *
830 * Once initialized, the &event struct can be used repeatedly with
831 * event_add() and event_del() and does not need to be reinitialized unless
832 * the eventHandler and/or the argument to it are to be changed. However,
833 * when an ev structure has been added to libevent using event_add() the
834 * structure must persist until the event occurs (assuming EV_PERSIST
835 * is not set) or is removed using event_del(). You may not reuse the same
836 * ev structure for multiple monitored descriptors; each descriptor needs
837 * its own ev.
838 */
839 event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this);
840 event_base_set(ioThread_->getEventBase(), &event_);
841
842 // Add the event
843 if (event_add(&event_, nullptr) == -1) {
844 GlobalOutput.perror("TConnection::setFlags(): could not event_add", THRIFT_GET_SOCKET_ERROR);
845 }
846 }
847
848 /**
849 * Closes a connection
850 */
851 void TNonblockingServer::TConnection::close() {
852 setIdle();
853
854 if (serverEventHandler_) {
855 serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
856 }
857 ioThread_ = nullptr;
858
859 // Close the socket
860 tSocket_->close();
861
862 // close any factory produced transports
863 factoryInputTransport_->close();
864 factoryOutputTransport_->close();
865
866 // release processor and handler
867 processor_.reset();
868
869 // Give this object back to the server that owns it
870 server_->returnConnection(this);
871 }
872
873 void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) {
874 if (readLimit > 0 && readBufferSize_ > readLimit) {
875 free(readBuffer_);
876 readBuffer_ = nullptr;
877 readBufferSize_ = 0;
878 }
879
880 if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
881 // just start over
882 outputTransport_->resetBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize()));
883 largestWriteBufferSize_ = 0;
884 }
885 }
886
887 TNonblockingServer::~TNonblockingServer() {
888 // Close any active connections (moves them to the idle connection stack)
889 while (activeConnections_.size()) {
890 activeConnections_.front()->close();
891 }
892 // Clean up unused TConnection objects in connectionStack_
893 while (!connectionStack_.empty()) {
894 TConnection* connection = connectionStack_.top();
895 connectionStack_.pop();
896 delete connection;
897 }
898 // The TNonblockingIOThread objects have shared_ptrs to the Thread
899 // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread
900 // objects (as runnable) so these objects will never deallocate without help.
901 while (!ioThreads_.empty()) {
902 std::shared_ptr<TNonblockingIOThread> iot = ioThreads_.back();
903 ioThreads_.pop_back();
904 iot->setThread(std::shared_ptr<Thread>());
905 }
906 }
907
908 /**
909 * Creates a new connection either by reusing an object off the stack or
910 * by allocating a new one entirely
911 */
912 TNonblockingServer::TConnection* TNonblockingServer::createConnection(std::shared_ptr<TSocket> socket) {
913 // Check the stack
914 Guard g(connMutex_);
915
916 // pick an IO thread to handle this connection -- currently round robin
917 assert(nextIOThread_ < ioThreads_.size());
918 int selectedThreadIdx = nextIOThread_;
919 nextIOThread_ = static_cast<uint32_t>((nextIOThread_ + 1) % ioThreads_.size());
920
921 TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
922
923 // Check the connection stack to see if we can re-use
924 TConnection* result = nullptr;
925 if (connectionStack_.empty()) {
926 result = new TConnection(socket, ioThread);
927 ++numTConnections_;
928 } else {
929 result = connectionStack_.top();
930 connectionStack_.pop();
931 result->setSocket(socket);
932 result->init(ioThread);
933 }
934 activeConnections_.push_back(result);
935 return result;
936 }
937
938 /**
939 * Returns a connection to the stack
940 */
941 void TNonblockingServer::returnConnection(TConnection* connection) {
942 Guard g(connMutex_);
943
944 activeConnections_.erase(std::remove(activeConnections_.begin(),
945 activeConnections_.end(),
946 connection),
947 activeConnections_.end());
948
949 if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) {
950 delete connection;
951 --numTConnections_;
952 } else {
953 connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
954 connectionStack_.push(connection);
955 }
956 }
957
958 /**
959 * Something happened on the server socket. We accept all waiting client
960 * connections on fd and assign TConnection objects to handle those requests.
961 */
962 void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) {
963 (void)which;
964 // Make sure that libevent didn't mess up the socket handles
965 assert(fd == serverSocket_);
966
967 // Going to accept a new client socket
968 std::shared_ptr<TSocket> clientSocket;
969
970 clientSocket = serverTransport_->accept();
971 if (clientSocket) {
972 // If we're overloaded, take action here
973 if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
974 Guard g(connMutex_);
975 nConnectionsDropped_++;
976 nTotalConnectionsDropped_++;
977 if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
978 clientSocket->close();
979 return;
980 } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
981 if (!drainPendingTask()) {
982 // Nothing left to discard, so we drop connection instead.
983 clientSocket->close();
984 return;
985 }
986 }
987 }
988
989 // Create a new TConnection for this client socket.
990 TConnection* clientConnection = createConnection(clientSocket);
991
992 // Fail fast if we could not create a TConnection object
993 if (clientConnection == nullptr) {
994 GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
995 clientSocket->close();
996 return;
997 }
998
999 /*
1000 * Either notify the ioThread that is assigned this connection to
1001 * start processing, or if it is us, we'll just ask this
1002 * connection to do its initial state change here.
1003 *
1004 * (We need to avoid writing to our own notification pipe, to
1005 * avoid possible deadlocks if the pipe is full.)
1006 *
1007 * The IO thread #0 is the only one that handles these listen
1008 * events, so unless the connection has been assigned to thread #0
1009 * we know it's not on our thread.
1010 */
1011 if (clientConnection->getIOThreadNumber() == 0) {
1012 clientConnection->transition();
1013 } else {
1014 if (!clientConnection->notifyIOThread()) {
1015 GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno);
1016 clientConnection->close();
1017 }
1018 }
1019 }
1020 }
1021
1022 /**
1023 * Creates a socket to listen on and binds it to the local port.
1024 */
1025 void TNonblockingServer::createAndListenOnSocket() {
1026 serverTransport_->listen();
1027 serverSocket_ = serverTransport_->getSocketFD();
1028 }
1029
1030
1031 void TNonblockingServer::setThreadManager(std::shared_ptr<ThreadManager> threadManager) {
1032 threadManager_ = threadManager;
1033 if (threadManager) {
1034 threadManager->setExpireCallback(
1035 std::bind(&TNonblockingServer::expireClose,
1036 this,
1037 std::placeholders::_1));
1038 threadPoolProcessing_ = true;
1039 } else {
1040 threadPoolProcessing_ = false;
1041 }
1042 }
1043
1044 bool TNonblockingServer::serverOverloaded() {
1045 size_t activeConnections = numTConnections_ - connectionStack_.size();
1046 if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) {
1047 if (!overloaded_) {
1048 GlobalOutput.printf("TNonblockingServer: overload condition begun.");
1049 overloaded_ = true;
1050 }
1051 } else {
1052 if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_)
1053 && (activeConnections <= overloadHysteresis_ * maxConnections_)) {
1054 GlobalOutput.printf(
1055 "TNonblockingServer: overload ended; "
1056 "%u dropped (%llu total)",
1057 nConnectionsDropped_,
1058 nTotalConnectionsDropped_);
1059 nConnectionsDropped_ = 0;
1060 overloaded_ = false;
1061 }
1062 }
1063
1064 return overloaded_;
1065 }
1066
1067 bool TNonblockingServer::drainPendingTask() {
1068 if (threadManager_) {
1069 std::shared_ptr<Runnable> task = threadManager_->removeNextPending();
1070 if (task) {
1071 TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
1072 assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
1073 connection->forceClose();
1074 return true;
1075 }
1076 }
1077 return false;
1078 }
1079
1080 void TNonblockingServer::expireClose(std::shared_ptr<Runnable> task) {
1081 TConnection* connection = static_cast<TConnection::Task*>(task.get())->getTConnection();
1082 assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK);
1083 connection->forceClose();
1084 }
1085
1086 void TNonblockingServer::stop() {
1087 // Breaks the event loop in all threads so that they end ASAP.
1088 for (auto & ioThread : ioThreads_) {
1089 ioThread->stop();
1090 }
1091 }
1092
1093 void TNonblockingServer::registerEvents(event_base* user_event_base) {
1094 userEventBase_ = user_event_base;
1095
1096 // init listen socket
1097 if (serverSocket_ == THRIFT_INVALID_SOCKET)
1098 createAndListenOnSocket();
1099
1100 // set up the IO threads
1101 assert(ioThreads_.empty());
1102 if (!numIOThreads_) {
1103 numIOThreads_ = DEFAULT_IO_THREADS;
1104 }
1105 // A user-provided event base doesn't work for multi-threaded servers
1106 assert(numIOThreads_ == 1 || !userEventBase_);
1107
1108 for (uint32_t id = 0; id < numIOThreads_; ++id) {
1109 // the first IO thread also does the listening on server socket
1110 THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET);
1111
1112 shared_ptr<TNonblockingIOThread> thread(
1113 new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
1114 ioThreads_.push_back(thread);
1115 }
1116
1117 // Notify handler of the preServe event
1118 if (eventHandler_) {
1119 eventHandler_->preServe();
1120 }
1121
1122 // Start all of our helper IO threads. Note that the threads run forever,
1123 // only terminating if stop() is called.
1124 assert(ioThreads_.size() == numIOThreads_);
1125 assert(ioThreads_.size() > 0);
1126
1127 GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
1128 ioThreads_.size());
1129
1130 // Launch all the secondary IO threads in separate threads
1131 if (ioThreads_.size() > 1) {
1132 ioThreadFactory_.reset(new ThreadFactory(
1133 false // detached
1134 ));
1135
1136 assert(ioThreadFactory_.get());
1137
1138 // intentionally starting at thread 1, not 0
1139 for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
1140 shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
1141 ioThreads_[i]->setThread(thread);
1142 thread->start();
1143 }
1144 }
1145
1146 // Register the events for the primary (listener) IO thread
1147 ioThreads_[0]->registerEvents();
1148 }
1149
1150 /**
1151 * Main workhorse function, starts up the server listening on a port and
1152 * loops over the libevent handler.
1153 */
1154 void TNonblockingServer::serve() {
1155
1156 if (ioThreads_.empty())
1157 registerEvents(nullptr);
1158
1159 // Run the primary (listener) IO thread loop in our main thread; this will
1160 // only return when the server is shutting down.
1161 ioThreads_[0]->run();
1162
1163 // Ensure all threads are finished before exiting serve()
1164 for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
1165 ioThreads_[i]->join();
1166 GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
1167 }
1168 }
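// A minimal usage sketch (illustrative only: "MyServiceProcessor", the port
// number and the thread count are placeholders, and the exact constructor
// overloads vary between Thrift versions):
//
//   auto processor = std::make_shared<MyServiceProcessor>(handler);
//   auto serverSocket = std::make_shared<transport::TNonblockingServerSocket>(9090);
//   auto protocolFactory = std::make_shared<protocol::TBinaryProtocolFactory>();
//   auto threadManager = concurrency::ThreadManager::newSimpleThreadManager(8);
//   threadManager->threadFactory(std::make_shared<concurrency::ThreadFactory>());
//   threadManager->start();
//
//   TNonblockingServer server(processor, protocolFactory, serverSocket, threadManager);
//   server.serve(); // runs the listener IO loop in the calling thread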
1169
1170 TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
1171 int number,
1172 THRIFT_SOCKET listenSocket,
1173 bool useHighPriority)
1174 : server_(server),
1175 number_(number),
1176 threadId_{},
1177 listenSocket_(listenSocket),
1178 useHighPriority_(useHighPriority),
1179 eventBase_(nullptr),
1180 ownEventBase_(false),
1181 serverEvent_{},
1182 notificationEvent_{} {
1183 notificationPipeFDs_[0] = -1;
1184 notificationPipeFDs_[1] = -1;
1185 }
1186
1187 TNonblockingIOThread::~TNonblockingIOThread() {
1188 // make sure our associated thread is fully finished
1189 join();
1190
1191 if (eventBase_ && ownEventBase_) {
1192 event_base_free(eventBase_);
1193 ownEventBase_ = false;
1194 }
1195
1196 if (listenSocket_ != THRIFT_INVALID_SOCKET) {
1197 if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) {
1198 GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR);
1199 }
1200 listenSocket_ = THRIFT_INVALID_SOCKET;
1201 }
1202
1203 for (auto notificationPipeFD : notificationPipeFDs_) {
1204 if (notificationPipeFD >= 0) {
1205 if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFD)) {
1206 GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
1207 THRIFT_GET_SOCKET_ERROR);
1208 }
1209 notificationPipeFD = THRIFT_INVALID_SOCKET;
1210 }
1211 }
1212 }
1213
1214 void TNonblockingIOThread::createNotificationPipe() {
1215 if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
1216 GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
1217 throw TException("can't create notification pipe");
1218 }
1219 if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0
1220 || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
1221 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
1222 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
1223 throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK");
1224 }
1225 for (auto notificationPipeFD : notificationPipeFDs_) {
1226 #if LIBEVENT_VERSION_NUMBER < 0x02000000
1227 int flags;
1228 if ((flags = THRIFT_FCNTL(notificationPipeFD, F_GETFD, 0)) < 0
1229 || THRIFT_FCNTL(notificationPipeFD, F_SETFD, flags | FD_CLOEXEC) < 0) {
1230 #else
1231 if (evutil_make_socket_closeonexec(notificationPipeFD) < 0) {
1232 #endif
1233 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]);
1234 ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]);
1235 throw TException(
1236 "TNonblockingServer::createNotificationPipe() "
1237 "FD_CLOEXEC");
1238 }
1239 }
1240 }
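// The notification "pipe" is actually a local socketpair: notify() writes the
// raw TConnection pointer (sizeof(TConnection*) bytes) to the send end, and
// notifyHandler() reads pointer-sized messages from the receive end inside
// the IO thread's event loop. A null pointer is the conventional signal to
// break the loop (see breakLoop()).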
1241
1242 /**
1243 * Register the core libevent events onto the proper base.
1244 */
1245 void TNonblockingIOThread::registerEvents() {
1246 threadId_ = Thread::get_current();
1247
1248 assert(eventBase_ == nullptr);
1249 eventBase_ = getServer()->getUserEventBase();
1250 if (eventBase_ == nullptr) {
1251 eventBase_ = event_base_new();
1252 ownEventBase_ = true;
1253 }
1254
1255 // Print some libevent stats
1256 if (number_ == 0) {
1257 GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
1258 event_get_version(),
1259 event_base_get_method(eventBase_));
1260 }
1261
1262 if (listenSocket_ != THRIFT_INVALID_SOCKET) {
1263 // Register the server event
1264 event_set(&serverEvent_,
1265 listenSocket_,
1266 EV_READ | EV_PERSIST,
1267 TNonblockingIOThread::listenHandler,
1268 server_);
1269 event_base_set(eventBase_, &serverEvent_);
1270
1271 // Add the event and start up the server
1272 if (-1 == event_add(&serverEvent_, nullptr)) {
1273 throw TException(
1274 "TNonblockingServer::serve(): "
1275 "event_add() failed on server listen event");
1276 }
1277 GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_);
1278 }
1279
1280 createNotificationPipe();
1281
1282 // Create an event to be notified when a task finishes
1283 event_set(&notificationEvent_,
1284 getNotificationRecvFD(),
1285 EV_READ | EV_PERSIST,
1286 TNonblockingIOThread::notifyHandler,
1287 this);
1288
1289 // Attach to the base
1290 event_base_set(eventBase_, &notificationEvent_);
1291
1292 // Add the event and start up the server
1293 if (-1 == event_add(&notificationEvent_, nullptr)) {
1294 throw TException(
1295 "TNonblockingServer::serve(): "
1296 "event_add() failed on task-done notification event");
1297 }
1298 GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_);
1299 }
1300
1301 bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
1302 auto fd = getNotificationSendFD();
1303 if (fd < 0) {
1304 return false;
1305 }
1306
1307 int ret = -1;
1308 long kSize = sizeof(conn);
1309 const char * pos = (const char *)const_cast_sockopt(&conn);
1310
1311 #if defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H)
1312 struct pollfd pfd = {fd, POLLOUT, 0};
1313
1314 while (kSize > 0) {
1315 pfd.revents = 0;
1316 ret = poll(&pfd, 1, -1);
1317 if (ret < 0) {
1318 return false;
1319 } else if (ret == 0) {
1320 continue;
1321 }
1322
1323 if (pfd.revents & POLLHUP || pfd.revents & POLLERR) {
1324 ::THRIFT_CLOSESOCKET(fd);
1325 return false;
1326 }
1327
1328 if (pfd.revents & POLLOUT) {
1329 ret = send(fd, pos, kSize, 0);
1330 if (ret < 0) {
1331 if (errno == EAGAIN) {
1332 continue;
1333 }
1334
1335 ::THRIFT_CLOSESOCKET(fd);
1336 return false;
1337 }
1338
1339 kSize -= ret;
1340 pos += ret;
1341 }
1342 }
1343 #else
1344 fd_set wfds, efds;
1345
1346 while (kSize > 0) {
1347 FD_ZERO(&wfds);
1348 FD_ZERO(&efds);
1349 FD_SET(fd, &wfds);
1350 FD_SET(fd, &efds);
1351 ret = select(static_cast<int>(fd + 1), nullptr, &wfds, &efds, nullptr);
1352 if (ret < 0) {
1353 return false;
1354 } else if (ret == 0) {
1355 continue;
1356 }
1357
1358 if (FD_ISSET(fd, &efds)) {
1359 ::THRIFT_CLOSESOCKET(fd);
1360 return false;
1361 }
1362
1363 if (FD_ISSET(fd, &wfds)) {
1364 ret = send(fd, pos, kSize, 0);
1365 if (ret < 0) {
1366 if (errno == EAGAIN) {
1367 continue;
1368 }
1369
1370 ::THRIFT_CLOSESOCKET(fd);
1371 return false;
1372 }
1373
1374 kSize -= ret;
1375 pos += ret;
1376 }
1377 }
1378 #endif
1379
1380 return true;
1381 }
1382
1383 /* static */
1384 void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
1385 auto* ioThread = (TNonblockingIOThread*)v;
1386 assert(ioThread);
1387 (void)which;
1388
1389 while (true) {
1390 TNonblockingServer::TConnection* connection = nullptr;
1391 const int kSize = sizeof(connection);
1392 long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
1393 if (nBytes == kSize) {
1394 if (connection == nullptr) {
1395 // this is the command to stop our thread, exit the handler!
1396 ioThread->breakLoop(false);
1397 return;
1398 }
1399 connection->transition();
1400 } else if (nBytes > 0) {
1401 // throw away these bytes and hope that next time we get a solid read
1402 GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize);
1403 ioThread->breakLoop(true);
1404 return;
1405 } else if (nBytes == 0) {
1406 GlobalOutput.printf("notifyHandler: Notify socket closed!");
1407 ioThread->breakLoop(false);
1408 // exit the loop
1409 break;
1410 } else { // nBytes < 0
1411 if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK
1412 && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) {
1413 GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR);
1414 ioThread->breakLoop(true);
1415 return;
1416 }
1417 // exit the loop
1418 break;
1419 }
1420 }
1421 }
1422
1423 void TNonblockingIOThread::breakLoop(bool error) {
1424 if (error) {
1425 GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_);
1426 // TODO: figure out something better to do here, but for now kill the
1427 // whole process.
1428 GlobalOutput.printf("TNonblockingServer: aborting process.");
1429 ::abort();
1430 }
1431
1432 // If we're running in the same thread, we can't use the notify(0)
1433 // mechanism to stop the thread, but happily if we're running in the
1434 // same thread, this means the thread can't be blocking in the event
1435 // loop either.
1436 if (!Thread::is_current(threadId_)) {
1437 notify(nullptr);
1438 } else {
1439 // cause the loop to stop ASAP - even if it has things to do in it
1440 event_base_loopbreak(eventBase_);
1441 }
1442 }
1443
1444 void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
1445 #ifdef HAVE_SCHED_H
1446 // Start out with a standard, low-priority setup for the sched params.
1447 struct sched_param sp;
1448 memset(static_cast<void*>(&sp), 0, sizeof(sp));
1449 int policy = SCHED_OTHER;
1450
1451 // If desired, set up high-priority sched params structure.
1452 if (value) {
1453 // FIFO scheduler, ranked above default SCHED_OTHER queue
1454 policy = SCHED_FIFO;
1455 // The priority only compares us to other SCHED_FIFO threads, so we
1456 // just pick an arbitrary priority halfway between min & max.
1457 const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2;
1458
1459 sp.sched_priority = priority;
1460 }
1461
1462 // Actually set the sched params for the current thread.
1463 if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
1464 GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
1465 } else {
1466 GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR);
1467 }
1468 #else
1469 THRIFT_UNUSED_VARIABLE(value);
1470 #endif
1471 }
1472
1473 void TNonblockingIOThread::run() {
1474 if (eventBase_ == nullptr) {
1475 registerEvents();
1476 }
1477 if (useHighPriority_) {
1478 setCurrentThreadHighPriority(true);
1479 }
1480
1481 if (eventBase_ != nullptr)
1482 {
1483 GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
1484 // Run the libevent engine; invokes eventHandler callbacks and returns only when the loop is broken
1485 event_base_loop(eventBase_, 0);
1486
1487 if (useHighPriority_) {
1488 setCurrentThreadHighPriority(false);
1489 }
1490
1491 // cleans up our registered events
1492 cleanupEvents();
1493 }
1494
1495 GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
1496 }
1497
1498 void TNonblockingIOThread::cleanupEvents() {
1499 // stop the listen socket, if any
1500 if (listenSocket_ != THRIFT_INVALID_SOCKET) {
1501 if (event_del(&serverEvent_) == -1) {
1502 GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR);
1503 }
1504 }
1505
1506 event_del(&notificationEvent_);
1507 }
1508
1509 void TNonblockingIOThread::stop() {
1510 // This should cause the thread to fall out of its event loop ASAP.
1511 breakLoop(false);
1512 }
1513
1514 void TNonblockingIOThread::join() {
1515 // If this was a thread created by a factory (not the thread that called
1516 // serve()), we join() it to make sure we shut down fully.
1517 if (thread_) {
1518 try {
1519 // Note that it is safe both to call join() twice and to join the
1520 // current thread, as the pthread implementation checks for deadlock.
1521 thread_->join();
1522 } catch (...) {
1523 // swallow everything
1524 }
1525 }
1526 }
1527 }
1528 }
1529 } // apache::thrift::server
1530