1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #define BOOST_TEST_MODULE TNonblockingServerTest
21 #include <boost/test/unit_test.hpp>
22 #include <memory>
23
24 #include "thrift/concurrency/Monitor.h"
25 #include "thrift/concurrency/Thread.h"
26 #include "thrift/server/TNonblockingServer.h"
27 #include "thrift/transport/TNonblockingServerSocket.h"
28
29 #include "gen-cpp/ParentService.h"
30
31 #include <event.h>
32
33 using apache::thrift::concurrency::Guard;
34 using apache::thrift::concurrency::Monitor;
35 using apache::thrift::concurrency::Mutex;
36 using apache::thrift::concurrency::ThreadFactory;
37 using apache::thrift::concurrency::Runnable;
38 using apache::thrift::concurrency::Thread;
39 using apache::thrift::concurrency::ThreadFactory;
40 using apache::thrift::server::TServerEventHandler;
41 using std::make_shared;
42 using std::shared_ptr;
43
44 using namespace apache::thrift;
45
46 struct Handler : public test::ParentServiceIf {
addStringHandler47 void addString(const std::string& s) override { strings_.push_back(s); }
getStringsHandler48 void getStrings(std::vector<std::string>& _return) override { _return = strings_; }
49 std::vector<std::string> strings_;
50
51 // dummy overrides not used in this test
incrementGenerationHandler52 int32_t incrementGeneration() override { return 0; }
getGenerationHandler53 int32_t getGeneration() override { return 0; }
getDataWaitHandler54 void getDataWait(std::string&, const int32_t) override {}
onewayWaitHandler55 void onewayWait() override {}
exceptionWaitHandler56 void exceptionWait(const std::string&) override {}
unexpectedExceptionWaitHandler57 void unexpectedExceptionWait(const std::string&) override {}
58 };
59
60 class Fixture {
61 private:
62 struct ListenEventHandler : public TServerEventHandler {
63 public:
ListenEventHandlerFixture::ListenEventHandler64 ListenEventHandler(Mutex* mutex) : listenMonitor_(mutex), ready_(false) {}
65
preServeFixture::ListenEventHandler66 void preServe() override /* override */ {
67 Guard g(listenMonitor_.mutex());
68 ready_ = true;
69 listenMonitor_.notify();
70 }
71
72 Monitor listenMonitor_;
73 bool ready_;
74 };
75
76 struct Runner : public Runnable {
77 int port;
78 shared_ptr<event_base> userEventBase;
79 shared_ptr<TProcessor> processor;
80 shared_ptr<server::TNonblockingServer> server;
81 shared_ptr<ListenEventHandler> listenHandler;
82 shared_ptr<transport::TNonblockingServerSocket> socket;
83 Mutex mutex_;
84
RunnerFixture::Runner85 Runner() {
86 port = 0;
87 listenHandler.reset(new ListenEventHandler(&mutex_));
88 }
89
runFixture::Runner90 void run() override {
91 // When binding to explicit port, allow retrying to workaround bind failures on ports in use
92 int retryCount = port ? 10 : 0;
93 startServer(retryCount);
94 }
95
readyBarrierFixture::Runner96 void readyBarrier() {
97 // block until server is listening and ready to accept connections
98 Guard g(mutex_);
99 while (!listenHandler->ready_) {
100 listenHandler->listenMonitor_.wait();
101 }
102 }
103 private:
startServerFixture::Runner104 void startServer(int retry_count) {
105 try {
106 socket.reset(new transport::TNonblockingServerSocket(port));
107 server.reset(new server::TNonblockingServer(processor, socket));
108 server->setServerEventHandler(listenHandler);
109 if (userEventBase) {
110 server->registerEvents(userEventBase.get());
111 }
112 server->serve();
113 } catch (const transport::TTransportException&) {
114 if (retry_count > 0) {
115 ++port;
116 startServer(retry_count - 1);
117 } else {
118 throw;
119 }
120 }
121 }
122 };
123
124 struct EventDeleter {
operator ()Fixture::EventDeleter125 void operator()(event_base* p) { event_base_free(p); }
126 };
127
128 protected:
Fixture()129 Fixture() : processor(new test::ParentServiceProcessor(make_shared<Handler>())) {}
130
~Fixture()131 ~Fixture() {
132 if (server) {
133 server->stop();
134 }
135 if (thread) {
136 thread->join();
137 }
138 }
139
setEventBase(event_base * user_event_base)140 void setEventBase(event_base* user_event_base) {
141 userEventBase_.reset(user_event_base, EventDeleter());
142 }
143
startServer(int port)144 int startServer(int port) {
145 shared_ptr<Runner> runner(new Runner);
146 runner->port = port;
147 runner->processor = processor;
148 runner->userEventBase = userEventBase_;
149
150 shared_ptr<ThreadFactory> threadFactory(
151 new ThreadFactory(false));
152 thread = threadFactory->newThread(runner);
153 thread->start();
154 runner->readyBarrier();
155
156 server = runner->server;
157 return runner->port;
158 }
159
canCommunicate(int serverPort)160 bool canCommunicate(int serverPort) {
161 shared_ptr<transport::TSocket> socket(new transport::TSocket("localhost", serverPort));
162 socket->open();
163 test::ParentServiceClient client(make_shared<protocol::TBinaryProtocol>(
164 make_shared<transport::TFramedTransport>(socket)));
165 client.addString("foo");
166 std::vector<std::string> strings;
167 client.getStrings(strings);
168 return strings.size() == 1 && !(strings[0].compare("foo"));
169 }
170
171 private:
172 shared_ptr<event_base> userEventBase_;
173 shared_ptr<test::ParentServiceProcessor> processor;
174 protected:
175 shared_ptr<server::TNonblockingServer> server;
176 private:
177 shared_ptr<apache::thrift::concurrency::Thread> thread;
178
179 };
180
181 BOOST_AUTO_TEST_SUITE(TNonblockingServerTest)
182
BOOST_FIXTURE_TEST_CASE(get_specified_port,Fixture)183 BOOST_FIXTURE_TEST_CASE(get_specified_port, Fixture) {
184 int specified_port = startServer(12345);
185 BOOST_REQUIRE_GE(specified_port, 12345);
186 BOOST_REQUIRE_EQUAL(server->getListenPort(), specified_port);
187 BOOST_CHECK(canCommunicate(specified_port));
188
189 server->stop();
190 }
191
BOOST_FIXTURE_TEST_CASE(get_assigned_port,Fixture)192 BOOST_FIXTURE_TEST_CASE(get_assigned_port, Fixture) {
193 int specified_port = startServer(0);
194 BOOST_REQUIRE_EQUAL(specified_port, 0);
195 int assigned_port = server->getListenPort();
196 BOOST_REQUIRE_NE(assigned_port, 0);
197 BOOST_CHECK(canCommunicate(assigned_port));
198
199 server->stop();
200 }
201
BOOST_FIXTURE_TEST_CASE(provide_event_base,Fixture)202 BOOST_FIXTURE_TEST_CASE(provide_event_base, Fixture) {
203 event_base* eb = event_base_new();
204 setEventBase(eb);
205 startServer(0);
206
207 // assert that the server works
208 BOOST_CHECK(canCommunicate(server->getListenPort()));
209 #if LIBEVENT_VERSION_NUMBER > 0x02010400
210 // also assert that the event_base is actually used when it's easy
211 BOOST_CHECK_GT(event_base_get_num_events(eb, EVENT_BASE_COUNT_ADDED), 0);
212 #endif
213 }
214
215 BOOST_AUTO_TEST_SUITE_END()
216