1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
21 #define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
22
23 #include <thrift/Thrift.h>
24 #include <memory>
25 #include <thrift/server/TServer.h>
26 #include <thrift/transport/PlatformSocket.h>
27 #include <thrift/transport/TBufferTransports.h>
28 #include <thrift/transport/TSocket.h>
29 #include <thrift/transport/TNonblockingServerTransport.h>
30 #include <thrift/concurrency/ThreadManager.h>
31 #include <climits>
32 #include <thrift/concurrency/Thread.h>
33 #include <thrift/concurrency/ThreadFactory.h>
34 #include <thrift/concurrency/Mutex.h>
35 #include <stack>
36 #include <vector>
37 #include <string>
38 #include <cstdlib>
39 #ifdef HAVE_UNISTD_H
40 #include <unistd.h>
41 #endif
42 #include <event.h>
43 #include <event2/event_compat.h>
44 #include <event2/event_struct.h>
45
46 namespace apache {
47 namespace thrift {
48 namespace server {
49
50 using apache::thrift::transport::TMemoryBuffer;
51 using apache::thrift::transport::TSocket;
52 using apache::thrift::transport::TNonblockingServerTransport;
53 using apache::thrift::protocol::TProtocol;
54 using apache::thrift::concurrency::Runnable;
55 using apache::thrift::concurrency::ThreadManager;
56 using apache::thrift::concurrency::ThreadFactory;
57 using apache::thrift::concurrency::Thread;
58 using apache::thrift::concurrency::Mutex;
59 using apache::thrift::concurrency::Guard;
60
61 #ifdef LIBEVENT_VERSION_NUMBER
62 #define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
63 #define LIBEVENT_VERSION_MINOR ((LIBEVENT_VERSION_NUMBER >> 16) & 0xFF)
64 #define LIBEVENT_VERSION_REL ((LIBEVENT_VERSION_NUMBER >> 8) & 0xFF)
65 #else
66 // assume latest version 1 series
67 #define LIBEVENT_VERSION_MAJOR 1
68 #define LIBEVENT_VERSION_MINOR 14
69 #define LIBEVENT_VERSION_REL 13
70 #define LIBEVENT_VERSION_NUMBER \
71 ((LIBEVENT_VERSION_MAJOR << 24) | (LIBEVENT_VERSION_MINOR << 16) | (LIBEVENT_VERSION_REL << 8))
72 #endif
73
74 #if LIBEVENT_VERSION_NUMBER < 0x02000000
75 typedef THRIFT_SOCKET evutil_socket_t;
76 #endif
77
// SOCKOPT_CAST_T is the pointee type the sockopt cast helpers below convert
// to: char when building for _WIN32, void everywhere else. A definition
// supplied before this point is left untouched.
// NOTE(review): presumably matches the option-value pointer type of the
// platform's setsockopt()/getsockopt() -- confirm against the call sites.
#ifndef SOCKOPT_CAST_T
#ifdef _WIN32
#define SOCKOPT_CAST_T char
#else
#define SOCKOPT_CAST_T void
#endif // _WIN32
#endif

/**
 * Reinterpret a pointer to a read-only value as const SOCKOPT_CAST_T*.
 *
 * @param v pointer to the value; the address is passed through unchanged.
 * @return the same address typed as const SOCKOPT_CAST_T*.
 */
template <typename T>
inline const SOCKOPT_CAST_T* const_cast_sockopt(const T* v) {
  typedef const SOCKOPT_CAST_T* ConstOptPtr;
  return reinterpret_cast<ConstOptPtr>(v);
}

/**
 * Reinterpret a pointer to a writable value as SOCKOPT_CAST_T*.
 *
 * @param v pointer to the value; the address is passed through unchanged.
 * @return the same address typed as SOCKOPT_CAST_T*.
 */
template <typename T>
inline SOCKOPT_CAST_T* cast_sockopt(T* v) {
  typedef SOCKOPT_CAST_T* OptPtr;
  return reinterpret_cast<OptPtr>(v);
}
95
96 /**
97 * This is a non-blocking server in C++ for high performance that
98 * operates a set of IO threads (by default only one). It assumes that
99 * all incoming requests are framed with a 4 byte length indicator and
100 * writes out responses using the same framing.
101 */
102
/// Actions the server can take when it detects an overload condition.
enum TOverloadAction {
  T_OVERLOAD_NO_ACTION = 0,       ///< Don't handle overload
  T_OVERLOAD_CLOSE_ON_ACCEPT = 1, ///< Drop new connections immediately
  T_OVERLOAD_DRAIN_TASK_QUEUE = 2 ///< Drop some tasks from head of task queue
};
109
110 class TNonblockingIOThread;
111
/**
 * Non-blocking Thrift server driven by libevent. Connections are handled by
 * a configurable number of IO threads (one by default, see
 * DEFAULT_IO_THREADS) and request processing can optionally be handed to a
 * ThreadManager. The class also carries the tunables for connection pooling,
 * buffer sizing and overload handling.
 */
class TNonblockingServer : public TServer {
private:
  class TConnection;

  friend class TNonblockingIOThread;

private:
  /// Listen backlog
  static const int LISTEN_BACKLOG = 1024;

  /// Default limit on size of idle connection pool
  static const size_t CONNECTION_STACK_LIMIT = 1024;

  /// Default limit on frame size
  static const int MAX_FRAME_SIZE = 256 * 1024 * 1024;

  /// Default limit on total number of connected sockets
  static const int MAX_CONNECTIONS = INT_MAX;

  /// Default limit on connections in handler/task processing
  static const int MAX_ACTIVE_PROCESSORS = INT_MAX;

  /// Default size of write buffer
  static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;

  /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
  static const int IDLE_READ_BUFFER_LIMIT = 1024;

  /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
  static const int IDLE_WRITE_BUFFER_LIMIT = 1024;

  /// # of calls before resizing oversized buffers (0 = check only on close)
  static const int RESIZE_BUFFER_EVERY_N = 512;

  /// # of IO threads to use by default
  static const int DEFAULT_IO_THREADS = 1;

  /// # of IO threads this server will use
  size_t numIOThreads_;

  /// Whether to set high scheduling priority for IO threads
  bool useHighPriorityIOThreads_;

  /// Server socket file descriptor
  THRIFT_SOCKET serverSocket_;

  /// The optional user-provided event-base (for single-thread servers)
  event_base* userEventBase_;

  /// For processing via thread pool, may be nullptr
  std::shared_ptr<ThreadManager> threadManager_;

  /// Is thread pool processing?
  bool threadPoolProcessing_;

  /// Factory to create the IO threads
  std::shared_ptr<ThreadFactory> ioThreadFactory_;

  /// Vector of IOThread objects that will handle our IO
  std::vector<std::shared_ptr<TNonblockingIOThread> > ioThreads_;

  /// Index of next IO Thread to be used (for round-robin)
  uint32_t nextIOThread_;

  /// Synchronizes access to connection stack and similar data
  Mutex connMutex_;

  /// Number of TConnection objects we've created
  size_t numTConnections_;

  /// Number of Connections processing or waiting to process
  size_t numActiveProcessors_;

  /// Limit for how many TConnection objects to cache
  size_t connectionStackLimit_;

  /// Limit for number of connections processing or waiting to process
  size_t maxActiveProcessors_;

  /// Limit for number of open connections
  size_t maxConnections_;

  /// Limit for frame size
  size_t maxFrameSize_;

  /// Time in milliseconds before an unperformed task expires (0 == infinite).
  int64_t taskExpireTime_;

  /**
   * Hysteresis for overload state. This is the fraction of the overload
   * value that needs to be reached before the overload state is cleared.
   * setOverloadHysteresis() only accepts values in (0.0, 1.0].
   */
  double overloadHysteresis_;

  /// Action to take when we're overloaded.
  TOverloadAction overloadAction_;

  /**
   * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
   * and found to be exceeded, reinitialized) to this size.
   */
  size_t writeBufferDefaultSize_;

  /**
   * Max read buffer size for an idle TConnection. When we place an idle
   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
   * we will free the buffer (such that it will be reinitialized by the next
   * received frame) if it has exceeded this limit. 0 disables this check.
   */
  size_t idleReadBufferLimit_;

  /**
   * Max write buffer size for an idle connection. When we place an idle
   * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
   * we ensure that its write buffer is <= to this size; otherwise we
   * replace it with a new one of writeBufferDefaultSize_ bytes to ensure that
   * idle connections don't hog memory. 0 disables this check.
   */
  size_t idleWriteBufferLimit_;

  /**
   * Every N calls we check the buffer size limits on a connected TConnection.
   * 0 disables (i.e. the checks are only done when a connection closes).
   */
  int32_t resizeBufferEveryN_;

  /// Set if we are currently in an overloaded state.
  bool overloaded_;

  /// Count of connections dropped since overload started
  uint32_t nConnectionsDropped_;

  /// Count of connections dropped on overload since server started
  uint64_t nTotalConnectionsDropped_;

  /**
   * This is a stack of all the objects that have been created but that
   * are NOT currently in use. When we close a connection, we place it on this
   * stack so that the object can be reused later, rather than freeing the
   * memory and reallocating a new object later.
   */
  std::stack<TConnection*> connectionStack_;

  /**
   * This container holds pointers to all active connections. This container
   * allows the server to clean up unclosed connection objects at destruction,
   * which in turn allows their transports, protocols, processors and handlers
   * to deallocate and clean up correctly.
   */
  std::vector<TConnection*> activeConnections_;

  /**
   * The non-blocking server transport supplied at construction.
   * getListenPort() is forwarded to it.
   */
  std::shared_ptr<TNonblockingServerTransport> serverTransport_;

  /**
   * Called when server socket had something happen. We accept all waiting
   * client connections on listen socket fd and assign TConnection objects
   * to handle those requests.
   *
   * @param fd the descriptor the event occurred on.
   * @param which the event flag that triggered the handler.
   */
  void handleEvent(THRIFT_SOCKET fd, short which);

  /**
   * Reset every tunable and counter to its default. Called first by each
   * constructor, before any factories or the thread manager are applied,
   * so later setter calls override the values assigned here.
   */
  void init() {
    serverSocket_ = THRIFT_INVALID_SOCKET;
    numIOThreads_ = DEFAULT_IO_THREADS;
    nextIOThread_ = 0;
    useHighPriorityIOThreads_ = false;
    userEventBase_ = nullptr;
    threadPoolProcessing_ = false;
    numTConnections_ = 0;
    numActiveProcessors_ = 0;
    connectionStackLimit_ = CONNECTION_STACK_LIMIT;
    maxActiveProcessors_ = MAX_ACTIVE_PROCESSORS;
    maxConnections_ = MAX_CONNECTIONS;
    maxFrameSize_ = MAX_FRAME_SIZE;
    taskExpireTime_ = 0;
    overloadHysteresis_ = 0.8;
    overloadAction_ = T_OVERLOAD_NO_ACTION;
    writeBufferDefaultSize_ = WRITE_BUFFER_DEFAULT_SIZE;
    idleReadBufferLimit_ = IDLE_READ_BUFFER_LIMIT;
    idleWriteBufferLimit_ = IDLE_WRITE_BUFFER_LIMIT;
    resizeBufferEveryN_ = RESIZE_BUFFER_EVERY_N;
    overloaded_ = false;
    nConnectionsDropped_ = 0;
    nTotalConnectionsDropped_ = 0;
  }

public:
  /**
   * Construct from a processor factory and a server transport; every other
   * setting takes the default assigned by init().
   */
  TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();
  }

  /**
   * Construct from a single processor and a server transport; every other
   * setting takes the default assigned by init().
   */
  TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport)
    : TServer(processor), serverTransport_(serverTransport) {
    init();
  }


  /**
   * Construct from a processor factory, using one protocol factory for both
   * input and output.
   *
   * @param threadManager optional thread manager; defaults to an empty
   *                      pointer and is passed to setThreadManager().
   */
  TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
                     const std::shared_ptr<TProtocolFactory>& protocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();

    setInputProtocolFactory(protocolFactory);
    setOutputProtocolFactory(protocolFactory);
    setThreadManager(threadManager);
  }

  /**
   * Construct from a single processor, using one protocol factory for both
   * input and output.
   *
   * @param threadManager optional thread manager; defaults to an empty
   *                      pointer and is passed to setThreadManager().
   */
  TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
                     const std::shared_ptr<TProtocolFactory>& protocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processor), serverTransport_(serverTransport) {
    init();

    setInputProtocolFactory(protocolFactory);
    setOutputProtocolFactory(protocolFactory);
    setThreadManager(threadManager);
  }

  /**
   * Construct from a processor factory with separate input/output transport
   * factories and separate input/output protocol factories.
   *
   * @param threadManager optional thread manager; defaults to an empty
   *                      pointer and is passed to setThreadManager().
   */
  TNonblockingServer(const std::shared_ptr<TProcessorFactory>& processorFactory,
                     const std::shared_ptr<TTransportFactory>& inputTransportFactory,
                     const std::shared_ptr<TTransportFactory>& outputTransportFactory,
                     const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
                     const std::shared_ptr<TProtocolFactory>& outputProtocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processorFactory), serverTransport_(serverTransport) {
    init();

    setInputTransportFactory(inputTransportFactory);
    setOutputTransportFactory(outputTransportFactory);
    setInputProtocolFactory(inputProtocolFactory);
    setOutputProtocolFactory(outputProtocolFactory);
    setThreadManager(threadManager);
  }

  /**
   * Construct from a single processor with separate input/output transport
   * factories and separate input/output protocol factories.
   *
   * @param threadManager optional thread manager; defaults to an empty
   *                      pointer and is passed to setThreadManager().
   */
  TNonblockingServer(const std::shared_ptr<TProcessor>& processor,
                     const std::shared_ptr<TTransportFactory>& inputTransportFactory,
                     const std::shared_ptr<TTransportFactory>& outputTransportFactory,
                     const std::shared_ptr<TProtocolFactory>& inputProtocolFactory,
                     const std::shared_ptr<TProtocolFactory>& outputProtocolFactory,
                     const std::shared_ptr<apache::thrift::transport::TNonblockingServerTransport>& serverTransport,
                     const std::shared_ptr<ThreadManager>& threadManager
                     = std::shared_ptr<ThreadManager>())
    : TServer(processor), serverTransport_(serverTransport) {
    init();

    setInputTransportFactory(inputTransportFactory);
    setOutputTransportFactory(outputTransportFactory);
    setInputProtocolFactory(inputProtocolFactory);
    setOutputProtocolFactory(outputProtocolFactory);
    setThreadManager(threadManager);
  }

  ~TNonblockingServer() override;

  /// Set the thread manager used for thread pool processing (defined out of line).
  void setThreadManager(std::shared_ptr<ThreadManager> threadManager);

  /// Return the listen port reported by the server transport.
  int getListenPort() { return serverTransport_->getListenPort(); }

  /// Return the thread manager; may be an empty pointer.
  std::shared_ptr<ThreadManager> getThreadManager() { return threadManager_; }

  /**
   * Sets the number of IO threads used by this server. Can only be used before
   * the call to serve() and has no effect afterwards.
   */
  void setNumIOThreads(size_t numThreads) {
    numIOThreads_ = numThreads;
    // User-provided event-base doesn't work for multi-threaded servers
    assert(numIOThreads_ <= 1 || !userEventBase_);
  }

  /** Return whether the IO threads will get high scheduling priority */
  bool useHighPriorityIOThreads() const { return useHighPriorityIOThreads_; }

  /** Set whether the IO threads will get high scheduling priority. */
  void setUseHighPriorityIOThreads(bool val) { useHighPriorityIOThreads_ = val; }

  /** Return the number of IO threads used by this server. */
  size_t getNumIOThreads() const { return numIOThreads_; }

  /**
   * Get the maximum number of unused TConnection we will hold in reserve.
   *
   * @return the current limit on TConnection pool size.
   */
  size_t getConnectionStackLimit() const { return connectionStackLimit_; }

  /**
   * Set the maximum number of unused TConnection we will hold in reserve.
   *
   * @param sz the new limit for TConnection pool size.
   */
  void setConnectionStackLimit(size_t sz) { connectionStackLimit_ = sz; }

  /// Return the thread pool processing flag (threadPoolProcessing_).
  bool isThreadPoolProcessing() const { return threadPoolProcessing_; }

  /**
   * Hand a task to the thread manager with a timeout of 0 and this server's
   * task expiration time.
   *
   * NOTE(review): threadManager_ is documented as possibly null and is not
   * checked here; presumably callers test isThreadPoolProcessing() first --
   * confirm.
   *
   * @param task the runnable to queue.
   */
  void addTask(std::shared_ptr<Runnable> task) {
    threadManager_->add(task, 0LL, taskExpireTime_);
  }

  /**
   * Return the server's TConnection count (numTConnections_). This figure
   * includes idle pooled objects: getNumActiveConnections() subtracts the
   * idle pool from it.
   *
   * @return count of TConnection objects.
   */
  size_t getNumConnections() const { return numTConnections_; }

  /**
   * Return the count of connection objects currently in use, computed as
   * getNumConnections() minus getNumIdleConnections().
   *
   * @return count of non-idle connection objects.
   */
  size_t getNumActiveConnections() const { return getNumConnections() - getNumIdleConnections(); }

  /**
   * Return the count of connection objects allocated but not in use.
   *
   * @return count of idle connection objects.
   */
  size_t getNumIdleConnections() const { return connectionStack_.size(); }

  /**
   * Return count of number of connections which are currently processing.
   * This is defined as a connection where all data has been received and
   * either assigned a task (when threading) or passed to a handler (when
   * not threading), and where the handler has not yet returned.
   *
   * @return # of connections currently processing.
   */
  size_t getNumActiveProcessors() const { return numActiveProcessors_; }

  /// Increment the count of connections currently processing (under connMutex_).
  void incrementActiveProcessors() {
    Guard g(connMutex_);
    ++numActiveProcessors_;
  }

  /// Decrement the count of connections currently processing (under connMutex_).
  void decrementActiveProcessors() {
    Guard g(connMutex_);
    // The counter is unsigned; never let it wrap below zero.
    if (numActiveProcessors_ > 0) {
      --numActiveProcessors_;
    }
  }

  /**
   * Get the maximum # of connections allowed before overload.
   *
   * @return current setting.
   */
  size_t getMaxConnections() const { return maxConnections_; }

  /**
   * Set the maximum # of connections allowed before overload.
   *
   * @param maxConnections new setting for maximum # of connections.
   */
  void setMaxConnections(size_t maxConnections) { maxConnections_ = maxConnections; }

  /**
   * Get the maximum # of connections waiting in handler/task before overload.
   *
   * @return current setting.
   */
  size_t getMaxActiveProcessors() const { return maxActiveProcessors_; }

  /**
   * Set the maximum # of connections waiting in handler/task before overload.
   *
   * @param maxActiveProcessors new setting for maximum # of active processes.
   */
  void setMaxActiveProcessors(size_t maxActiveProcessors) {
    maxActiveProcessors_ = maxActiveProcessors;
  }

  /**
   * Get the maximum allowed frame size.
   *
   * If a client tries to send a message larger than this limit,
   * its connection will be closed.
   *
   * @return Maximum frame size, in bytes.
   */
  size_t getMaxFrameSize() const { return maxFrameSize_; }

  /**
   * Set the maximum allowed frame size.
   *
   * @param maxFrameSize The new maximum frame size.
   */
  void setMaxFrameSize(size_t maxFrameSize) { maxFrameSize_ = maxFrameSize; }

  /**
   * Get fraction of maximum limits before an overload condition is cleared.
   *
   * @return hysteresis fraction
   */
  double getOverloadHysteresis() const { return overloadHysteresis_; }

  /**
   * Set fraction of maximum limits before an overload condition is cleared.
   * A good value would probably be between 0.5 and 0.9.
   *
   * @param hysteresisFraction fraction in (0.0, 1.0]; any other value is
   *                           ignored and the current setting is kept.
   */
  void setOverloadHysteresis(double hysteresisFraction) {
    if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
      overloadHysteresis_ = hysteresisFraction;
    }
  }

  /**
   * Get the action the server will take on overload.
   *
   * @return a TOverloadAction enum value for the currently set action.
   */
  TOverloadAction getOverloadAction() const { return overloadAction_; }

  /**
   * Set the action the server is to take on overload.
   *
   * @param overloadAction a TOverloadAction enum value for the action.
   */
  void setOverloadAction(TOverloadAction overloadAction) { overloadAction_ = overloadAction; }

  /**
   * Get the time in milliseconds after which a task expires (0 == infinite).
   *
   * @return a 64-bit time in milliseconds.
   */
  int64_t getTaskExpireTime() const { return taskExpireTime_; }

  /**
   * Set the time in milliseconds after which a task expires (0 == infinite).
   *
   * @param taskExpireTime a 64-bit time in milliseconds.
   */
  void setTaskExpireTime(int64_t taskExpireTime) { taskExpireTime_ = taskExpireTime; }

  /**
   * Determine if the server is currently overloaded.
   * This function checks the maximums for open connections and connections
   * currently in processing, and sets an overload condition if they are
   * exceeded. The overload will persist until both values are below the
   * current hysteresis fraction of their maximums.
   *
   * @return true if an overload condition exists, false if not.
   */
  bool serverOverloaded();

  /** Pop and discard next task on threadpool wait queue.
   *
   * @return true if a task was discarded, false if the wait queue was empty.
   */
  bool drainPendingTask();

  /**
   * Get the starting size of a TConnection object's write buffer.
   *
   * @return # bytes we initialize a TConnection object's write buffer to.
   */
  size_t getWriteBufferDefaultSize() const { return writeBufferDefaultSize_; }

  /**
   * Set the starting size of a TConnection object's write buffer.
   *
   * @param size # bytes we initialize a TConnection object's write buffer to.
   */
  void setWriteBufferDefaultSize(size_t size) { writeBufferDefaultSize_ = size; }

  /**
   * Get the maximum size of read buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will dealloc idle buffer.
   */
  size_t getIdleReadBufferLimit() const { return idleReadBufferLimit_; }

  /**
   * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
   * Get the maximum size of read buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will dealloc idle buffer.
   */
  size_t getIdleBufferMemLimit() const { return idleReadBufferLimit_; }

  /**
   * Set the maximum size read buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its read buffer, we free it and allow it to be reinitialized
   * on the next received frame.
   *
   * @param limit of bytes beyond which we will shrink buffers when checked.
   */
  void setIdleReadBufferLimit(size_t limit) { idleReadBufferLimit_ = limit; }

  /**
   * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
   * Set the maximum size read buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its read buffer, we free it and allow it to be reinitialized
   * on the next received frame.
   *
   * @param limit of bytes beyond which we will shrink buffers when checked.
   */
  void setIdleBufferMemLimit(size_t limit) { idleReadBufferLimit_ = limit; }

  /**
   * Get the maximum size of write buffer allocated to idle TConnection objects.
   *
   * @return # bytes beyond which we will reallocate buffers when checked.
   */
  size_t getIdleWriteBufferLimit() const { return idleWriteBufferLimit_; }

  /**
   * Set the maximum size write buffer allocated to idle TConnection objects.
   * If a TConnection object is found (either on connection close or between
   * calls when resizeBufferEveryN_ is set) with more than this much memory
   * allocated to its write buffer, we destroy and construct that buffer with
   * writeBufferDefaultSize_ bytes.
   *
   * @param limit of bytes beyond which we will shrink buffers when idle.
   */
  void setIdleWriteBufferLimit(size_t limit) { idleWriteBufferLimit_ = limit; }

  /**
   * Get # of calls made between buffer size checks. 0 means disabled.
   *
   * @return # of calls between buffer size checks.
   */
  int32_t getResizeBufferEveryN() const { return resizeBufferEveryN_; }

  /**
   * Check buffer sizes every "count" calls. This allows buffer limits
   * to be enforced for persistent connections with a controllable degree
   * of overhead. 0 disables checks except at connection close.
   *
   * @param count the number of calls between checks, or 0 to disable
   */
  void setResizeBufferEveryN(int32_t count) { resizeBufferEveryN_ = count; }

  /**
   * Main workhorse function, starts up the server listening on a port and
   * loops over the libevent handler.
   */
  void serve() override;

  /**
   * Causes the server to terminate gracefully (can be called from any thread).
   */
  void stop() override;

  /// Creates a socket to listen on and binds it to the local port.
  void createAndListenOnSocket();

  /**
   * Register the optional user-provided event-base (for single-thread servers)
   *
   * This method should be used when the server is running in a single-thread
   * mode, and the event base is provided by the user (i.e., the caller).
   *
   * @param user_event_base the user-provided event-base. The user is
   * responsible for freeing the event base memory.
   */
  void registerEvents(event_base* user_event_base);

  /**
   * Returns the optional user-provided event-base (for single-thread servers).
   */
  event_base* getUserEventBase() const { return userEventBase_; }

  /** Some transports, like THeaderTransport, require passing through
   * the framing size instead of stripping it.
   */
  bool getHeaderTransport();

private:
  /**
   * Callback function that the threadmanager calls when a task reaches
   * its expiration time. It is needed to clean up the expired connection.
   *
   * @param task the runnable associated with the expired task.
   */
  void expireClose(std::shared_ptr<Runnable> task);

  /**
   * Return an initialized connection object. Creates or recovers from
   * pool a TConnection and initializes it with the provided socket.
   *
   * @param socket the socket associated with this connection.
   * @return pointer to initialized TConnection object.
   */
  TConnection* createConnection(std::shared_ptr<TSocket> socket);

  /**
   * Returns a connection to pool or deletion. If the connection pool
   * (a stack) isn't full, place the connection object on it, otherwise
   * just delete it.
   *
   * @param connection the TConnection being returned.
   */
  void returnConnection(TConnection* connection);
};
732
733 class TNonblockingIOThread : public Runnable {
734 public:
735 // Creates an IO thread and sets up the event base. The listenSocket should
736 // be a valid FD on which listen() has already been called. If the
737 // listenSocket is < 0, accepting will not be done.
738 TNonblockingIOThread(TNonblockingServer* server,
739 int number,
740 THRIFT_SOCKET listenSocket,
741 bool useHighPriority);
742
743 ~TNonblockingIOThread() override;
744
745 // Returns the event-base for this thread.
getEventBase()746 event_base* getEventBase() const { return eventBase_; }
747
748 // Returns the server for this thread.
getServer()749 TNonblockingServer* getServer() const { return server_; }
750
751 // Returns the number of this IO thread.
getThreadNumber()752 int getThreadNumber() const { return number_; }
753
754 // Returns the thread id associated with this object. This should
755 // only be called after the thread has been started.
getThreadId()756 Thread::id_t getThreadId() const { return threadId_; }
757
758 // Returns the send-fd for task complete notifications.
getNotificationSendFD()759 evutil_socket_t getNotificationSendFD() const { return notificationPipeFDs_[1]; }
760
761 // Returns the read-fd for task complete notifications.
getNotificationRecvFD()762 evutil_socket_t getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
763
764 // Returns the actual thread object associated with this IO thread.
getThread()765 std::shared_ptr<Thread> getThread() const { return thread_; }
766
767 // Sets the actual thread object associated with this IO thread.
setThread(const std::shared_ptr<Thread> & t)768 void setThread(const std::shared_ptr<Thread>& t) { thread_ = t; }
769
770 // Used by TConnection objects to indicate processing has finished.
771 bool notify(TNonblockingServer::TConnection* conn);
772
773 // Enters the event loop and does not return until a call to stop().
774 void run() override;
775
776 // Exits the event loop as soon as possible.
777 void stop();
778
779 // Ensures that the event-loop thread is fully finished and shut down.
780 void join();
781
782 /// Registers the events for the notification & listen sockets
783 void registerEvents();
784
785 private:
786 /**
787 * C-callable event handler for signaling task completion. Provides a
788 * callback that libevent can understand that will read a connection
789 * object's address from a pipe and call connection->transition() for
790 * that object.
791 *
792 * @param fd the descriptor the event occurred on.
793 */
794 static void notifyHandler(evutil_socket_t fd, short which, void* v);
795
796 /**
797 * C-callable event handler for listener events. Provides a callback
798 * that libevent can understand which invokes server->handleEvent().
799 *
800 * @param fd the descriptor the event occurred on.
801 * @param which the flags associated with the event.
802 * @param v void* callback arg where we placed TNonblockingServer's "this".
803 */
listenHandler(evutil_socket_t fd,short which,void * v)804 static void listenHandler(evutil_socket_t fd, short which, void* v) {
805 ((TNonblockingServer*)v)->handleEvent(fd, which);
806 }
807
808 /// Exits the loop ASAP in case of shutdown or error.
809 void breakLoop(bool error);
810
811 /// Create the pipe used to notify I/O process of task completion.
812 void createNotificationPipe();
813
814 /// Unregisters our events for notification and listen sockets.
815 void cleanupEvents();
816
817 /// Sets (or clears) high priority scheduling status for the current thread.
818 void setCurrentThreadHighPriority(bool value);
819
820 private:
821 /// associated server
822 TNonblockingServer* server_;
823
824 /// thread number (for debugging).
825 const int number_;
826
827 /// The actual physical thread id.
828 Thread::id_t threadId_;
829
830 /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
831 THRIFT_SOCKET listenSocket_;
832
833 /// Sets a high scheduling priority when running
834 bool useHighPriority_;
835
836 /// pointer to eventbase to be used for looping
837 event_base* eventBase_;
838
839 /// Set to true if this class is responsible for freeing the event base
840 /// memory.
841 bool ownEventBase_;
842
843 /// Used with eventBase_ for connection events (only in listener thread)
844 struct event serverEvent_;
845
846 /// Used with eventBase_ for task completion notification
847 struct event notificationEvent_;
848
849 /// File descriptors for pipe used for task completion notification.
850 evutil_socket_t notificationPipeFDs_[2];
851
852 /// Actual IO Thread
853 std::shared_ptr<Thread> thread_;
854 };
855 }
856 }
857 } // apache::thrift::server
858
859 #endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
860