1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #define BOOST_TEST_MODULE TServerIntegrationTest
21 #include <atomic>
22 #include <boost/test/unit_test.hpp>
23 #include <boost/date_time/posix_time/ptime.hpp>
24 #include <boost/foreach.hpp>
25 #include <boost/format.hpp>
26 #include <boost/thread.hpp>
27 #include <thrift/server/TSimpleServer.h>
28 #include <thrift/server/TThreadPoolServer.h>
29 #include <thrift/server/TThreadedServer.h>
30 #include <memory>
31 #include <thrift/protocol/TBinaryProtocol.h>
32 #include <thrift/transport/TServerSocket.h>
33 #include <thrift/transport/TSocket.h>
34 #include <thrift/transport/TTransport.h>
35 #include "gen-cpp/ParentService.h"
36 #include <string>
37 #include <vector>
38
39 using apache::thrift::concurrency::Guard;
40 using apache::thrift::concurrency::Monitor;
41 using apache::thrift::concurrency::Mutex;
42 using apache::thrift::concurrency::Synchronized;
43 using apache::thrift::protocol::TBinaryProtocol;
44 using apache::thrift::protocol::TBinaryProtocolFactory;
45 using apache::thrift::protocol::TProtocol;
46 using apache::thrift::protocol::TProtocolFactory;
47 using apache::thrift::transport::TServerSocket;
48 using apache::thrift::transport::TServerTransport;
49 using apache::thrift::transport::TSocket;
50 using apache::thrift::transport::TTransport;
51 using apache::thrift::transport::TTransportException;
52 using apache::thrift::transport::TTransportFactory;
53 using apache::thrift::server::TServer;
54 using apache::thrift::server::TServerEventHandler;
55 using apache::thrift::server::TSimpleServer;
56 using apache::thrift::server::TThreadPoolServer;
57 using apache::thrift::server::TThreadedServer;
58 using std::dynamic_pointer_cast;
59 using std::make_shared;
60 using std::shared_ptr;
61 using apache::thrift::test::ParentServiceClient;
62 using apache::thrift::test::ParentServiceIf;
63 using apache::thrift::test::ParentServiceIfFactory;
64 using apache::thrift::test::ParentServiceIfSingletonFactory;
65 using apache::thrift::test::ParentServiceProcessor;
66 using apache::thrift::test::ParentServiceProcessorFactory;
67 using apache::thrift::TProcessor;
68 using apache::thrift::TProcessorFactory;
69 using boost::posix_time::milliseconds;
70
71 /**
72 * preServe runs after listen() is successful, when we can connect
73 */
74 class TServerReadyEventHandler : public TServerEventHandler, public Monitor {
75 public:
TServerReadyEventHandler()76 TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
77 ~TServerReadyEventHandler() override = default;
preServe()78 void preServe() override {
79 Synchronized sync(*this);
80 isListening_ = true;
81 notify();
82 }
createContext(shared_ptr<TProtocol> input,shared_ptr<TProtocol> output)83 void* createContext(shared_ptr<TProtocol> input,
84 shared_ptr<TProtocol> output) override {
85 Synchronized sync(*this);
86 ++accepted_;
87 notify();
88
89 (void)input;
90 (void)output;
91 return nullptr;
92 }
isListening() const93 bool isListening() const { return isListening_; }
acceptedCount() const94 uint64_t acceptedCount() const { return accepted_; }
95
96 private:
97 bool isListening_;
98 uint64_t accepted_;
99 };
100
101 /**
102 * Reusing another generated test, just something to serve up
103 */
104 class ParentHandler : public ParentServiceIf {
105 public:
ParentHandler()106 ParentHandler() : generation_(0) {}
107
incrementGeneration()108 int32_t incrementGeneration() override {
109 Guard g(mutex_);
110 return ++generation_;
111 }
112
getGeneration()113 int32_t getGeneration() override {
114 Guard g(mutex_);
115 return generation_;
116 }
117
addString(const std::string & s)118 void addString(const std::string& s) override {
119 Guard g(mutex_);
120 strings_.push_back(s);
121 }
122
getStrings(std::vector<std::string> & _return)123 void getStrings(std::vector<std::string>& _return) override {
124 Guard g(mutex_);
125 _return = strings_;
126 }
127
getDataWait(std::string & _return,const int32_t length)128 void getDataWait(std::string& _return, const int32_t length) override {
129 THRIFT_UNUSED_VARIABLE(_return);
130 THRIFT_UNUSED_VARIABLE(length);
131 }
132
onewayWait()133 void onewayWait() override {}
134
exceptionWait(const std::string & message)135 void exceptionWait(const std::string& message) override { THRIFT_UNUSED_VARIABLE(message); }
136
unexpectedExceptionWait(const std::string & message)137 void unexpectedExceptionWait(const std::string& message) override { THRIFT_UNUSED_VARIABLE(message); }
138
139 protected:
140 Mutex mutex_;
141 int32_t generation_;
142 std::vector<std::string> strings_;
143 };
144
autoSocketCloser(TSocket * pSock)145 void autoSocketCloser(TSocket* pSock) {
146 pSock->close();
147 delete pSock;
148 }
149
150 template <class TServerType>
151 class TServerIntegrationTestFixture {
152 public:
TServerIntegrationTestFixture(const shared_ptr<TProcessorFactory> & _processorFactory)153 TServerIntegrationTestFixture(const shared_ptr<TProcessorFactory>& _processorFactory)
154 : pServer(new TServerType(_processorFactory,
155 shared_ptr<TServerTransport>(
156 new TServerSocket("localhost", 0)),
157 shared_ptr<TTransportFactory>(new TTransportFactory),
158 shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
159 pEventHandler(shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)),
160 bStressDone(false),
161 bStressConnectionCount(0),
162 bStressRequestCount(0) {
163 pServer->setServerEventHandler(pEventHandler);
164 }
165
TServerIntegrationTestFixture(const shared_ptr<TProcessor> & _processor)166 TServerIntegrationTestFixture(const shared_ptr<TProcessor>& _processor)
167 : pServer(
168 new TServerType(_processor,
169 shared_ptr<TServerTransport>(new TServerSocket("localhost", 0)),
170 shared_ptr<TTransportFactory>(new TTransportFactory),
171 shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
172 pEventHandler(shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)),
173 bStressDone(false),
174 bStressConnectionCount(0),
175 bStressRequestCount(0) {
176 pServer->setServerEventHandler(pEventHandler);
177 }
178
startServer()179 void startServer() {
180 pServerThread.reset(new boost::thread(std::bind(&TServerType::serve, pServer.get())));
181
182 // block until listen() completes so clients will be able to connect
183 Synchronized sync(*(pEventHandler.get()));
184 while (!pEventHandler->isListening()) {
185 pEventHandler->wait();
186 }
187
188 BOOST_TEST_MESSAGE(" server is listening");
189 }
190
blockUntilAccepted(uint64_t numAccepted)191 void blockUntilAccepted(uint64_t numAccepted) {
192 Synchronized sync(*(pEventHandler.get()));
193 while (pEventHandler->acceptedCount() < numAccepted) {
194 pEventHandler->wait();
195 }
196
197 BOOST_TEST_MESSAGE(boost::format(" server has accepted %1%") % numAccepted);
198 }
199
stopServer()200 void stopServer() {
201 if (pServerThread) {
202 pServer->stop();
203 BOOST_TEST_MESSAGE(" server stop completed");
204
205 pServerThread->join();
206 BOOST_TEST_MESSAGE(" server thread joined");
207 pServerThread.reset();
208 }
209 }
210
~TServerIntegrationTestFixture()211 ~TServerIntegrationTestFixture() { stopServer(); }
212
213 /**
214 * Performs a baseline test where some clients are opened and issue a single operation
215 * and then disconnect at different intervals.
216 * \param[in] numToMake the number of concurrent clients
217 * \param[in] expectedHWM the high water mark we expect of concurrency
218 * \param[in] purpose a description of the test for logging purposes
219 */
baseline(int64_t numToMake,int64_t expectedHWM,const std::string & purpose)220 void baseline(int64_t numToMake, int64_t expectedHWM, const std::string& purpose) {
221 BOOST_TEST_MESSAGE(boost::format("Testing %1%: %2% with %3% clients, expect %4% HWM")
222 % typeid(TServerType).name() % purpose % numToMake % expectedHWM);
223
224 startServer();
225
226 std::vector<shared_ptr<TSocket> > holdSockets;
227 std::vector<shared_ptr<boost::thread> > holdThreads;
228
229 for (int64_t i = 0; i < numToMake; ++i) {
230 shared_ptr<TSocket> pClientSock(new TSocket("localhost", getServerPort()),
231 autoSocketCloser);
232 holdSockets.push_back(pClientSock);
233 shared_ptr<TProtocol> pClientProtocol(new TBinaryProtocol(pClientSock));
234 ParentServiceClient client(pClientProtocol);
235 pClientSock->open();
236 client.incrementGeneration();
237 holdThreads.push_back(shared_ptr<boost::thread>(
238 new boost::thread(std::bind(&TServerIntegrationTestFixture::delayClose,
239 this,
240 pClientSock,
241 milliseconds(10 * numToMake)))));
242 }
243
244 BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM());
245
246 BOOST_FOREACH (shared_ptr<boost::thread> pThread, holdThreads) { pThread->join(); }
247 holdThreads.clear();
248 holdSockets.clear();
249
250 stopServer();
251 }
252
253 /**
254 * Helper method used to close a connection after a delay.
255 * \param[in] toClose the connection to close
256 * \param[in] after the delay to impose
257 */
delayClose(shared_ptr<TTransport> toClose,boost::posix_time::time_duration after)258 void delayClose(shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) {
259 boost::this_thread::sleep(after);
260 toClose->close();
261 }
262
263 /**
264 * \returns the server port number
265 */
getServerPort()266 int getServerPort() {
267 auto* pSock = dynamic_cast<TServerSocket*>(pServer->getServerTransport().get());
268 if (!pSock) { throw std::logic_error("how come?"); }
269 return pSock->getPort();
270 }
271
272 /**
273 * Performs a stress test by spawning threads that connect, do a number of operations
274 * and disconnect, then a random delay, then do it over again. This is done for a fixed
275 * period of time to test for concurrency correctness.
276 * \param[in] numToMake the number of concurrent clients
277 */
stress(int64_t numToMake,const boost::posix_time::time_duration & duration)278 void stress(int64_t numToMake, const boost::posix_time::time_duration& duration) {
279 BOOST_TEST_MESSAGE(boost::format("Stress testing %1% with %2% clients for %3% seconds")
280 % typeid(TServerType).name() % numToMake % duration.total_seconds());
281
282 startServer();
283
284 std::vector<shared_ptr<boost::thread> > holdThreads;
285 for (int64_t i = 0; i < numToMake; ++i) {
286 holdThreads.push_back(shared_ptr<boost::thread>(
287 new boost::thread(std::bind(&TServerIntegrationTestFixture::stressor, this))));
288 }
289
290 boost::this_thread::sleep(duration);
291 bStressDone = true;
292
293 BOOST_TEST_MESSAGE(boost::format(" serviced %1% connections (HWM %2%) totaling %3% requests")
294 % bStressConnectionCount % pServer->getConcurrentClientCountHWM() % bStressRequestCount);
295
296 BOOST_FOREACH (shared_ptr<boost::thread> pThread, holdThreads) { pThread->join(); }
297 holdThreads.clear();
298
299 BOOST_CHECK(bStressRequestCount > 0);
300
301 stopServer();
302 }
303
304 /**
305 * Helper method to stress the system
306 */
stressor()307 void stressor() {
308 while (!bStressDone) {
309 shared_ptr<TSocket> pSocket(new TSocket("localhost", getServerPort()), autoSocketCloser);
310 shared_ptr<TProtocol> pProtocol(new TBinaryProtocol(pSocket));
311 ParentServiceClient client(pProtocol);
312 pSocket->open();
313 bStressConnectionCount.fetch_add(1, std::memory_order_relaxed);
314 for (int i = 0; i < rand() % 1000; ++i) {
315 client.incrementGeneration();
316 bStressRequestCount.fetch_add(1, std::memory_order_relaxed);
317 }
318 }
319 }
320
321 shared_ptr<TServerType> pServer;
322 shared_ptr<TServerReadyEventHandler> pEventHandler;
323 shared_ptr<boost::thread> pServerThread;
324 std::atomic<bool> bStressDone;
325 std::atomic<int64_t> bStressConnectionCount;
326 std::atomic<int64_t> bStressRequestCount;
327 };
328
329 template <class TServerType>
330 class TServerIntegrationProcessorFactoryTestFixture
331 : public TServerIntegrationTestFixture<TServerType> {
332 public:
TServerIntegrationProcessorFactoryTestFixture()333 TServerIntegrationProcessorFactoryTestFixture()
334 : TServerIntegrationTestFixture<TServerType>(make_shared<ParentServiceProcessorFactory>(
335 make_shared<ParentServiceIfSingletonFactory>(
336 make_shared<ParentHandler>()))) {}
337 };
338
339 template <class TServerType>
340 class TServerIntegrationProcessorTestFixture : public TServerIntegrationTestFixture<TServerType> {
341 public:
TServerIntegrationProcessorTestFixture()342 TServerIntegrationProcessorTestFixture()
343 : TServerIntegrationTestFixture<TServerType>(
344 make_shared<ParentServiceProcessor>(make_shared<ParentHandler>())) {}
345 };
346
347 BOOST_AUTO_TEST_SUITE(constructors)
348
BOOST_FIXTURE_TEST_CASE(test_simple_factory,TServerIntegrationProcessorFactoryTestFixture<TSimpleServer>)349 BOOST_FIXTURE_TEST_CASE(test_simple_factory,
350 TServerIntegrationProcessorFactoryTestFixture<TSimpleServer>) {
351 baseline(3, 1, "factory");
352 }
353
BOOST_FIXTURE_TEST_CASE(test_simple,TServerIntegrationProcessorTestFixture<TSimpleServer>)354 BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationProcessorTestFixture<TSimpleServer>) {
355 baseline(3, 1, "processor");
356 }
357
BOOST_FIXTURE_TEST_CASE(test_threaded_factory,TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>)358 BOOST_FIXTURE_TEST_CASE(test_threaded_factory,
359 TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>) {
360 baseline(10, 10, "factory");
361 }
362
BOOST_FIXTURE_TEST_CASE(test_threaded,TServerIntegrationProcessorTestFixture<TThreadedServer>)363 BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationProcessorTestFixture<TThreadedServer>) {
364 baseline(10, 10, "processor");
365 }
366
BOOST_FIXTURE_TEST_CASE(test_threaded_bound,TServerIntegrationProcessorTestFixture<TThreadedServer>)367 BOOST_FIXTURE_TEST_CASE(test_threaded_bound,
368 TServerIntegrationProcessorTestFixture<TThreadedServer>) {
369 pServer->setConcurrentClientLimit(4);
370 baseline(10, 4, "limit by server framework");
371 }
372
BOOST_FIXTURE_TEST_CASE(test_threaded_stress,TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>)373 BOOST_FIXTURE_TEST_CASE(test_threaded_stress,
374 TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>) {
375 stress(10, boost::posix_time::seconds(3));
376 }
377
BOOST_FIXTURE_TEST_CASE(test_threadpool_factory,TServerIntegrationProcessorFactoryTestFixture<TThreadPoolServer>)378 BOOST_FIXTURE_TEST_CASE(test_threadpool_factory,
379 TServerIntegrationProcessorFactoryTestFixture<TThreadPoolServer>) {
380 pServer->getThreadManager()->threadFactory(
381 shared_ptr<apache::thrift::concurrency::ThreadFactory>(
382 new apache::thrift::concurrency::ThreadFactory));
383 pServer->getThreadManager()->start();
384
385 // thread factory has 4 threads as a default
386 // thread factory however is a bad way to limit concurrent clients
387 // as accept() will be called to grab a 5th client socket, in this case
388 // and then the thread factory will block adding the thread to manage
389 // that client.
390 baseline(10, 5, "limit by thread manager");
391 }
392
BOOST_FIXTURE_TEST_CASE(test_threadpool,TServerIntegrationProcessorTestFixture<TThreadPoolServer>)393 BOOST_FIXTURE_TEST_CASE(test_threadpool,
394 TServerIntegrationProcessorTestFixture<TThreadPoolServer>) {
395 pServer->getThreadManager()->threadFactory(
396 shared_ptr<apache::thrift::concurrency::ThreadFactory>(
397 new apache::thrift::concurrency::ThreadFactory));
398 pServer->getThreadManager()->start();
399
400 // thread factory has 4 threads as a default
401 // thread factory however is a bad way to limit concurrent clients
402 // as accept() will be called to grab a 5th client socket, in this case
403 // and then the thread factory will block adding the thread to manage
404 // that client.
405 baseline(10, 5, "limit by thread manager");
406 }
407
BOOST_FIXTURE_TEST_CASE(test_threadpool_bound,TServerIntegrationProcessorTestFixture<TThreadPoolServer>)408 BOOST_FIXTURE_TEST_CASE(test_threadpool_bound,
409 TServerIntegrationProcessorTestFixture<TThreadPoolServer>) {
410 pServer->getThreadManager()->threadFactory(
411 shared_ptr<apache::thrift::concurrency::ThreadFactory>(
412 new apache::thrift::concurrency::ThreadFactory));
413 pServer->getThreadManager()->start();
414 pServer->setConcurrentClientLimit(4);
415
416 baseline(10, 4, "server framework connection limit");
417 }
418
BOOST_FIXTURE_TEST_CASE(test_threadpool_stress,TServerIntegrationProcessorTestFixture<TThreadPoolServer>)419 BOOST_FIXTURE_TEST_CASE(test_threadpool_stress,
420 TServerIntegrationProcessorTestFixture<TThreadPoolServer>) {
421 pServer->getThreadManager()->threadFactory(
422 shared_ptr<apache::thrift::concurrency::ThreadFactory>(
423 new apache::thrift::concurrency::ThreadFactory));
424 pServer->getThreadManager()->start();
425
426 stress(10, boost::posix_time::seconds(3));
427 }
428
429 BOOST_AUTO_TEST_SUITE_END()
430
BOOST_FIXTURE_TEST_SUITE(TServerIntegrationTest,TServerIntegrationProcessorTestFixture<TThreadedServer>)431 BOOST_FIXTURE_TEST_SUITE(TServerIntegrationTest,
432 TServerIntegrationProcessorTestFixture<TThreadedServer>)
433
434 BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected) {
435 // This tests THRIFT-2441 new behavior: stopping the server disconnects clients
436 BOOST_TEST_MESSAGE("Testing stop with interruptable clients");
437
438 startServer();
439
440 shared_ptr<TSocket> pClientSock1(new TSocket("localhost", getServerPort()),
441 autoSocketCloser);
442 pClientSock1->open();
443
444 shared_ptr<TSocket> pClientSock2(new TSocket("localhost", getServerPort()),
445 autoSocketCloser);
446 pClientSock2->open();
447
448 // Ensure they have been accepted
449 blockUntilAccepted(2);
450
451 // The test fixture destructor will force the sockets to disconnect
452 // Prior to THRIFT-2441, pServer->stop() would hang until clients disconnected
453 stopServer();
454
455 // extra proof the server end disconnected the clients
456 uint8_t buf[1];
457 BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected
458 BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected
459 }
460
BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected)461 BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected) {
462 // This tests pre-THRIFT-2441 behavior: stopping the server blocks until clients
463 // disconnect.
464 BOOST_TEST_MESSAGE("Testing stop with uninterruptable clients");
465
466 dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport())
467 ->setInterruptableChildren(false); // returns to pre-THRIFT-2441 behavior
468
469 startServer();
470
471 shared_ptr<TSocket> pClientSock1(new TSocket("localhost", getServerPort()),
472 autoSocketCloser);
473 pClientSock1->open();
474
475 shared_ptr<TSocket> pClientSock2(new TSocket("localhost", getServerPort()),
476 autoSocketCloser);
477 pClientSock2->open();
478
479 // Ensure they have been accepted
480 blockUntilAccepted(2);
481
482 boost::thread t1(std::bind(&TServerIntegrationTestFixture::delayClose,
483 this,
484 pClientSock1,
485 milliseconds(250)));
486 boost::thread t2(std::bind(&TServerIntegrationTestFixture::delayClose,
487 this,
488 pClientSock2,
489 milliseconds(250)));
490
491 // Once the clients disconnect the server will stop
492 stopServer();
493 BOOST_CHECK(pServer->getConcurrentClientCountHWM() > 0);
494 t1.join();
495 t2.join();
496 }
497
BOOST_AUTO_TEST_CASE(test_concurrent_client_limit)498 BOOST_AUTO_TEST_CASE(test_concurrent_client_limit) {
499 startServer();
500 BOOST_TEST_MESSAGE("Testing the concurrent client limit");
501
502 BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit());
503 pServer->setConcurrentClientLimit(2);
504 BOOST_CHECK_EQUAL(0, pServer->getConcurrentClientCount());
505 BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientLimit());
506
507 shared_ptr<TSocket> pClientSock1(new TSocket("localhost", getServerPort()),
508 autoSocketCloser);
509 pClientSock1->open();
510 blockUntilAccepted(1);
511 BOOST_CHECK_EQUAL(1, pServer->getConcurrentClientCount());
512
513 shared_ptr<TSocket> pClientSock2(new TSocket("localhost", getServerPort()),
514 autoSocketCloser);
515 pClientSock2->open();
516 blockUntilAccepted(2);
517 BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
518
519 // a third client cannot connect until one of the other two closes
520 boost::thread t2(std::bind(&TServerIntegrationTestFixture::delayClose,
521 this,
522 pClientSock2,
523 milliseconds(250)));
524 shared_ptr<TSocket> pClientSock3(new TSocket("localhost", getServerPort()),
525 autoSocketCloser);
526 pClientSock2->open();
527 blockUntilAccepted(2);
528 BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
529 BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM());
530
531 stopServer();
532 BOOST_CHECK(pServer->getConcurrentClientCountHWM() > 0);
533 t2.join();
534 }
535
536 BOOST_AUTO_TEST_SUITE_END()
537