1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #define BOOST_TEST_MODULE TNonblockingServerTest
21 #include <boost/test/unit_test.hpp>
22 #include <memory>
23 
24 #include "thrift/concurrency/Monitor.h"
25 #include "thrift/concurrency/Thread.h"
26 #include "thrift/server/TNonblockingServer.h"
27 #include "thrift/transport/TNonblockingServerSocket.h"
28 
29 #include "gen-cpp/ParentService.h"
30 
31 #include <event.h>
32 
33 using apache::thrift::concurrency::Guard;
34 using apache::thrift::concurrency::Monitor;
35 using apache::thrift::concurrency::Mutex;
36 using apache::thrift::concurrency::ThreadFactory;
37 using apache::thrift::concurrency::Runnable;
38 using apache::thrift::concurrency::Thread;
39 using apache::thrift::concurrency::ThreadFactory;
40 using apache::thrift::server::TServerEventHandler;
41 using std::make_shared;
42 using std::shared_ptr;
43 
44 using namespace apache::thrift;
45 
46 struct Handler : public test::ParentServiceIf {
addStringHandler47   void addString(const std::string& s) override { strings_.push_back(s); }
getStringsHandler48   void getStrings(std::vector<std::string>& _return) override { _return = strings_; }
49   std::vector<std::string> strings_;
50 
51   // dummy overrides not used in this test
incrementGenerationHandler52   int32_t incrementGeneration() override { return 0; }
getGenerationHandler53   int32_t getGeneration() override { return 0; }
getDataWaitHandler54   void getDataWait(std::string&, const int32_t) override {}
onewayWaitHandler55   void onewayWait() override {}
exceptionWaitHandler56   void exceptionWait(const std::string&) override {}
unexpectedExceptionWaitHandler57   void unexpectedExceptionWait(const std::string&) override {}
58 };
59 
60 class Fixture {
61 private:
62   struct ListenEventHandler : public TServerEventHandler {
63     public:
ListenEventHandlerFixture::ListenEventHandler64       ListenEventHandler(Mutex* mutex) : listenMonitor_(mutex), ready_(false) {}
65 
preServeFixture::ListenEventHandler66       void preServe() override /* override */ {
67         Guard g(listenMonitor_.mutex());
68         ready_ = true;
69         listenMonitor_.notify();
70       }
71 
72       Monitor listenMonitor_;
73       bool ready_;
74   };
75 
76   struct Runner : public Runnable {
77     int port;
78     shared_ptr<event_base> userEventBase;
79     shared_ptr<TProcessor> processor;
80     shared_ptr<server::TNonblockingServer> server;
81     shared_ptr<ListenEventHandler> listenHandler;
82     shared_ptr<transport::TNonblockingServerSocket> socket;
83     Mutex mutex_;
84 
RunnerFixture::Runner85     Runner() {
86       port = 0;
87       listenHandler.reset(new ListenEventHandler(&mutex_));
88     }
89 
runFixture::Runner90     void run() override {
91       // When binding to explicit port, allow retrying to workaround bind failures on ports in use
92       int retryCount = port ? 10 : 0;
93       startServer(retryCount);
94     }
95 
readyBarrierFixture::Runner96     void readyBarrier() {
97       // block until server is listening and ready to accept connections
98       Guard g(mutex_);
99       while (!listenHandler->ready_) {
100         listenHandler->listenMonitor_.wait();
101       }
102     }
103   private:
startServerFixture::Runner104     void startServer(int retry_count) {
105       try {
106         socket.reset(new transport::TNonblockingServerSocket(port));
107         server.reset(new server::TNonblockingServer(processor, socket));
108         server->setServerEventHandler(listenHandler);
109         if (userEventBase) {
110           server->registerEvents(userEventBase.get());
111         }
112         server->serve();
113       } catch (const transport::TTransportException&) {
114         if (retry_count > 0) {
115           ++port;
116           startServer(retry_count - 1);
117         } else {
118           throw;
119         }
120       }
121     }
122   };
123 
124   struct EventDeleter {
operator ()Fixture::EventDeleter125     void operator()(event_base* p) { event_base_free(p); }
126   };
127 
128 protected:
Fixture()129   Fixture() : processor(new test::ParentServiceProcessor(make_shared<Handler>())) {}
130 
~Fixture()131   ~Fixture() {
132     if (server) {
133       server->stop();
134     }
135     if (thread) {
136       thread->join();
137     }
138   }
139 
setEventBase(event_base * user_event_base)140   void setEventBase(event_base* user_event_base) {
141     userEventBase_.reset(user_event_base, EventDeleter());
142   }
143 
startServer(int port)144   int startServer(int port) {
145     shared_ptr<Runner> runner(new Runner);
146     runner->port = port;
147     runner->processor = processor;
148     runner->userEventBase = userEventBase_;
149 
150     shared_ptr<ThreadFactory> threadFactory(
151         new ThreadFactory(false));
152     thread = threadFactory->newThread(runner);
153     thread->start();
154     runner->readyBarrier();
155 
156     server = runner->server;
157     return runner->port;
158   }
159 
canCommunicate(int serverPort)160   bool canCommunicate(int serverPort) {
161     shared_ptr<transport::TSocket> socket(new transport::TSocket("localhost", serverPort));
162     socket->open();
163     test::ParentServiceClient client(make_shared<protocol::TBinaryProtocol>(
164         make_shared<transport::TFramedTransport>(socket)));
165     client.addString("foo");
166     std::vector<std::string> strings;
167     client.getStrings(strings);
168     return strings.size() == 1 && !(strings[0].compare("foo"));
169   }
170 
171 private:
172   shared_ptr<event_base> userEventBase_;
173   shared_ptr<test::ParentServiceProcessor> processor;
174 protected:
175   shared_ptr<server::TNonblockingServer> server;
176 private:
177   shared_ptr<apache::thrift::concurrency::Thread> thread;
178 
179 };
180 
181 BOOST_AUTO_TEST_SUITE(TNonblockingServerTest)
182 
// Requests a fixed port and verifies the server listens on the port that
// startServer() reports, and that a client round trip succeeds.
BOOST_FIXTURE_TEST_CASE(get_specified_port, Fixture) {
  // The runner moves to the next port when binding fails, so the reported
  // port may be higher than the requested one but never lower.
  const int specified_port = startServer(12345);
  BOOST_REQUIRE_GE(specified_port, 12345);

  // The server itself must agree with what the fixture reported.
  BOOST_REQUIRE_EQUAL(server->getListenPort(), specified_port);

  BOOST_CHECK(canCommunicate(specified_port));

  server->stop();
}
191 
// Requests port 0 and verifies the server reports a non-zero listen port on
// which a client round trip succeeds.
BOOST_FIXTURE_TEST_CASE(get_assigned_port, Fixture) {
  // startServer() returns the runner's port value, which stays 0 here.
  const int specified_port = startServer(0);
  BOOST_REQUIRE_EQUAL(specified_port, 0);

  // The port actually in use has to be asked from the server.
  const int assigned_port = server->getListenPort();
  BOOST_REQUIRE_NE(assigned_port, 0);

  BOOST_CHECK(canCommunicate(assigned_port));

  server->stop();
}
201 
// Supplies a caller-created libevent base and verifies that the server works
// with it and, where libevent offers the query, that events were added to it.
BOOST_FIXTURE_TEST_CASE(provide_event_base, Fixture) {
  event_base* eb = event_base_new();
  // Without a real base the fixture would skip registerEvents() and the test
  // would no longer exercise a user-provided base, so fail right away.
  BOOST_REQUIRE(eb != nullptr);
  // The fixture takes ownership of the base from here on.
  setEventBase(eb);
  startServer(0);

  // assert that the server works
  BOOST_CHECK(canCommunicate(server->getListenPort()));
#if LIBEVENT_VERSION_NUMBER > 0x02010400
  // also assert that the event_base is actually used when it's easy
  BOOST_CHECK_GT(event_base_get_num_events(eb, EVENT_BASE_COUNT_ADDED), 0);
#endif
}
214 
215 BOOST_AUTO_TEST_SUITE_END()
216