/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

#include <thrift/thrift-config.h>

#include <thrift/server/TNonblockingServer.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/transport/TSocket.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/transport/PlatformSocket.h>

#include <algorithm>
#include <iostream>

#ifdef HAVE_POLL_H
#include <poll.h>
#elif HAVE_SYS_POLL_H
#include <sys/poll.h>
#elif HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif

#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif

#ifdef HAVE_ARPA_INET_H
#include <arpa/inet.h>
#endif

#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif

#ifdef HAVE_FCNTL_H
#include <fcntl.h>
#endif

#include <assert.h>

#ifdef HAVE_SCHED_H
#include <sched.h>
#endif

#ifndef AF_LOCAL
#define AF_LOCAL AF_UNIX
#endif

#ifdef HAVE_INTTYPES_H
#include <inttypes.h>
#endif

#ifdef HAVE_STDINT_H
#include <stdint.h>
#endif

namespace apache {
namespace thrift {
namespace server {

using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace apache::thrift::concurrency;
using apache::thrift::transport::TSocket;
using apache::thrift::transport::TTransportException;
using std::shared_ptr;

/// Three states for sockets: recv frame size, recv data, and send mode
enum TSocketState { SOCKET_RECV_FRAMING, SOCKET_RECV, SOCKET_SEND };

/**
 * Five states for the nonblocking server:
 *  1) initialize
 *  2) read 4 byte frame size
 *  3) read frame of data
 *  4) send back data (if any)
 *  5) force immediate connection close
 */
enum TAppState {
  APP_INIT,
  APP_READ_FRAME_SIZE,
  APP_READ_REQUEST,
  APP_WAIT_TASK,
  APP_SEND_RESULT,
  APP_CLOSE_CONNECTION
};

/**
 * Represents a connection that is handled via libevent. This connection
 * essentially encapsulates a socket that has some associated libevent state.
 */
class TNonblockingServer::TConnection {
private:
  /// Server IO Thread handling this connection
  TNonblockingIOThread* ioThread_;

  /// Server handle
  TNonblockingServer* server_;

  /// TProcessor
  std::shared_ptr<TProcessor> processor_;

  /// Object wrapping network socket
  std::shared_ptr<TSocket> tSocket_;

  /// Libevent object
  struct event event_;

  /// Libevent flags
  short eventFlags_;

  /// Socket mode
  TSocketState socketState_;

  /// Application state
  TAppState appState_;

  /// How much data needed to read
  uint32_t readWant_;

  /// Where in the read buffer are we
  uint32_t readBufferPos_;

  /// Read buffer
  uint8_t* readBuffer_;

  /// Read buffer size
  uint32_t readBufferSize_;

  /// Write buffer
  uint8_t* writeBuffer_;

  /// Write buffer size
  uint32_t writeBufferSize_;

  /// How far through writing are we?
  uint32_t writeBufferPos_;

  /// Largest size of write buffer seen since buffer was constructed
  size_t largestWriteBufferSize_;

  /// Count of the number of calls for use with getResizeBufferEveryN().
  int32_t callsForResize_;

  /// Transport to read from
  std::shared_ptr<TMemoryBuffer> inputTransport_;

  /// Transport that processor writes to
  std::shared_ptr<TMemoryBuffer> outputTransport_;

  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
  std::shared_ptr<TTransport> factoryInputTransport_;
  std::shared_ptr<TTransport> factoryOutputTransport_;

  /// Protocol decoder
  std::shared_ptr<TProtocol> inputProtocol_;

  /// Protocol encoder
  std::shared_ptr<TProtocol> outputProtocol_;

  /// Server event handler, if any
  std::shared_ptr<TServerEventHandler> serverEventHandler_;

  /// Thrift call context, if any
  void* connectionContext_;

  /// Go into read mode
  void setRead() { setFlags(EV_READ | EV_PERSIST); }

  /// Go into write mode
  void setWrite() { setFlags(EV_WRITE | EV_PERSIST); }

  /// Set socket idle
  void setIdle() { setFlags(0); }

  /**
   * Set event flags for this connection.
   *
   * @param eventFlags flags we pass to libevent for the connection.
   */
  void setFlags(short eventFlags);

  /**
   * Libevent handler called (via our static wrapper) when the connection
   * socket had something happen. Rather than use the flags libevent passed,
   * we use the connection state to determine whether we need to read or
   * write the socket.
   */
  void workSocket();

public:
  class Task;

  /// Constructor
  TConnection(std::shared_ptr<TSocket> socket, TNonblockingIOThread* ioThread) {
    readBuffer_ = nullptr;
    readBufferSize_ = 0;

    ioThread_ = ioThread;
    server_ = ioThread->getServer();

    // Allocate input and output transports these only need to be allocated
    // once per TConnection (they don't need to be reallocated on init() call)
    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
    outputTransport_.reset(
        new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));

    tSocket_ = socket;

    init(ioThread);
  }

  ~TConnection() { std::free(readBuffer_); }

  /// Close this connection and free or reset its resources.
  void close();

  /**
   * Check buffers against any size limits and shrink it if exceeded.
   *
   * @param readLimit we reduce read buffer size to this (if nonzero).
   * @param writeLimit if nonzero and write buffer is larger, replace it.
   */
  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);

  /// Initialize
  void init(TNonblockingIOThread* ioThread);

  /// set socket for connection
  void setSocket(std::shared_ptr<TSocket> socket);

  /**
   * This is called when the application transitions from one state into
   * another. This means that it has finished writing the data that it needed
   * to, or finished receiving the data that it needed to.
   */
  void transition();

  /**
   * C-callable event handler for connection events. Provides a callback
   * that libevent can understand which invokes connection_->workSocket().
   *
   * @param fd the descriptor the event occurred on.
   * @param which the flags associated with the event.
   * @param v void* callback arg where we placed TConnection's "this".
   */
  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
    assert(fd == static_cast<evutil_socket_t>(((TConnection*)v)->getTSocket()->getSocketFD()));
    ((TConnection*)v)->workSocket();
  }

  /**
   * Notification to server that processing has ended on this request.
   * Can be called either when processing is completed or when a waiting
   * task has been preemptively terminated (on overload).
   *
   * Don't call this from the IO thread itself.
   *
   * @return true if successful, false if unable to notify (check THRIFT_GET_SOCKET_ERROR).
   */
  bool notifyIOThread() { return ioThread_->notify(this); }

  /*
   * Returns the number of this connection's currently assigned IO
   * thread.
   */
  int getIOThreadNumber() const { return ioThread_->getThreadNumber(); }

  /// Force connection shutdown for this connection.
  void forceClose() {
    appState_ = APP_CLOSE_CONNECTION;
    if (!notifyIOThread()) {
      server_->decrementActiveProcessors();
      close();
      throw TException("TConnection::forceClose: failed write on notify pipe");
    }
  }

  /// return the server this connection was initialized for.
  TNonblockingServer* getServer() const { return server_; }

  /// get state of connection.
  TAppState getState() const { return appState_; }

  /// return the TSocket transport wrapping this network connection
  std::shared_ptr<TSocket> getTSocket() const { return tSocket_; }

  /// return the server event handler if any
  std::shared_ptr<TServerEventHandler> getServerEventHandler() { return serverEventHandler_; }

  /// return the Thrift connection context if any
  void* getConnectionContext() { return connectionContext_; }
};

class TNonblockingServer::TConnection::Task : public Runnable {
public:
  Task(std::shared_ptr<TProcessor> processor,
       std::shared_ptr<TProtocol> input,
       std::shared_ptr<TProtocol> output,
       TConnection* connection)
    : processor_(processor),
      input_(input),
      output_(output),
      connection_(connection),
      serverEventHandler_(connection_->getServerEventHandler()),
      connectionContext_(connection_->getConnectionContext()) {}

  void run() override {
    try {
      for (;;) {
        if (serverEventHandler_) {
          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
        }
        if (!processor_->process(input_, output_, connectionContext_)
            || !input_->getTransport()->peek()) {
          break;
        }
      }
    } catch (const TTransportException& ttx) {
      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
    } catch (const std::bad_alloc&) {
      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
      exit(1);
    } catch (const std::exception& x) {
      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                          typeid(x).name(),
                          x.what());
    } catch (...) {
      GlobalOutput.printf("TNonblockingServer: unknown exception while processing.");
    }

    // Signal completion back to the libevent thread via a pipe
    if (!connection_->notifyIOThread()) {
      GlobalOutput.printf("TNonblockingServer: failed to notifyIOThread, closing.");
      connection_->server_->decrementActiveProcessors();
      connection_->close();
      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
    }
  }

  TConnection* getTConnection() { return connection_; }

private:
  std::shared_ptr<TProcessor> processor_;
  std::shared_ptr<TProtocol> input_;
  std::shared_ptr<TProtocol> output_;
  TConnection* connection_;
  std::shared_ptr<TServerEventHandler> serverEventHandler_;
  void* connectionContext_;
};

void TNonblockingServer::TConnection::init(TNonblockingIOThread* ioThread) {
  ioThread_ = ioThread;
  server_ = ioThread->getServer();
  appState_ = APP_INIT;
  eventFlags_ = 0;

  readBufferPos_ = 0;
  readWant_ = 0;

  writeBuffer_ = nullptr;
  writeBufferSize_ = 0;
  writeBufferPos_ = 0;
  largestWriteBufferSize_ = 0;

  socketState_ = SOCKET_RECV_FRAMING;
  callsForResize_ = 0;

  // get input/transports
  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(inputTransport_);
  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);

  // Create protocol
  if (server_->getHeaderTransport()) {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
                                                                     factoryOutputTransport_);
    outputProtocol_ = inputProtocol_;
  } else {
    inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
    outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
  }

  // Set up for any server event handler
  serverEventHandler_ = server_->getEventHandler();
  if (serverEventHandler_) {
    connectionContext_ =
serverEventHandler_->createContext(inputProtocol_, outputProtocol_); } else { connectionContext_ = nullptr; } // Get the processor processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_); } void TNonblockingServer::TConnection::setSocket(std::shared_ptr socket) { tSocket_ = socket; } void TNonblockingServer::TConnection::workSocket() { while (true) { int got = 0, left = 0, sent = 0; uint32_t fetch = 0; switch (socketState_) { case SOCKET_RECV_FRAMING: union { uint8_t buf[sizeof(uint32_t)]; uint32_t size; } framing; // if we've already received some bytes we kept them here framing.size = readWant_; // determine size of this frame try { // Read from the socket fetch = tSocket_->read(&framing.buf[readBufferPos_], uint32_t(sizeof(framing.size) - readBufferPos_)); if (fetch == 0) { // Whenever we get here it means a remote disconnect close(); return; } readBufferPos_ += fetch; } catch (TTransportException& te) { //In Nonblocking SSLSocket some operations need to be retried again. //Current approach is parsing exception message, but a better solution needs to be investigated. if(!strstr(te.what(), "retry")) { GlobalOutput.printf("TConnection::workSocket(): %s", te.what()); close(); return; } } if (readBufferPos_ < sizeof(framing.size)) { // more needed before frame size is known -- save what we have so far readWant_ = framing.size; return; } readWant_ = ntohl(framing.size); if (readWant_ > server_->getMaxFrameSize()) { // Don't allow giant frame sizes. This prevents bad clients from // causing us to try and allocate a giant buffer. GlobalOutput.printf( "TNonblockingServer: frame size too large " "(%" PRIu32 " > %" PRIu64 ") from client %s. " "Remote side not using TFramedTransport?", readWant_, (uint64_t)server_->getMaxFrameSize(), tSocket_->getSocketInfo().c_str()); close(); return; } // size known; now get the rest of the frame transition(); // If the socket has more data than the frame header, continue to work on it. This is not strictly necessary for // regular sockets, because if there is more data, libevent will fire the event handler registered for read // readiness, which will in turn call workSocket(). However, some socket types (such as TSSLSocket) may have the // data sitting in their internal buffers and from libevent's perspective, there is no further data available. In // that case, not trying another processing cycle here would result in a hang as we will never get to work the socket, // despite having more data. if (tSocket_->hasPendingDataToRead()) { continue; } return; case SOCKET_RECV: // It is an error to be in this state if we already have all the data if (!(readBufferPos_ < readWant_)) { GlobalOutput.printf("TNonblockingServer: frame size too short"); close(); return; } try { // Read from the socket fetch = readWant_ - readBufferPos_; got = tSocket_->read(readBuffer_ + readBufferPos_, fetch); } catch (TTransportException& te) { //In Nonblocking SSLSocket some operations need to be retried again. //Current approach is parsing exception message, but a better solution needs to be investigated. 
if(!strstr(te.what(), "retry")) { GlobalOutput.printf("TConnection::workSocket(): %s", te.what()); close(); } return; } if (got > 0) { // Move along in the buffer readBufferPos_ += got; // Check that we did not overdo it assert(readBufferPos_ <= readWant_); // We are done reading, move onto the next state if (readBufferPos_ == readWant_) { transition(); if (socketState_ == SOCKET_RECV_FRAMING && tSocket_->hasPendingDataToRead()) { continue; } } return; } // Whenever we get down here it means a remote disconnect close(); return; case SOCKET_SEND: // Should never have position past size assert(writeBufferPos_ <= writeBufferSize_); // If there is no data to send, then let us move on if (writeBufferPos_ == writeBufferSize_) { GlobalOutput("WARNING: Send state with no data to send"); transition(); return; } try { left = writeBufferSize_ - writeBufferPos_; sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left); } catch (TTransportException& te) { GlobalOutput.printf("TConnection::workSocket(): %s ", te.what()); close(); return; } writeBufferPos_ += sent; // Did we overdo it? assert(writeBufferPos_ <= writeBufferSize_); // We are done! if (writeBufferPos_ == writeBufferSize_) { transition(); } return; default: GlobalOutput.printf("Unexpected Socket State %d", socketState_); assert(0); return; } } } bool TNonblockingServer::getHeaderTransport() { // Currently if there is no output protocol factory, // we assume header transport (without having to create // a new transport and check) return getOutputProtocolFactory() == nullptr; } /** * This is called when the application transitions from one state into * another. This means that it has finished writing the data that it needed * to, or finished receiving the data that it needed to. */ void TNonblockingServer::TConnection::transition() { // ensure this connection is active right now assert(ioThread_); assert(server_); // Switch upon the state that we are currently in and move to a new state switch (appState_) { case APP_READ_REQUEST: // We are done reading the request, package the read buffer into transport // and get back some data from the dispatch function if (server_->getHeaderTransport()) { inputTransport_->resetBuffer(readBuffer_, readBufferPos_); outputTransport_->resetBuffer(); } else { // We saved room for the framing size in case header transport needed it, // but just skip it for the non-header case inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4); outputTransport_->resetBuffer(); // Prepend four bytes of blank space to the buffer so we can // write the frame size there later. outputTransport_->getWritePtr(4); outputTransport_->wroteBytes(4); } server_->incrementActiveProcessors(); if (server_->isThreadPoolProcessing()) { // We are setting up a Task to do this work and we will wait on it // Create task and dispatch to the thread manager std::shared_ptr task = std::shared_ptr( new Task(processor_, inputProtocol_, outputProtocol_, this)); // The application is now waiting on the task to finish appState_ = APP_WAIT_TASK; // Set this connection idle so that libevent doesn't process more // data on it while we're still waiting for the threadmanager to // finish this task setIdle(); try { server_->addTask(task); } catch (IllegalStateException& ise) { // The ThreadManager is not ready to handle any more tasks (it's probably shutting down). 
GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what()); server_->decrementActiveProcessors(); close(); } catch (TimedOutException& to) { GlobalOutput.printf("[ERROR] TimedOutException: Server::process() %s", to.what()); server_->decrementActiveProcessors(); close(); } return; } else { try { if (serverEventHandler_) { serverEventHandler_->processContext(connectionContext_, getTSocket()); } // Invoke the processor processor_->process(inputProtocol_, outputProtocol_, connectionContext_); } catch (const TTransportException& ttx) { GlobalOutput.printf( "TNonblockingServer transport error in " "process(): %s", ttx.what()); server_->decrementActiveProcessors(); close(); return; } catch (const std::exception& x) { GlobalOutput.printf("Server::process() uncaught exception: %s: %s", typeid(x).name(), x.what()); server_->decrementActiveProcessors(); close(); return; } catch (...) { GlobalOutput.printf("Server::process() unknown exception"); server_->decrementActiveProcessors(); close(); return; } } // fallthrough // Intentionally fall through here, the call to process has written into // the writeBuffer_ case APP_WAIT_TASK: // We have now finished processing a task and the result has been written // into the outputTransport_, so we grab its contents and place them into // the writeBuffer_ for actual writing by the libevent thread server_->decrementActiveProcessors(); // Get the result of the operation outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_); // If the function call generated return data, then move into the send // state and get going // 4 bytes were reserved for frame size if (writeBufferSize_ > 4) { // Move into write state writeBufferPos_ = 0; socketState_ = SOCKET_SEND; // Put the frame size into the write buffer auto frameSize = (int32_t)htonl(writeBufferSize_ - 4); memcpy(writeBuffer_, &frameSize, 4); // Socket into write mode appState_ = APP_SEND_RESULT; setWrite(); return; } // In this case, the request was oneway and we should fall through // right back into the read frame header state goto LABEL_APP_INIT; case APP_SEND_RESULT: // it's now safe to perform buffer size housekeeping. if (writeBufferSize_ > largestWriteBufferSize_) { largestWriteBufferSize_ = writeBufferSize_; } if (server_->getResizeBufferEveryN() > 0 && ++callsForResize_ >= server_->getResizeBufferEveryN()) { checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(), server_->getIdleWriteBufferLimit()); callsForResize_ = 0; } // fallthrough // N.B.: We also intentionally fall through here into the INIT state! LABEL_APP_INIT: case APP_INIT: // Clear write buffer variables writeBuffer_ = nullptr; writeBufferPos_ = 0; writeBufferSize_ = 0; // Into read4 state we go socketState_ = SOCKET_RECV_FRAMING; appState_ = APP_READ_FRAME_SIZE; readBufferPos_ = 0; // Register read event setRead(); return; case APP_READ_FRAME_SIZE: readWant_ += 4; // We just read the request length // Double the buffer size until it is big enough if (readWant_ > readBufferSize_) { if (readBufferSize_ == 0) { readBufferSize_ = 1; } uint32_t newSize = readBufferSize_; while (readWant_ > newSize) { newSize *= 2; } auto* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize); if (newBuffer == nullptr) { // nothing else to be done... 
throw std::bad_alloc(); } readBuffer_ = newBuffer; readBufferSize_ = newSize; } readBufferPos_ = 4; *((uint32_t*)readBuffer_) = htonl(readWant_ - 4); // Move into read request state socketState_ = SOCKET_RECV; appState_ = APP_READ_REQUEST; return; case APP_CLOSE_CONNECTION: server_->decrementActiveProcessors(); close(); return; default: GlobalOutput.printf("Unexpected Application State %d", appState_); assert(0); } } void TNonblockingServer::TConnection::setFlags(short eventFlags) { // Catch the do nothing case if (eventFlags_ == eventFlags) { return; } // Delete a previously existing event if (eventFlags_ && event_del(&event_) == -1) { GlobalOutput.perror("TConnection::setFlags() event_del", THRIFT_GET_SOCKET_ERROR); return; } // Update in memory structure eventFlags_ = eventFlags; // Do not call event_set if there are no flags if (!eventFlags_) { return; } /* * event_set: * * Prepares the event structure &event to be used in future calls to * event_add() and event_del(). The event will be prepared to call the * eventHandler using the 'sock' file descriptor to monitor events. * * The events can be either EV_READ, EV_WRITE, or both, indicating * that an application can read or write from the file respectively without * blocking. * * The eventHandler will be called with the file descriptor that triggered * the event and the type of event which will be one of: EV_TIMEOUT, * EV_SIGNAL, EV_READ, EV_WRITE. * * The additional flag EV_PERSIST makes an event_add() persistent until * event_del() has been called. * * Once initialized, the &event struct can be used repeatedly with * event_add() and event_del() and does not need to be reinitialized unless * the eventHandler and/or the argument to it are to be changed. However, * when an ev structure has been added to libevent using event_add() the * structure must persist until the event occurs (assuming EV_PERSIST * is not set) or is removed using event_del(). You may not reuse the same * ev structure for multiple monitored descriptors; each descriptor needs * its own ev. 
*/ event_set(&event_, tSocket_->getSocketFD(), eventFlags_, TConnection::eventHandler, this); event_base_set(ioThread_->getEventBase(), &event_); // Add the event if (event_add(&event_, nullptr) == -1) { GlobalOutput.perror("TConnection::setFlags(): could not event_add", THRIFT_GET_SOCKET_ERROR); } } /** * Closes a connection */ void TNonblockingServer::TConnection::close() { setIdle(); if (serverEventHandler_) { serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_); } ioThread_ = nullptr; // Close the socket tSocket_->close(); // close any factory produced transports factoryInputTransport_->close(); factoryOutputTransport_->close(); // release processor and handler processor_.reset(); // Give this object back to the server that owns it server_->returnConnection(this); } void TNonblockingServer::TConnection::checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit) { if (readLimit > 0 && readBufferSize_ > readLimit) { free(readBuffer_); readBuffer_ = nullptr; readBufferSize_ = 0; } if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) { // just start over outputTransport_->resetBuffer(static_cast(server_->getWriteBufferDefaultSize())); largestWriteBufferSize_ = 0; } } TNonblockingServer::~TNonblockingServer() { // Close any active connections (moves them to the idle connection stack) while (activeConnections_.size()) { activeConnections_.front()->close(); } // Clean up unused TConnection objects in connectionStack_ while (!connectionStack_.empty()) { TConnection* connection = connectionStack_.top(); connectionStack_.pop(); delete connection; } // The TNonblockingIOThread objects have shared_ptrs to the Thread // objects and the Thread objects have shared_ptrs to the TNonblockingIOThread // objects (as runnable) so these objects will never deallocate without help. while (!ioThreads_.empty()) { std::shared_ptr iot = ioThreads_.back(); ioThreads_.pop_back(); iot->setThread(std::shared_ptr()); } } /** * Creates a new connection either by reusing an object off the stack or * by allocating a new one entirely */ TNonblockingServer::TConnection* TNonblockingServer::createConnection(std::shared_ptr socket) { // Check the stack Guard g(connMutex_); // pick an IO thread to handle this connection -- currently round robin assert(nextIOThread_ < ioThreads_.size()); int selectedThreadIdx = nextIOThread_; nextIOThread_ = static_cast((nextIOThread_ + 1) % ioThreads_.size()); TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get(); // Check the connection stack to see if we can re-use TConnection* result = nullptr; if (connectionStack_.empty()) { result = new TConnection(socket, ioThread); ++numTConnections_; } else { result = connectionStack_.top(); connectionStack_.pop(); result->setSocket(socket); result->init(ioThread); } activeConnections_.push_back(result); return result; } /** * Returns a connection to the stack */ void TNonblockingServer::returnConnection(TConnection* connection) { Guard g(connMutex_); activeConnections_.erase(std::remove(activeConnections_.begin(), activeConnections_.end(), connection), activeConnections_.end()); if (connectionStackLimit_ && (connectionStack_.size() >= connectionStackLimit_)) { delete connection; --numTConnections_; } else { connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_); connectionStack_.push(connection); } } /** * Server socket had something happen. We accept all waiting client * connections on fd and assign TConnection objects to handle those requests. 
*/ void TNonblockingServer::handleEvent(THRIFT_SOCKET fd, short which) { (void)which; // Make sure that libevent didn't mess up the socket handles assert(fd == serverSocket_); // Going to accept a new client socket std::shared_ptr clientSocket; clientSocket = serverTransport_->accept(); if (clientSocket) { // If we're overloaded, take action here if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) { Guard g(connMutex_); nConnectionsDropped_++; nTotalConnectionsDropped_++; if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) { clientSocket->close(); return; } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) { if (!drainPendingTask()) { // Nothing left to discard, so we drop connection instead. clientSocket->close(); return; } } } // Create a new TConnection for this client socket. TConnection* clientConnection = createConnection(clientSocket); // Fail fast if we could not create a TConnection object if (clientConnection == nullptr) { GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory"); clientSocket->close(); return; } /* * Either notify the ioThread that is assigned this connection to * start processing, or if it is us, we'll just ask this * connection to do its initial state change here. * * (We need to avoid writing to our own notification pipe, to * avoid possible deadlocks if the pipe is full.) * * The IO thread #0 is the only one that handles these listen * events, so unless the connection has been assigned to thread #0 * we know it's not on our thread. */ if (clientConnection->getIOThreadNumber() == 0) { clientConnection->transition(); } else { if (!clientConnection->notifyIOThread()) { GlobalOutput.perror("[ERROR] notifyIOThread failed on fresh connection, closing", errno); clientConnection->close(); } } } } /** * Creates a socket to listen on and binds it to the local port. 
*/ void TNonblockingServer::createAndListenOnSocket() { serverTransport_->listen(); serverSocket_ = serverTransport_->getSocketFD(); } void TNonblockingServer::setThreadManager(std::shared_ptr threadManager) { threadManager_ = threadManager; if (threadManager) { threadManager->setExpireCallback( std::bind(&TNonblockingServer::expireClose, this, std::placeholders::_1)); threadPoolProcessing_ = true; } else { threadPoolProcessing_ = false; } } bool TNonblockingServer::serverOverloaded() { size_t activeConnections = numTConnections_ - connectionStack_.size(); if (numActiveProcessors_ > maxActiveProcessors_ || activeConnections > maxConnections_) { if (!overloaded_) { GlobalOutput.printf("TNonblockingServer: overload condition begun."); overloaded_ = true; } } else { if (overloaded_ && (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) && (activeConnections <= overloadHysteresis_ * maxConnections_)) { GlobalOutput.printf( "TNonblockingServer: overload ended; " "%u dropped (%llu total)", nConnectionsDropped_, nTotalConnectionsDropped_); nConnectionsDropped_ = 0; overloaded_ = false; } } return overloaded_; } bool TNonblockingServer::drainPendingTask() { if (threadManager_) { std::shared_ptr task = threadManager_->removeNextPending(); if (task) { TConnection* connection = static_cast(task.get())->getTConnection(); assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK); connection->forceClose(); return true; } } return false; } void TNonblockingServer::expireClose(std::shared_ptr task) { TConnection* connection = static_cast(task.get())->getTConnection(); assert(connection && connection->getServer() && connection->getState() == APP_WAIT_TASK); connection->forceClose(); } void TNonblockingServer::stop() { // Breaks the event loop in all threads so that they end ASAP. for (auto & ioThread : ioThreads_) { ioThread->stop(); } } void TNonblockingServer::registerEvents(event_base* user_event_base) { userEventBase_ = user_event_base; // init listen socket if (serverSocket_ == THRIFT_INVALID_SOCKET) createAndListenOnSocket(); // set up the IO threads assert(ioThreads_.empty()); if (!numIOThreads_) { numIOThreads_ = DEFAULT_IO_THREADS; } // User-provided event-base doesn't works for multi-threaded servers assert(numIOThreads_ == 1 || !userEventBase_); for (uint32_t id = 0; id < numIOThreads_; ++id) { // the first IO thread also does the listening on server socket THRIFT_SOCKET listenFd = (id == 0 ? serverSocket_ : THRIFT_INVALID_SOCKET); shared_ptr thread( new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_)); ioThreads_.push_back(thread); } // Notify handler of the preServe event if (eventHandler_) { eventHandler_->preServe(); } // Start all of our helper IO threads. Note that the threads run forever, // only terminating if stop() is called. 
assert(ioThreads_.size() == numIOThreads_); assert(ioThreads_.size() > 0); GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.", ioThreads_.size()); // Launch all the secondary IO threads in separate threads if (ioThreads_.size() > 1) { ioThreadFactory_.reset(new ThreadFactory( false // detached )); assert(ioThreadFactory_.get()); // intentionally starting at thread 1, not 0 for (uint32_t i = 1; i < ioThreads_.size(); ++i) { shared_ptr thread = ioThreadFactory_->newThread(ioThreads_[i]); ioThreads_[i]->setThread(thread); thread->start(); } } // Register the events for the primary (listener) IO thread ioThreads_[0]->registerEvents(); } /** * Main workhorse function, starts up the server listening on a port and * loops over the libevent handler. */ void TNonblockingServer::serve() { if (ioThreads_.empty()) registerEvents(nullptr); // Run the primary (listener) IO thread loop in our main thread; this will // only return when the server is shutting down. ioThreads_[0]->run(); // Ensure all threads are finished before exiting serve() for (uint32_t i = 0; i < ioThreads_.size(); ++i) { ioThreads_[i]->join(); GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i); } } TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server, int number, THRIFT_SOCKET listenSocket, bool useHighPriority) : server_(server), number_(number), threadId_{}, listenSocket_(listenSocket), useHighPriority_(useHighPriority), eventBase_(nullptr), ownEventBase_(false), serverEvent_{}, notificationEvent_{} { notificationPipeFDs_[0] = -1; notificationPipeFDs_[1] = -1; } TNonblockingIOThread::~TNonblockingIOThread() { // make sure our associated thread is fully finished join(); if (eventBase_ && ownEventBase_) { event_base_free(eventBase_); ownEventBase_ = false; } if (listenSocket_ != THRIFT_INVALID_SOCKET) { if (0 != ::THRIFT_CLOSESOCKET(listenSocket_)) { GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ", THRIFT_GET_SOCKET_ERROR); } listenSocket_ = THRIFT_INVALID_SOCKET; } for (auto notificationPipeFD : notificationPipeFDs_) { if (notificationPipeFD >= 0) { if (0 != ::THRIFT_CLOSESOCKET(notificationPipeFD)) { GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ", THRIFT_GET_SOCKET_ERROR); } notificationPipeFD = THRIFT_INVALID_SOCKET; } } } void TNonblockingIOThread::createNotificationPipe() { if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) { GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR()); throw TException("can't create notification pipe"); } if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0 || evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) { ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]); ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]); throw TException("TNonblockingServer::createNotificationPipe() THRIFT_O_NONBLOCK"); } for (auto notificationPipeFD : notificationPipeFDs_) { #if LIBEVENT_VERSION_NUMBER < 0x02000000 int flags; if ((flags = THRIFT_FCNTL(notificationPipeFD, F_GETFD, 0)) < 0 || THRIFT_FCNTL(notificationPipeFD, F_SETFD, flags | FD_CLOEXEC) < 0) { #else if (evutil_make_socket_closeonexec(notificationPipeFD) < 0) { #endif ::THRIFT_CLOSESOCKET(notificationPipeFDs_[0]); ::THRIFT_CLOSESOCKET(notificationPipeFDs_[1]); throw TException( "TNonblockingServer::createNotificationPipe() " "FD_CLOEXEC"); } } } /** * Register the core libevent events onto the proper base. 
*/ void TNonblockingIOThread::registerEvents() { threadId_ = Thread::get_current(); assert(eventBase_ == nullptr); eventBase_ = getServer()->getUserEventBase(); if (eventBase_ == nullptr) { eventBase_ = event_base_new(); ownEventBase_ = true; } // Print some libevent stats if (number_ == 0) { GlobalOutput.printf("TNonblockingServer: using libevent %s method %s", event_get_version(), event_base_get_method(eventBase_)); } if (listenSocket_ != THRIFT_INVALID_SOCKET) { // Register the server event event_set(&serverEvent_, listenSocket_, EV_READ | EV_PERSIST, TNonblockingIOThread::listenHandler, server_); event_base_set(eventBase_, &serverEvent_); // Add the event and start up the server if (-1 == event_add(&serverEvent_, nullptr)) { throw TException( "TNonblockingServer::serve(): " "event_add() failed on server listen event"); } GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.", number_); } createNotificationPipe(); // Create an event to be notified when a task finishes event_set(¬ificationEvent_, getNotificationRecvFD(), EV_READ | EV_PERSIST, TNonblockingIOThread::notifyHandler, this); // Attach to the base event_base_set(eventBase_, ¬ificationEvent_); // Add the event and start up the server if (-1 == event_add(¬ificationEvent_, nullptr)) { throw TException( "TNonblockingServer::serve(): " "event_add() failed on task-done notification event"); } GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.", number_); } bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) { auto fd = getNotificationSendFD(); if (fd < 0) { return false; } int ret = -1; long kSize = sizeof(conn); const char * pos = (const char *)const_cast_sockopt(&conn); #if defined(HAVE_POLL_H) || defined(HAVE_SYS_POLL_H) struct pollfd pfd = {fd, POLLOUT, 0}; while (kSize > 0) { pfd.revents = 0; ret = poll(&pfd, 1, -1); if (ret < 0) { return false; } else if (ret == 0) { continue; } if (pfd.revents & POLLHUP || pfd.revents & POLLERR) { ::THRIFT_CLOSESOCKET(fd); return false; } if (pfd.revents & POLLOUT) { ret = send(fd, pos, kSize, 0); if (ret < 0) { if (errno == EAGAIN) { continue; } ::THRIFT_CLOSESOCKET(fd); return false; } kSize -= ret; pos += ret; } } #else fd_set wfds, efds; while (kSize > 0) { FD_ZERO(&wfds); FD_ZERO(&efds); FD_SET(fd, &wfds); FD_SET(fd, &efds); ret = select(static_cast(fd + 1), nullptr, &wfds, &efds, nullptr); if (ret < 0) { return false; } else if (ret == 0) { continue; } if (FD_ISSET(fd, &efds)) { ::THRIFT_CLOSESOCKET(fd); return false; } if (FD_ISSET(fd, &wfds)) { ret = send(fd, pos, kSize, 0); if (ret < 0) { if (errno == EAGAIN) { continue; } ::THRIFT_CLOSESOCKET(fd); return false; } kSize -= ret; pos += ret; } } #endif return true; } /* static */ void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) { auto* ioThread = (TNonblockingIOThread*)v; assert(ioThread); (void)which; while (true) { TNonblockingServer::TConnection* connection = nullptr; const int kSize = sizeof(connection); long nBytes = recv(fd, cast_sockopt(&connection), kSize, 0); if (nBytes == kSize) { if (connection == nullptr) { // this is the command to stop our thread, exit the handler! 
ioThread->breakLoop(false); return; } connection->transition(); } else if (nBytes > 0) { // throw away these bytes and hope that next time we get a solid read GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d", nBytes, kSize); ioThread->breakLoop(true); return; } else if (nBytes == 0) { GlobalOutput.printf("notifyHandler: Notify socket closed!"); ioThread->breakLoop(false); // exit the loop break; } else { // nBytes < 0 if (THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK && THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN) { GlobalOutput.perror("TNonblocking: notifyHandler read() failed: ", THRIFT_GET_SOCKET_ERROR); ioThread->breakLoop(true); return; } // exit the loop break; } } } void TNonblockingIOThread::breakLoop(bool error) { if (error) { GlobalOutput.printf("TNonblockingServer: IO thread #%d exiting with error.", number_); // TODO: figure out something better to do here, but for now kill the // whole process. GlobalOutput.printf("TNonblockingServer: aborting process."); ::abort(); } // If we're running in the same thread, we can't use the notify(0) // mechanism to stop the thread, but happily if we're running in the // same thread, this means the thread can't be blocking in the event // loop either. if (!Thread::is_current(threadId_)) { notify(nullptr); } else { // cause the loop to stop ASAP - even if it has things to do in it event_base_loopbreak(eventBase_); } } void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) { #ifdef HAVE_SCHED_H // Start out with a standard, low-priority setup for the sched params. struct sched_param sp; memset(static_cast(&sp), 0, sizeof(sp)); int policy = SCHED_OTHER; // If desired, set up high-priority sched params structure. if (value) { // FIFO scheduler, ranked above default SCHED_OTHER queue policy = SCHED_FIFO; // The priority only compares us to other SCHED_FIFO threads, so we // just pick a random priority halfway between min & max. const int priority = (sched_get_priority_max(policy) + sched_get_priority_min(policy)) / 2; sp.sched_priority = priority; } // Actually set the sched params for the current thread. if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) { GlobalOutput.printf("TNonblocking: IO Thread #%d using high-priority scheduler!", number_); } else { GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", THRIFT_GET_SOCKET_ERROR); } #else THRIFT_UNUSED_VARIABLE(value); #endif } void TNonblockingIOThread::run() { if (eventBase_ == nullptr) { registerEvents(); } if (useHighPriority_) { setCurrentThreadHighPriority(true); } if (eventBase_ != nullptr) { GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_); // Run libevent engine, never returns, invokes calls to eventHandler event_base_loop(eventBase_, 0); if (useHighPriority_) { setCurrentThreadHighPriority(false); } // cleans up our registered events cleanupEvents(); } GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_); } void TNonblockingIOThread::cleanupEvents() { // stop the listen socket, if any if (listenSocket_ != THRIFT_INVALID_SOCKET) { if (event_del(&serverEvent_) == -1) { GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", THRIFT_GET_SOCKET_ERROR); } } event_del(¬ificationEvent_); } void TNonblockingIOThread::stop() { // This should cause the thread to fall out of its event loop ASAP. breakLoop(false); } void TNonblockingIOThread::join() { // If this was a thread created by a factory (not the thread that called // serve()), we join() it to make sure we shut down fully. 
  if (thread_) {
    try {
      // Note that it is safe to both join() ourselves twice, as well as join
      // the current thread as the pthread implementation checks for deadlock.
      thread_->join();
    } catch (...) {
      // swallow everything
    }
  }
}
}
}
} // apache::thrift::server
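
/*
 * Usage sketch (illustrative only, not part of this translation unit): a minimal
 * example of wiring a handler into TNonblockingServer with a worker thread pool.
 * It assumes a generated `MyServiceHandler`/`MyServiceProcessor` pair (hypothetical
 * names) plus the standard Thrift headers listed below. Clients must use
 * TFramedTransport, since this server reads a 4-byte frame size before each request
 * (see SOCKET_RECV_FRAMING / APP_READ_FRAME_SIZE above).
 *
 *   #include <thrift/protocol/TBinaryProtocol.h>
 *   #include <thrift/server/TNonblockingServer.h>
 *   #include <thrift/transport/TNonblockingServerSocket.h>
 *   #include <thrift/concurrency/ThreadManager.h>
 *   #include <thrift/concurrency/ThreadFactory.h>
 *
 *   using namespace apache::thrift;
 *
 *   int main() {
 *     auto handler = std::make_shared<MyServiceHandler>();            // hypothetical generated handler
 *     auto processor = std::make_shared<MyServiceProcessor>(handler); // hypothetical generated processor
 *     auto protocolFactory = std::make_shared<protocol::TBinaryProtocolFactory>();
 *     auto serverSocket = std::make_shared<transport::TNonblockingServerSocket>(9090);
 *
 *     // Worker pool that runs TConnection::Task; the IO threads only shuttle frames.
 *     auto threadManager = concurrency::ThreadManager::newSimpleThreadManager(4);
 *     threadManager->threadFactory(std::make_shared<concurrency::ThreadFactory>());
 *     threadManager->start();
 *
 *     server::TNonblockingServer server(processor, protocolFactory, serverSocket, threadManager);
 *     server.setNumIOThreads(2);
 *     server.serve(); // runs the listener IO thread's event loop in the calling thread
 *   }
 *
 * Without a ThreadManager, requests are processed inline on the IO thread
 * (the non-thread-pool branch of TConnection::transition()).
 */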