1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #define BOOST_TEST_MODULE TServerIntegrationTest
21 #include <atomic>
22 #include <boost/test/unit_test.hpp>
23 #include <boost/date_time/posix_time/ptime.hpp>
24 #include <boost/foreach.hpp>
25 #include <boost/format.hpp>
26 #include <boost/thread.hpp>
27 #include <thrift/server/TSimpleServer.h>
28 #include <thrift/server/TThreadPoolServer.h>
29 #include <thrift/server/TThreadedServer.h>
30 #include <memory>
31 #include <thrift/protocol/TBinaryProtocol.h>
32 #include <thrift/transport/TServerSocket.h>
33 #include <thrift/transport/TSocket.h>
34 #include <thrift/transport/TTransport.h>
35 #include "gen-cpp/ParentService.h"
36 #include <string>
37 #include <vector>
38 
39 using apache::thrift::concurrency::Guard;
40 using apache::thrift::concurrency::Monitor;
41 using apache::thrift::concurrency::Mutex;
42 using apache::thrift::concurrency::Synchronized;
43 using apache::thrift::protocol::TBinaryProtocol;
44 using apache::thrift::protocol::TBinaryProtocolFactory;
45 using apache::thrift::protocol::TProtocol;
46 using apache::thrift::protocol::TProtocolFactory;
47 using apache::thrift::transport::TServerSocket;
48 using apache::thrift::transport::TServerTransport;
49 using apache::thrift::transport::TSocket;
50 using apache::thrift::transport::TTransport;
51 using apache::thrift::transport::TTransportException;
52 using apache::thrift::transport::TTransportFactory;
53 using apache::thrift::server::TServer;
54 using apache::thrift::server::TServerEventHandler;
55 using apache::thrift::server::TSimpleServer;
56 using apache::thrift::server::TThreadPoolServer;
57 using apache::thrift::server::TThreadedServer;
58 using std::dynamic_pointer_cast;
59 using std::make_shared;
60 using std::shared_ptr;
61 using apache::thrift::test::ParentServiceClient;
62 using apache::thrift::test::ParentServiceIf;
63 using apache::thrift::test::ParentServiceIfFactory;
64 using apache::thrift::test::ParentServiceIfSingletonFactory;
65 using apache::thrift::test::ParentServiceProcessor;
66 using apache::thrift::test::ParentServiceProcessorFactory;
67 using apache::thrift::TProcessor;
68 using apache::thrift::TProcessorFactory;
69 using boost::posix_time::milliseconds;
70 
71 /**
72  * preServe runs after listen() is successful, when we can connect
73  */
74 class TServerReadyEventHandler : public TServerEventHandler, public Monitor {
75 public:
TServerReadyEventHandler()76   TServerReadyEventHandler() : isListening_(false), accepted_(0) {}
77   ~TServerReadyEventHandler() override = default;
preServe()78   void preServe() override {
79     Synchronized sync(*this);
80     isListening_ = true;
81     notify();
82   }
createContext(shared_ptr<TProtocol> input,shared_ptr<TProtocol> output)83   void* createContext(shared_ptr<TProtocol> input,
84                               shared_ptr<TProtocol> output) override {
85     Synchronized sync(*this);
86     ++accepted_;
87     notify();
88 
89     (void)input;
90     (void)output;
91     return nullptr;
92   }
isListening() const93   bool isListening() const { return isListening_; }
acceptedCount() const94   uint64_t acceptedCount() const { return accepted_; }
95 
96 private:
97   bool isListening_;
98   uint64_t accepted_;
99 };
100 
101 /**
102  * Reusing another generated test, just something to serve up
103  */
104 class ParentHandler : public ParentServiceIf {
105 public:
ParentHandler()106   ParentHandler() : generation_(0) {}
107 
incrementGeneration()108   int32_t incrementGeneration() override {
109     Guard g(mutex_);
110     return ++generation_;
111   }
112 
getGeneration()113   int32_t getGeneration() override {
114     Guard g(mutex_);
115     return generation_;
116   }
117 
addString(const std::string & s)118   void addString(const std::string& s) override {
119     Guard g(mutex_);
120     strings_.push_back(s);
121   }
122 
getStrings(std::vector<std::string> & _return)123   void getStrings(std::vector<std::string>& _return) override {
124     Guard g(mutex_);
125     _return = strings_;
126   }
127 
getDataWait(std::string & _return,const int32_t length)128   void getDataWait(std::string& _return, const int32_t length) override {
129     THRIFT_UNUSED_VARIABLE(_return);
130     THRIFT_UNUSED_VARIABLE(length);
131   }
132 
onewayWait()133   void onewayWait() override {}
134 
exceptionWait(const std::string & message)135   void exceptionWait(const std::string& message) override { THRIFT_UNUSED_VARIABLE(message); }
136 
unexpectedExceptionWait(const std::string & message)137   void unexpectedExceptionWait(const std::string& message) override { THRIFT_UNUSED_VARIABLE(message); }
138 
139 protected:
140   Mutex mutex_;
141   int32_t generation_;
142   std::vector<std::string> strings_;
143 };
144 
autoSocketCloser(TSocket * pSock)145 void autoSocketCloser(TSocket* pSock) {
146   pSock->close();
147   delete pSock;
148 }
149 
150 template <class TServerType>
151 class TServerIntegrationTestFixture {
152 public:
TServerIntegrationTestFixture(const shared_ptr<TProcessorFactory> & _processorFactory)153   TServerIntegrationTestFixture(const shared_ptr<TProcessorFactory>& _processorFactory)
154     : pServer(new TServerType(_processorFactory,
155                               shared_ptr<TServerTransport>(
156                                   new TServerSocket("localhost", 0)),
157                               shared_ptr<TTransportFactory>(new TTransportFactory),
158                               shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
159       pEventHandler(shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)),
160     bStressDone(false),
161     bStressConnectionCount(0),
162     bStressRequestCount(0) {
163     pServer->setServerEventHandler(pEventHandler);
164   }
165 
TServerIntegrationTestFixture(const shared_ptr<TProcessor> & _processor)166   TServerIntegrationTestFixture(const shared_ptr<TProcessor>& _processor)
167     : pServer(
168           new TServerType(_processor,
169                           shared_ptr<TServerTransport>(new TServerSocket("localhost", 0)),
170                           shared_ptr<TTransportFactory>(new TTransportFactory),
171                           shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory))),
172       pEventHandler(shared_ptr<TServerReadyEventHandler>(new TServerReadyEventHandler)),
173       bStressDone(false),
174     bStressConnectionCount(0),
175     bStressRequestCount(0) {
176     pServer->setServerEventHandler(pEventHandler);
177   }
178 
startServer()179   void startServer() {
180     pServerThread.reset(new boost::thread(std::bind(&TServerType::serve, pServer.get())));
181 
182     // block until listen() completes so clients will be able to connect
183     Synchronized sync(*(pEventHandler.get()));
184     while (!pEventHandler->isListening()) {
185       pEventHandler->wait();
186     }
187 
188     BOOST_TEST_MESSAGE("  server is listening");
189   }
190 
blockUntilAccepted(uint64_t numAccepted)191   void blockUntilAccepted(uint64_t numAccepted) {
192     Synchronized sync(*(pEventHandler.get()));
193     while (pEventHandler->acceptedCount() < numAccepted) {
194       pEventHandler->wait();
195     }
196 
197     BOOST_TEST_MESSAGE(boost::format("  server has accepted %1%") % numAccepted);
198   }
199 
stopServer()200   void stopServer() {
201     if (pServerThread) {
202       pServer->stop();
203       BOOST_TEST_MESSAGE("  server stop completed");
204 
205       pServerThread->join();
206       BOOST_TEST_MESSAGE("  server thread joined");
207       pServerThread.reset();
208     }
209   }
210 
~TServerIntegrationTestFixture()211   ~TServerIntegrationTestFixture() { stopServer(); }
212 
213   /**
214    * Performs a baseline test where some clients are opened and issue a single operation
215    * and then disconnect at different intervals.
216    * \param[in]  numToMake  the number of concurrent clients
217    * \param[in]  expectedHWM  the high water mark we expect of concurrency
218    * \param[in]  purpose  a description of the test for logging purposes
219    */
baseline(int64_t numToMake,int64_t expectedHWM,const std::string & purpose)220   void baseline(int64_t numToMake, int64_t expectedHWM, const std::string& purpose) {
221     BOOST_TEST_MESSAGE(boost::format("Testing %1%: %2% with %3% clients, expect %4% HWM")
222             % typeid(TServerType).name() % purpose % numToMake % expectedHWM);
223 
224     startServer();
225 
226     std::vector<shared_ptr<TSocket> > holdSockets;
227     std::vector<shared_ptr<boost::thread> > holdThreads;
228 
229     for (int64_t i = 0; i < numToMake; ++i) {
230       shared_ptr<TSocket> pClientSock(new TSocket("localhost", getServerPort()),
231                                              autoSocketCloser);
232       holdSockets.push_back(pClientSock);
233       shared_ptr<TProtocol> pClientProtocol(new TBinaryProtocol(pClientSock));
234       ParentServiceClient client(pClientProtocol);
235       pClientSock->open();
236       client.incrementGeneration();
237       holdThreads.push_back(shared_ptr<boost::thread>(
238           new boost::thread(std::bind(&TServerIntegrationTestFixture::delayClose,
239                                         this,
240                                         pClientSock,
241                                         milliseconds(10 * numToMake)))));
242     }
243 
244     BOOST_CHECK_EQUAL(expectedHWM, pServer->getConcurrentClientCountHWM());
245 
246     BOOST_FOREACH (shared_ptr<boost::thread> pThread, holdThreads) { pThread->join(); }
247     holdThreads.clear();
248     holdSockets.clear();
249 
250     stopServer();
251   }
252 
253   /**
254    * Helper method used to close a connection after a delay.
255    * \param[in]  toClose  the connection to close
256    * \param[in]  after  the delay to impose
257    */
delayClose(shared_ptr<TTransport> toClose,boost::posix_time::time_duration after)258   void delayClose(shared_ptr<TTransport> toClose, boost::posix_time::time_duration after) {
259     boost::this_thread::sleep(after);
260     toClose->close();
261   }
262 
263   /**
264    * \returns  the server port number
265    */
getServerPort()266   int getServerPort() {
267     auto* pSock = dynamic_cast<TServerSocket*>(pServer->getServerTransport().get());
268     if (!pSock) { throw std::logic_error("how come?"); }
269     return pSock->getPort();
270   }
271 
272   /**
273    * Performs a stress test by spawning threads that connect, do a number of operations
274    * and disconnect, then a random delay, then do it over again.  This is done for a fixed
275    * period of time to test for concurrency correctness.
276    * \param[in]  numToMake  the number of concurrent clients
277    */
stress(int64_t numToMake,const boost::posix_time::time_duration & duration)278   void stress(int64_t numToMake, const boost::posix_time::time_duration& duration) {
279     BOOST_TEST_MESSAGE(boost::format("Stress testing %1% with %2% clients for %3% seconds")
280         % typeid(TServerType).name() % numToMake % duration.total_seconds());
281 
282     startServer();
283 
284     std::vector<shared_ptr<boost::thread> > holdThreads;
285     for (int64_t i = 0; i < numToMake; ++i) {
286       holdThreads.push_back(shared_ptr<boost::thread>(
287         new boost::thread(std::bind(&TServerIntegrationTestFixture::stressor, this))));
288     }
289 
290     boost::this_thread::sleep(duration);
291     bStressDone = true;
292 
293     BOOST_TEST_MESSAGE(boost::format("  serviced %1% connections (HWM %2%) totaling %3% requests")
294         % bStressConnectionCount % pServer->getConcurrentClientCountHWM() % bStressRequestCount);
295 
296     BOOST_FOREACH (shared_ptr<boost::thread> pThread, holdThreads) { pThread->join(); }
297     holdThreads.clear();
298 
299     BOOST_CHECK(bStressRequestCount > 0);
300 
301     stopServer();
302   }
303 
304   /**
305    * Helper method to stress the system
306    */
stressor()307   void stressor() {
308   while (!bStressDone) {
309       shared_ptr<TSocket> pSocket(new TSocket("localhost", getServerPort()), autoSocketCloser);
310       shared_ptr<TProtocol> pProtocol(new TBinaryProtocol(pSocket));
311       ParentServiceClient client(pProtocol);
312       pSocket->open();
313       bStressConnectionCount.fetch_add(1, std::memory_order_relaxed);
314       for (int i = 0; i < rand() % 1000; ++i) {
315       client.incrementGeneration();
316         bStressRequestCount.fetch_add(1, std::memory_order_relaxed);
317       }
318     }
319   }
320 
321   shared_ptr<TServerType> pServer;
322   shared_ptr<TServerReadyEventHandler> pEventHandler;
323   shared_ptr<boost::thread> pServerThread;
324   std::atomic<bool> bStressDone;
325   std::atomic<int64_t> bStressConnectionCount;
326   std::atomic<int64_t> bStressRequestCount;
327 };
328 
329 template <class TServerType>
330 class TServerIntegrationProcessorFactoryTestFixture
331     : public TServerIntegrationTestFixture<TServerType> {
332 public:
TServerIntegrationProcessorFactoryTestFixture()333   TServerIntegrationProcessorFactoryTestFixture()
334     : TServerIntegrationTestFixture<TServerType>(make_shared<ParentServiceProcessorFactory>(
335           make_shared<ParentServiceIfSingletonFactory>(
336               make_shared<ParentHandler>()))) {}
337 };
338 
339 template <class TServerType>
340 class TServerIntegrationProcessorTestFixture : public TServerIntegrationTestFixture<TServerType> {
341 public:
TServerIntegrationProcessorTestFixture()342   TServerIntegrationProcessorTestFixture()
343     : TServerIntegrationTestFixture<TServerType>(
344           make_shared<ParentServiceProcessor>(make_shared<ParentHandler>())) {}
345 };
346 
347 BOOST_AUTO_TEST_SUITE(constructors)
348 
// TSimpleServer built from a processor factory: 3 clients, expected HWM of 1.
BOOST_FIXTURE_TEST_CASE(test_simple_factory,
                        TServerIntegrationProcessorFactoryTestFixture<TSimpleServer>) {
  const int64_t numClients = 3;
  const int64_t expectedHWM = 1;
  baseline(numClients, expectedHWM, "factory");
}
353 
// TSimpleServer built from a processor: 3 clients, expected HWM of 1.
BOOST_FIXTURE_TEST_CASE(test_simple, TServerIntegrationProcessorTestFixture<TSimpleServer>) {
  const int64_t numClients = 3;
  const int64_t expectedHWM = 1;
  baseline(numClients, expectedHWM, "processor");
}
357 
// TThreadedServer built from a processor factory: 10 clients, expected HWM of 10.
BOOST_FIXTURE_TEST_CASE(test_threaded_factory,
                        TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>) {
  const int64_t numClients = 10;
  const int64_t expectedHWM = 10;
  baseline(numClients, expectedHWM, "factory");
}
362 
// TThreadedServer built from a processor: 10 clients, expected HWM of 10.
BOOST_FIXTURE_TEST_CASE(test_threaded, TServerIntegrationProcessorTestFixture<TThreadedServer>) {
  const int64_t numClients = 10;
  const int64_t expectedHWM = 10;
  baseline(numClients, expectedHWM, "processor");
}
366 
// TThreadedServer with a concurrent client limit of 4: 10 clients are made and
// the high water mark is expected to equal the limit.
BOOST_FIXTURE_TEST_CASE(test_threaded_bound,
                        TServerIntegrationProcessorTestFixture<TThreadedServer>) {
  const int64_t clientLimit = 4;
  pServer->setConcurrentClientLimit(clientLimit);

  const int64_t numClients = 10;
  baseline(numClients, clientLimit, "limit by server framework");
}
372 
// Stress a TThreadedServer with 10 client threads for 3 seconds.
BOOST_FIXTURE_TEST_CASE(test_threaded_stress,
                        TServerIntegrationProcessorFactoryTestFixture<TThreadedServer>) {
  const int64_t numClients = 10;
  const boost::posix_time::time_duration runFor = boost::posix_time::seconds(3);
  stress(numClients, runFor);
}
377 
// TThreadPoolServer built from a processor factory, limited only by its
// thread manager.
BOOST_FIXTURE_TEST_CASE(test_threadpool_factory,
                        TServerIntegrationProcessorFactoryTestFixture<TThreadPoolServer>) {
  auto threadManager = pServer->getThreadManager();
  threadManager->threadFactory(make_shared<apache::thrift::concurrency::ThreadFactory>());
  threadManager->start();

  // Why 5 and not 4 (per the original author's note - TODO confirm the
  // default): the pool has 4 worker threads by default, but relying on the
  // pool is a bad way to limit concurrent clients because accept() still
  // grabs a 5th client socket before the server blocks on handing that
  // client to the pool.
  const int64_t numClients = 10;
  const int64_t expectedHWM = 5;
  baseline(numClients, expectedHWM, "limit by thread manager");
}
392 
// TThreadPoolServer built from a processor, limited only by its thread manager.
BOOST_FIXTURE_TEST_CASE(test_threadpool,
                        TServerIntegrationProcessorTestFixture<TThreadPoolServer>) {
  auto threadManager = pServer->getThreadManager();
  threadManager->threadFactory(make_shared<apache::thrift::concurrency::ThreadFactory>());
  threadManager->start();

  // Why 5 and not 4 (per the original author's note - TODO confirm the
  // default): the pool has 4 worker threads by default, but relying on the
  // pool is a bad way to limit concurrent clients because accept() still
  // grabs a 5th client socket before the server blocks on handing that
  // client to the pool.
  const int64_t numClients = 10;
  const int64_t expectedHWM = 5;
  baseline(numClients, expectedHWM, "limit by thread manager");
}
407 
// TThreadPoolServer with a concurrent client limit of 4: 10 clients are made
// and the high water mark is expected to equal the limit.
BOOST_FIXTURE_TEST_CASE(test_threadpool_bound,
                        TServerIntegrationProcessorTestFixture<TThreadPoolServer>) {
  auto threadManager = pServer->getThreadManager();
  threadManager->threadFactory(make_shared<apache::thrift::concurrency::ThreadFactory>());
  threadManager->start();

  const int64_t clientLimit = 4;
  pServer->setConcurrentClientLimit(clientLimit);

  const int64_t numClients = 10;
  baseline(numClients, clientLimit, "server framework connection limit");
}
418 
// Stress a TThreadPoolServer with 10 client threads for 3 seconds.
BOOST_FIXTURE_TEST_CASE(test_threadpool_stress,
                        TServerIntegrationProcessorTestFixture<TThreadPoolServer>) {
  auto threadManager = pServer->getThreadManager();
  threadManager->threadFactory(make_shared<apache::thrift::concurrency::ThreadFactory>());
  threadManager->start();

  const int64_t numClients = 10;
  const boost::posix_time::time_duration runFor = boost::posix_time::seconds(3);
  stress(numClients, runFor);
}
428 
429 BOOST_AUTO_TEST_SUITE_END()
430 
BOOST_FIXTURE_TEST_SUITE(TServerIntegrationTest,TServerIntegrationProcessorTestFixture<TThreadedServer>)431 BOOST_FIXTURE_TEST_SUITE(TServerIntegrationTest,
432                          TServerIntegrationProcessorTestFixture<TThreadedServer>)
433 
434 BOOST_AUTO_TEST_CASE(test_stop_with_interruptable_clients_connected) {
435   // This tests THRIFT-2441 new behavior: stopping the server disconnects clients
436   BOOST_TEST_MESSAGE("Testing stop with interruptable clients");
437 
438   startServer();
439 
440   shared_ptr<TSocket> pClientSock1(new TSocket("localhost", getServerPort()),
441                                           autoSocketCloser);
442   pClientSock1->open();
443 
444   shared_ptr<TSocket> pClientSock2(new TSocket("localhost", getServerPort()),
445                                           autoSocketCloser);
446   pClientSock2->open();
447 
448   // Ensure they have been accepted
449   blockUntilAccepted(2);
450 
451   // The test fixture destructor will force the sockets to disconnect
452   // Prior to THRIFT-2441, pServer->stop() would hang until clients disconnected
453   stopServer();
454 
455   // extra proof the server end disconnected the clients
456   uint8_t buf[1];
457   BOOST_CHECK_EQUAL(0, pClientSock1->read(&buf[0], 1)); // 0 = disconnected
458   BOOST_CHECK_EQUAL(0, pClientSock2->read(&buf[0], 1)); // 0 = disconnected
459 }
460 
BOOST_AUTO_TEST_CASE(test_stop_with_uninterruptable_clients_connected) {
  // Covers the pre-THRIFT-2441 behavior: stopping the server blocks until the
  // clients disconnect.
  BOOST_TEST_MESSAGE("Testing stop with uninterruptable clients");

  // Switch the server socket back to the pre-THRIFT-2441 behavior
  shared_ptr<TServerSocket> pServerSocket
      = dynamic_pointer_cast<TServerSocket>(pServer->getServerTransport());
  pServerSocket->setInterruptableChildren(false);

  startServer();

  const int serverPort = getServerPort();

  shared_ptr<TSocket> pClientSock1(new TSocket("localhost", serverPort), autoSocketCloser);
  pClientSock1->open();

  shared_ptr<TSocket> pClientSock2(new TSocket("localhost", serverPort), autoSocketCloser);
  pClientSock2->open();

  // Make sure the server has accepted both connections
  blockUntilAccepted(2);

  // Close both clients from helper threads after a short delay
  const milliseconds closeDelay(250);
  boost::thread t1(std::bind(&TServerIntegrationTestFixture::delayClose,
                             this,
                             pClientSock1,
                             closeDelay));
  boost::thread t2(std::bind(&TServerIntegrationTestFixture::delayClose,
                             this,
                             pClientSock2,
                             closeDelay));

  // The server only finishes stopping once the clients have disconnected
  stopServer();
  BOOST_CHECK(pServer->getConcurrentClientCountHWM() > 0);
  t1.join();
  t2.join();
}
497 
// Verifies that the server never serves more clients at once than the
// configured concurrent client limit, and that a client beyond the limit is
// only accepted after an existing client has disconnected.
BOOST_AUTO_TEST_CASE(test_concurrent_client_limit) {
  startServer();
  BOOST_TEST_MESSAGE("Testing the concurrent client limit");

  // The limit is checked against INT64_MAX before any limit has been set
  BOOST_CHECK_EQUAL(INT64_MAX, pServer->getConcurrentClientLimit());
  pServer->setConcurrentClientLimit(2);
  BOOST_CHECK_EQUAL(0, pServer->getConcurrentClientCount());
  BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientLimit());

  shared_ptr<TSocket> pClientSock1(new TSocket("localhost", getServerPort()),
                                   autoSocketCloser);
  pClientSock1->open();
  blockUntilAccepted(1);
  BOOST_CHECK_EQUAL(1, pServer->getConcurrentClientCount());

  shared_ptr<TSocket> pClientSock2(new TSocket("localhost", getServerPort()),
                                   autoSocketCloser);
  pClientSock2->open();
  blockUntilAccepted(2);
  BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());

  // a third client cannot connect until one of the other two closes
  boost::thread t2(std::bind(&TServerIntegrationTestFixture::delayClose,
                             this,
                             pClientSock2,
                             milliseconds(250)));
  shared_ptr<TSocket> pClientSock3(new TSocket("localhost", getServerPort()),
                                   autoSocketCloser);
  // Open the THIRD socket (previously the second one was re-opened, so the
  // over-limit client was never exercised) and wait for the third accept,
  // which is expected to happen only after the second client has been closed
  // by t2.
  pClientSock3->open();
  blockUntilAccepted(3);
  // Clients 1 and 3 are now connected; the limit of 2 was never exceeded.
  BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCount());
  BOOST_CHECK_EQUAL(2, pServer->getConcurrentClientCountHWM());

  stopServer();
  BOOST_CHECK(pServer->getConcurrentClientCountHWM() > 0);
  t2.join();
}
535 
536 BOOST_AUTO_TEST_SUITE_END()
537