1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/async/TAsyncBufferProcessor.h>
21 #include <thrift/async/TAsyncProtocolProcessor.h>
22 #include <thrift/async/TEvhttpServer.h>
23 #include <thrift/concurrency/ThreadFactory.h>
24 #include <thrift/concurrency/ThreadManager.h>
25 #include <thrift/processor/TMultiplexedProcessor.h>
26 #include <thrift/protocol/TBinaryProtocol.h>
27 #include <thrift/protocol/TCompactProtocol.h>
28 #include <thrift/protocol/THeaderProtocol.h>
29 #include <thrift/protocol/TJSONProtocol.h>
30 #include <thrift/server/TNonblockingServer.h>
31 #include <thrift/server/TSimpleServer.h>
32 #include <thrift/server/TThreadPoolServer.h>
33 #include <thrift/server/TThreadedServer.h>
34 #include <thrift/transport/THttpServer.h>
35 #include <thrift/transport/THttpTransport.h>
36 #include <thrift/transport/TNonblockingSSLServerSocket.h>
37 #include <thrift/transport/TNonblockingServerSocket.h>
38 #include <thrift/transport/TSSLServerSocket.h>
39 #include <thrift/transport/TSSLSocket.h>
40 #include <thrift/transport/TServerSocket.h>
41 #include <thrift/transport/TTransportUtils.h>
42 #include <thrift/transport/TWebSocketServer.h>
43 #include <thrift/transport/TZlibTransport.h>
44
45 #include "SecondService.h"
46 #include "ThriftTest.h"
47
48 #ifdef HAVE_STDINT_H
49 #include <stdint.h>
50 #endif
51 #ifdef HAVE_INTTYPES_H
52 #include <inttypes.h>
53 #endif
54 #ifdef HAVE_SIGNAL_H
55 #include <signal.h>
56 #endif
57
58 #include <iostream>
59 #include <stdexcept>
60 #include <sstream>
61
62 #include <boost/algorithm/string.hpp>
63 #include <boost/program_options.hpp>
64 #include <boost/filesystem.hpp>
65
66 #if _WIN32
67 #include <thrift/windows/TWinsockSingleton.h>
68 #endif
69
70 using namespace std;
71
72 using namespace apache::thrift;
73 using namespace apache::thrift::async;
74 using namespace apache::thrift::concurrency;
75 using namespace apache::thrift::protocol;
76 using namespace apache::thrift::transport;
77 using namespace apache::thrift::server;
78
79 using namespace thrift::test;
80
81 // to handle a controlled shutdown, signal handling is mandatory
82 #ifdef HAVE_SIGNAL_H
83 apache::thrift::concurrency::Monitor gMonitor;
signal_handler(int signum)84 void signal_handler(int signum)
85 {
86 if (signum == SIGINT) {
87 gMonitor.notifyAll();
88 }
89 }
90 #endif
91
92 class TestHandler : public ThriftTestIf {
93 public:
94 TestHandler() = default;
95
testVoid()96 void testVoid() override { printf("testVoid()\n"); }
97
testString(string & out,const string & thing)98 void testString(string& out, const string& thing) override {
99 printf("testString(\"%s\")\n", thing.c_str());
100 out = thing;
101 }
102
testBool(const bool thing)103 bool testBool(const bool thing) override {
104 printf("testBool(%s)\n", thing ? "true" : "false");
105 return thing;
106 }
107
testByte(const int8_t thing)108 int8_t testByte(const int8_t thing) override {
109 printf("testByte(%d)\n", (int)thing);
110 return thing;
111 }
112
testI32(const int32_t thing)113 int32_t testI32(const int32_t thing) override {
114 printf("testI32(%d)\n", thing);
115 return thing;
116 }
117
testI64(const int64_t thing)118 int64_t testI64(const int64_t thing) override {
119 printf("testI64(%" PRId64 ")\n", thing);
120 return thing;
121 }
122
testDouble(const double thing)123 double testDouble(const double thing) override {
124 printf("testDouble(%f)\n", thing);
125 return thing;
126 }
127
testBinary(std::string & _return,const std::string & thing)128 void testBinary(std::string& _return, const std::string& thing) override {
129 std::ostringstream hexstr;
130 hexstr << std::hex << thing;
131 printf("testBinary(%lu: %s)\n", safe_numeric_cast<unsigned long>(thing.size()), hexstr.str().c_str());
132 _return = thing;
133 }
134
testStruct(Xtruct & out,const Xtruct & thing)135 void testStruct(Xtruct& out, const Xtruct& thing) override {
136 printf("testStruct({\"%s\", %d, %d, %" PRId64 "})\n",
137 thing.string_thing.c_str(),
138 (int)thing.byte_thing,
139 thing.i32_thing,
140 thing.i64_thing);
141 out = thing;
142 }
143
testNest(Xtruct2 & out,const Xtruct2 & nest)144 void testNest(Xtruct2& out, const Xtruct2& nest) override {
145 const Xtruct& thing = nest.struct_thing;
146 printf("testNest({%d, {\"%s\", %d, %d, %" PRId64 "}, %d})\n",
147 (int)nest.byte_thing,
148 thing.string_thing.c_str(),
149 (int)thing.byte_thing,
150 thing.i32_thing,
151 thing.i64_thing,
152 nest.i32_thing);
153 out = nest;
154 }
155
testMap(map<int32_t,int32_t> & out,const map<int32_t,int32_t> & thing)156 void testMap(map<int32_t, int32_t>& out, const map<int32_t, int32_t>& thing) override {
157 printf("testMap({");
158 map<int32_t, int32_t>::const_iterator m_iter;
159 bool first = true;
160 for (m_iter = thing.begin(); m_iter != thing.end(); ++m_iter) {
161 if (first) {
162 first = false;
163 } else {
164 printf(", ");
165 }
166 printf("%d => %d", m_iter->first, m_iter->second);
167 }
168 printf("})\n");
169 out = thing;
170 }
171
testStringMap(map<std::string,std::string> & out,const map<std::string,std::string> & thing)172 void testStringMap(map<std::string, std::string>& out,
173 const map<std::string, std::string>& thing) override {
174 printf("testMap({");
175 map<std::string, std::string>::const_iterator m_iter;
176 bool first = true;
177 for (m_iter = thing.begin(); m_iter != thing.end(); ++m_iter) {
178 if (first) {
179 first = false;
180 } else {
181 printf(", ");
182 }
183 printf("%s => %s", (m_iter->first).c_str(), (m_iter->second).c_str());
184 }
185 printf("})\n");
186 out = thing;
187 }
188
testSet(set<int32_t> & out,const set<int32_t> & thing)189 void testSet(set<int32_t>& out, const set<int32_t>& thing) override {
190 printf("testSet({");
191 set<int32_t>::const_iterator s_iter;
192 bool first = true;
193 for (s_iter = thing.begin(); s_iter != thing.end(); ++s_iter) {
194 if (first) {
195 first = false;
196 } else {
197 printf(", ");
198 }
199 printf("%d", *s_iter);
200 }
201 printf("})\n");
202 out = thing;
203 }
204
testList(vector<int32_t> & out,const vector<int32_t> & thing)205 void testList(vector<int32_t>& out, const vector<int32_t>& thing) override {
206 printf("testList({");
207 vector<int32_t>::const_iterator l_iter;
208 bool first = true;
209 for (l_iter = thing.begin(); l_iter != thing.end(); ++l_iter) {
210 if (first) {
211 first = false;
212 } else {
213 printf(", ");
214 }
215 printf("%d", *l_iter);
216 }
217 printf("})\n");
218 out = thing;
219 }
220
testEnum(const Numberz::type thing)221 Numberz::type testEnum(const Numberz::type thing) override {
222 printf("testEnum(%d)\n", thing);
223 return thing;
224 }
225
testTypedef(const UserId thing)226 UserId testTypedef(const UserId thing) override {
227 printf("testTypedef(%" PRId64 ")\n", thing);
228 return thing;
229 }
230
testMapMap(map<int32_t,map<int32_t,int32_t>> & mapmap,const int32_t hello)231 void testMapMap(map<int32_t, map<int32_t, int32_t> >& mapmap, const int32_t hello) override {
232 printf("testMapMap(%d)\n", hello);
233
234 map<int32_t, int32_t> pos;
235 map<int32_t, int32_t> neg;
236 for (int i = 1; i < 5; i++) {
237 pos.insert(make_pair(i, i));
238 neg.insert(make_pair(-i, -i));
239 }
240
241 mapmap.insert(make_pair(4, pos));
242 mapmap.insert(make_pair(-4, neg));
243 }
244
testInsanity(map<UserId,map<Numberz::type,Insanity>> & insane,const Insanity & argument)245 void testInsanity(map<UserId, map<Numberz::type, Insanity> >& insane, const Insanity& argument) override {
246 printf("testInsanity()\n");
247
248 Insanity looney;
249 map<Numberz::type, Insanity> first_map;
250 map<Numberz::type, Insanity> second_map;
251
252 first_map.insert(make_pair(Numberz::TWO, argument));
253 first_map.insert(make_pair(Numberz::THREE, argument));
254
255 second_map.insert(make_pair(Numberz::SIX, looney));
256
257 insane.insert(make_pair(1, first_map));
258 insane.insert(make_pair(2, second_map));
259
260 printf("return");
261 printf(" = {");
262 map<UserId, map<Numberz::type, Insanity> >::const_iterator i_iter;
263 for (i_iter = insane.begin(); i_iter != insane.end(); ++i_iter) {
264 printf("%" PRId64 " => {", i_iter->first);
265 map<Numberz::type, Insanity>::const_iterator i2_iter;
266 for (i2_iter = i_iter->second.begin(); i2_iter != i_iter->second.end(); ++i2_iter) {
267 printf("%d => {", i2_iter->first);
268 map<Numberz::type, UserId> userMap = i2_iter->second.userMap;
269 map<Numberz::type, UserId>::const_iterator um;
270 printf("{");
271 for (um = userMap.begin(); um != userMap.end(); ++um) {
272 printf("%d => %" PRId64 ", ", um->first, um->second);
273 }
274 printf("}, ");
275
276 vector<Xtruct> xtructs = i2_iter->second.xtructs;
277 vector<Xtruct>::const_iterator x;
278 printf("{");
279 for (x = xtructs.begin(); x != xtructs.end(); ++x) {
280 printf("{\"%s\", %d, %d, %" PRId64 "}, ",
281 x->string_thing.c_str(),
282 (int)x->byte_thing,
283 x->i32_thing,
284 x->i64_thing);
285 }
286 printf("}");
287
288 printf("}, ");
289 }
290 printf("}, ");
291 }
292 printf("}\n");
293 }
294
testMulti(Xtruct & hello,const int8_t arg0,const int32_t arg1,const int64_t arg2,const std::map<int16_t,std::string> & arg3,const Numberz::type arg4,const UserId arg5)295 void testMulti(Xtruct& hello,
296 const int8_t arg0,
297 const int32_t arg1,
298 const int64_t arg2,
299 const std::map<int16_t, std::string>& arg3,
300 const Numberz::type arg4,
301 const UserId arg5) override {
302 (void)arg3;
303 (void)arg4;
304 (void)arg5;
305
306 printf("testMulti()\n");
307
308 hello.string_thing = "Hello2";
309 hello.byte_thing = arg0;
310 hello.i32_thing = arg1;
311 hello.i64_thing = (int64_t)arg2;
312 }
313
testException(const std::string & arg)314 void testException(const std::string& arg) override {
315 printf("testException(%s)\n", arg.c_str());
316 if (arg.compare("Xception") == 0) {
317 Xception e;
318 e.errorCode = 1001;
319 e.message = arg;
320 throw e;
321 } else if (arg.compare("TException") == 0) {
322 apache::thrift::TException e;
323 throw e;
324 } else {
325 Xtruct result;
326 result.string_thing = arg;
327 return;
328 }
329 }
330
testMultiException(Xtruct & result,const std::string & arg0,const std::string & arg1)331 void testMultiException(Xtruct& result,
332 const std::string& arg0,
333 const std::string& arg1) override {
334
335 printf("testMultiException(%s, %s)\n", arg0.c_str(), arg1.c_str());
336
337 if (arg0.compare("Xception") == 0) {
338 Xception e;
339 e.errorCode = 1001;
340 e.message = "This is an Xception";
341 throw e;
342 } else if (arg0.compare("Xception2") == 0) {
343 Xception2 e;
344 e.errorCode = 2002;
345 e.struct_thing.string_thing = "This is an Xception2";
346 throw e;
347 } else {
348 result.string_thing = arg1;
349 return;
350 }
351 }
352
testOneway(const int32_t aNum)353 void testOneway(const int32_t aNum) override {
354 printf("testOneway(%d): call received\n", aNum);
355 }
356 };
357
358 class SecondHandler : public SecondServiceIf
359 {
360 public:
secondtestString(std::string & result,const std::string & thing)361 void secondtestString(std::string& result, const std::string& thing) override
362 { result = "testString(\"" + thing + "\")"; }
363 };
364
365 class TestProcessorEventHandler : public TProcessorEventHandler {
getContext(const char * fn_name,void * serverContext)366 void* getContext(const char* fn_name, void* serverContext) override {
367 (void)serverContext;
368 return new std::string(fn_name);
369 }
freeContext(void * ctx,const char * fn_name)370 void freeContext(void* ctx, const char* fn_name) override {
371 (void)fn_name;
372 delete static_cast<std::string*>(ctx);
373 }
preRead(void * ctx,const char * fn_name)374 void preRead(void* ctx, const char* fn_name) override { communicate("preRead", ctx, fn_name); }
postRead(void * ctx,const char * fn_name,uint32_t bytes)375 void postRead(void* ctx, const char* fn_name, uint32_t bytes) override {
376 (void)bytes;
377 communicate("postRead", ctx, fn_name);
378 }
preWrite(void * ctx,const char * fn_name)379 void preWrite(void* ctx, const char* fn_name) override { communicate("preWrite", ctx, fn_name); }
postWrite(void * ctx,const char * fn_name,uint32_t bytes)380 void postWrite(void* ctx, const char* fn_name, uint32_t bytes) override {
381 (void)bytes;
382 communicate("postWrite", ctx, fn_name);
383 }
asyncComplete(void * ctx,const char * fn_name)384 void asyncComplete(void* ctx, const char* fn_name) override {
385 communicate("asyncComplete", ctx, fn_name);
386 }
handlerError(void * ctx,const char * fn_name)387 void handlerError(void* ctx, const char* fn_name) override {
388 communicate("handlerError", ctx, fn_name);
389 }
390
communicate(const char * event,void * ctx,const char * fn_name)391 void communicate(const char* event, void* ctx, const char* fn_name) {
392 std::cout << event << ": " << *static_cast<std::string*>(ctx) << " = " << fn_name << std::endl;
393 }
394 };
395
396 class TestHandlerAsync : public ThriftTestCobSvIf {
397 public:
TestHandlerAsync(std::shared_ptr<TestHandler> & handler)398 TestHandlerAsync(std::shared_ptr<TestHandler>& handler) : _delegate(handler) {}
399 ~TestHandlerAsync() override = default;
400
testVoid(std::function<void ()> cob)401 void testVoid(std::function<void()> cob) override {
402 _delegate->testVoid();
403 cob();
404 }
405
testString(std::function<void (std::string const & _return)> cob,const std::string & thing)406 void testString(std::function<void(std::string const& _return)> cob,
407 const std::string& thing) override {
408 std::string res;
409 _delegate->testString(res, thing);
410 cob(res);
411 }
412
testBool(std::function<void (bool const & _return)> cob,const bool thing)413 void testBool(std::function<void(bool const& _return)> cob, const bool thing) override {
414 bool res = _delegate->testBool(thing);
415 cob(res);
416 }
417
testByte(std::function<void (int8_t const & _return)> cob,const int8_t thing)418 void testByte(std::function<void(int8_t const& _return)> cob, const int8_t thing) override {
419 int8_t res = _delegate->testByte(thing);
420 cob(res);
421 }
422
testI32(std::function<void (int32_t const & _return)> cob,const int32_t thing)423 void testI32(std::function<void(int32_t const& _return)> cob, const int32_t thing) override {
424 int32_t res = _delegate->testI32(thing);
425 cob(res);
426 }
427
testI64(std::function<void (int64_t const & _return)> cob,const int64_t thing)428 void testI64(std::function<void(int64_t const& _return)> cob, const int64_t thing) override {
429 int64_t res = _delegate->testI64(thing);
430 cob(res);
431 }
432
testDouble(std::function<void (double const & _return)> cob,const double thing)433 void testDouble(std::function<void(double const& _return)> cob, const double thing) override {
434 double res = _delegate->testDouble(thing);
435 cob(res);
436 }
437
testBinary(std::function<void (std::string const & _return)> cob,const std::string & thing)438 void testBinary(std::function<void(std::string const& _return)> cob,
439 const std::string& thing) override {
440 std::string res;
441 _delegate->testBinary(res, thing);
442 cob(res);
443 }
444
testStruct(std::function<void (Xtruct const & _return)> cob,const Xtruct & thing)445 void testStruct(std::function<void(Xtruct const& _return)> cob, const Xtruct& thing) override {
446 Xtruct res;
447 _delegate->testStruct(res, thing);
448 cob(res);
449 }
450
testNest(std::function<void (Xtruct2 const & _return)> cob,const Xtruct2 & thing)451 void testNest(std::function<void(Xtruct2 const& _return)> cob, const Xtruct2& thing) override {
452 Xtruct2 res;
453 _delegate->testNest(res, thing);
454 cob(res);
455 }
456
testMap(std::function<void (std::map<int32_t,int32_t> const & _return)> cob,const std::map<int32_t,int32_t> & thing)457 void testMap(std::function<void(std::map<int32_t, int32_t> const& _return)> cob,
458 const std::map<int32_t, int32_t>& thing) override {
459 std::map<int32_t, int32_t> res;
460 _delegate->testMap(res, thing);
461 cob(res);
462 }
463
testStringMap(std::function<void (std::map<std::string,std::string> const & _return)> cob,const std::map<std::string,std::string> & thing)464 void testStringMap(
465 std::function<void(std::map<std::string, std::string> const& _return)> cob,
466 const std::map<std::string, std::string>& thing) override {
467 std::map<std::string, std::string> res;
468 _delegate->testStringMap(res, thing);
469 cob(res);
470 }
471
testSet(std::function<void (std::set<int32_t> const & _return)> cob,const std::set<int32_t> & thing)472 void testSet(std::function<void(std::set<int32_t> const& _return)> cob,
473 const std::set<int32_t>& thing) override {
474 std::set<int32_t> res;
475 _delegate->testSet(res, thing);
476 cob(res);
477 }
478
testList(std::function<void (std::vector<int32_t> const & _return)> cob,const std::vector<int32_t> & thing)479 void testList(std::function<void(std::vector<int32_t> const& _return)> cob,
480 const std::vector<int32_t>& thing) override {
481 std::vector<int32_t> res;
482 _delegate->testList(res, thing);
483 cob(res);
484 }
485
testEnum(std::function<void (Numberz::type const & _return)> cob,const Numberz::type thing)486 void testEnum(std::function<void(Numberz::type const& _return)> cob,
487 const Numberz::type thing) override {
488 Numberz::type res = _delegate->testEnum(thing);
489 cob(res);
490 }
491
testTypedef(std::function<void (UserId const & _return)> cob,const UserId thing)492 void testTypedef(std::function<void(UserId const& _return)> cob, const UserId thing) override {
493 UserId res = _delegate->testTypedef(thing);
494 cob(res);
495 }
496
testMapMap(std::function<void (std::map<int32_t,std::map<int32_t,int32_t>> const & _return)> cob,const int32_t hello)497 void testMapMap(
498 std::function<void(std::map<int32_t, std::map<int32_t, int32_t> > const& _return)> cob,
499 const int32_t hello) override {
500 std::map<int32_t, std::map<int32_t, int32_t> > res;
501 _delegate->testMapMap(res, hello);
502 cob(res);
503 }
504
testInsanity(std::function<void (std::map<UserId,std::map<Numberz::type,Insanity>> const & _return)> cob,const Insanity & argument)505 void testInsanity(
506 std::function<void(std::map<UserId, std::map<Numberz::type, Insanity> > const& _return)> cob,
507 const Insanity& argument) override {
508 std::map<UserId, std::map<Numberz::type, Insanity> > res;
509 _delegate->testInsanity(res, argument);
510 cob(res);
511 }
512
testMulti(std::function<void (Xtruct const & _return)> cob,const int8_t arg0,const int32_t arg1,const int64_t arg2,const std::map<int16_t,std::string> & arg3,const Numberz::type arg4,const UserId arg5)513 void testMulti(std::function<void(Xtruct const& _return)> cob,
514 const int8_t arg0,
515 const int32_t arg1,
516 const int64_t arg2,
517 const std::map<int16_t, std::string>& arg3,
518 const Numberz::type arg4,
519 const UserId arg5) override {
520 Xtruct res;
521 _delegate->testMulti(res, arg0, arg1, arg2, arg3, arg4, arg5);
522 cob(res);
523 }
524
testException(std::function<void ()> cob,std::function<void (::apache::thrift::TDelayedException * _throw)> exn_cob,const std::string & arg)525 void testException(
526 std::function<void()> cob,
527 std::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob,
528 const std::string& arg) override {
529 try {
530 _delegate->testException(arg);
531 } catch (const apache::thrift::TException& e) {
532 exn_cob(apache::thrift::TDelayedException::delayException(e));
533 return;
534 }
535 cob();
536 }
537
testMultiException(std::function<void (Xtruct const & _return)> cob,std::function<void (::apache::thrift::TDelayedException * _throw)> exn_cob,const std::string & arg0,const std::string & arg1)538 void testMultiException(
539 std::function<void(Xtruct const& _return)> cob,
540 std::function<void(::apache::thrift::TDelayedException* _throw)> exn_cob,
541 const std::string& arg0,
542 const std::string& arg1) override {
543 Xtruct res;
544 try {
545 _delegate->testMultiException(res, arg0, arg1);
546 } catch (const apache::thrift::TException& e) {
547 exn_cob(apache::thrift::TDelayedException::delayException(e));
548 return;
549 }
550 cob(res);
551 }
552
testOneway(std::function<void ()> cob,const int32_t secondsToSleep)553 void testOneway(std::function<void()> cob, const int32_t secondsToSleep) override {
554 _delegate->testOneway(secondsToSleep);
555 cob();
556 }
557
558 protected:
559 std::shared_ptr<TestHandler> _delegate;
560 };
561
562 namespace po = boost::program_options;
563
main(int argc,char ** argv)564 int main(int argc, char** argv) {
565
566 string testDir = boost::filesystem::system_complete(argv[0]).parent_path().parent_path().parent_path().string();
567 string certPath = testDir + "/keys/server.crt";
568 string keyPath = testDir + "/keys/server.key";
569
570 #if _WIN32
571 transport::TWinsockSingleton::create();
572 #endif
573 int port = 9090;
574 bool ssl = false;
575 bool zlib = false;
576 string transport_type = "buffered";
577 string protocol_type = "binary";
578 string server_type = "simple";
579 string domain_socket = "";
580 bool abstract_namespace = false;
581 size_t workers = 4;
582 int string_limit = 0;
583 int container_limit = 0;
584
585 po::options_description desc("Allowed options");
586 desc.add_options()
587 ("help,h", "produce help message")
588 ("port", po::value<int>(&port)->default_value(port), "Port number to listen")
589 ("domain-socket", po::value<string>(&domain_socket) ->default_value(domain_socket), "Unix Domain Socket (e.g. /tmp/ThriftTest.thrift)")
590 ("abstract-namespace", "Create the domain socket in the Abstract Namespace (no connection with filesystem pathnames)")
591 ("server-type", po::value<string>(&server_type)->default_value(server_type), "type of server, \"simple\", \"thread-pool\", \"threaded\", or \"nonblocking\"")
592 ("transport", po::value<string>(&transport_type)->default_value(transport_type), "transport: buffered, framed, http, websocket, zlib")
593 ("protocol", po::value<string>(&protocol_type)->default_value(protocol_type), "protocol: binary, compact, header, json, multi, multic, multih, multij")
594 ("ssl", "Encrypted Transport using SSL")
595 ("zlib", "Wrapped Transport using Zlib")
596 ("processor-events", "processor-events")
597 ("workers,n", po::value<size_t>(&workers)->default_value(workers), "Number of thread pools workers. Only valid for thread-pool server type")
598 ("string-limit", po::value<int>(&string_limit))
599 ("container-limit", po::value<int>(&container_limit));
600
601 po::variables_map vm;
602 po::store(po::parse_command_line(argc, argv, desc), vm);
603 po::notify(vm);
604
605 if (vm.count("help")) {
606 cout << desc << "\n";
607 return 1;
608 }
609
610 try {
611 if (!server_type.empty()) {
612 if (server_type == "simple") {
613 } else if (server_type == "thread-pool") {
614 } else if (server_type == "threaded") {
615 } else if (server_type == "nonblocking") {
616 } else {
617 throw invalid_argument("Unknown server type " + server_type);
618 }
619 }
620
621 if (!protocol_type.empty()) {
622 if (protocol_type == "binary") {
623 } else if (protocol_type == "compact") {
624 } else if (protocol_type == "json") {
625 } else if (protocol_type == "header") {
626 } else if (protocol_type == "multi") { // multiplexed binary
627 } else if (protocol_type == "multic") { // multiplexed compact
628 } else if (protocol_type == "multih") { // multiplexed header
629 } else if (protocol_type == "multij") { // multiplexed json
630 } else {
631 throw invalid_argument("Unknown protocol type " + protocol_type);
632 }
633 }
634
635 if (!transport_type.empty()) {
636 if (transport_type == "buffered") {
637 } else if (transport_type == "framed") {
638 } else if (transport_type == "http") {
639 } else if (transport_type == "websocket") {
640 } else if (transport_type == "zlib") {
641 // crosstester will pass zlib as a flag and a transport right now...
642 } else {
643 throw invalid_argument("Unknown transport type " + transport_type);
644 }
645 }
646
647 } catch (std::exception& e) {
648 cerr << e.what() << endl;
649 cout << desc << "\n";
650 return 1;
651 }
652
653 if (vm.count("ssl")) {
654 ssl = true;
655 }
656
657 if (vm.count("zlib")) {
658 zlib = true;
659 }
660
661 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
662 if (ssl) {
663 signal(SIGPIPE, SIG_IGN); // for OpenSSL, otherwise we end abruptly
664 }
665 #endif
666
667 if (vm.count("abstract-namespace")) {
668 abstract_namespace = true;
669 }
670
671 // Dispatcher
672 std::shared_ptr<TProtocolFactory> protocolFactory;
673 if (protocol_type == "json" || protocol_type == "multij") {
674 std::shared_ptr<TProtocolFactory> jsonProtocolFactory(new TJSONProtocolFactory());
675 protocolFactory = jsonProtocolFactory;
676 } else if (protocol_type == "compact" || protocol_type == "multic") {
677 auto *compactProtocolFactory = new TCompactProtocolFactoryT<TBufferBase>();
678 compactProtocolFactory->setContainerSizeLimit(container_limit);
679 compactProtocolFactory->setStringSizeLimit(string_limit);
680 protocolFactory.reset(compactProtocolFactory);
681 } else if (protocol_type == "header" || protocol_type == "multih") {
682 std::shared_ptr<TProtocolFactory> headerProtocolFactory(new THeaderProtocolFactory());
683 protocolFactory = headerProtocolFactory;
684 } else {
685 auto* binaryProtocolFactory = new TBinaryProtocolFactoryT<TBufferBase>();
686 binaryProtocolFactory->setContainerSizeLimit(container_limit);
687 binaryProtocolFactory->setStringSizeLimit(string_limit);
688 protocolFactory.reset(binaryProtocolFactory);
689 }
690
691 // Processors
692 std::shared_ptr<TestHandler> testHandler(new TestHandler());
693 std::shared_ptr<TProcessor> testProcessor(new ThriftTestProcessor(testHandler));
694
695 if (vm.count("processor-events")) {
696 testProcessor->setEventHandler(
697 std::shared_ptr<TProcessorEventHandler>(new TestProcessorEventHandler()));
698 }
699
700 // Transport
701 std::shared_ptr<TSSLSocketFactory> sslSocketFactory;
702 std::shared_ptr<TServerSocket> serverSocket;
703
704 if (ssl) {
705 sslSocketFactory = std::shared_ptr<TSSLSocketFactory>(new TSSLSocketFactory());
706 sslSocketFactory->loadCertificate(certPath.c_str());
707 sslSocketFactory->loadPrivateKey(keyPath.c_str());
708 sslSocketFactory->ciphers("ALL:!ADH:!LOW:!EXP:!MD5:@STRENGTH");
709 if (server_type != "nonblocking") {
710 serverSocket = std::shared_ptr<TServerSocket>(new TSSLServerSocket(port, sslSocketFactory));
711 }
712 } else {
713 if (domain_socket != "") {
714 if (abstract_namespace) {
715 std::string abstract_socket("\0", 1);
716 abstract_socket += domain_socket;
717 serverSocket = std::shared_ptr<TServerSocket>(new TServerSocket(abstract_socket));
718 } else {
719 unlink(domain_socket.c_str());
720 serverSocket = std::shared_ptr<TServerSocket>(new TServerSocket(domain_socket));
721 }
722 port = 0;
723 } else {
724 serverSocket = std::shared_ptr<TServerSocket>(new TServerSocket(port));
725 }
726 }
727
728 // Factory
729 std::shared_ptr<TTransportFactory> transportFactory;
730
731 if (transport_type == "http" && server_type != "nonblocking") {
732 transportFactory = std::make_shared<THttpServerTransportFactory>();
733 } else if (transport_type == "websocket" && server_type != "nonblocking") {
734 if (protocol_type == "json" || protocol_type == "multij") {
735 transportFactory = std::make_shared<TTextWebSocketServerTransportFactory>();
736 } else {
737 transportFactory = std::make_shared<TBinaryWebSocketServerTransportFactory>();
738 }
739 } else if (transport_type == "framed") {
740 transportFactory = std::make_shared<TFramedTransportFactory>();
741 } else {
742 transportFactory = std::make_shared<TBufferedTransportFactory>();
743 }
744
745 if (zlib) {
746 // currently TZlibTransportFactory is the only factory than can wrap another:
747 transportFactory = std::make_shared<TZlibTransportFactory>(transportFactory);
748 }
749
750 // Server Info
751 cout << "Starting \"" << server_type << "\" server (" << transport_type << "/" << protocol_type
752 << ") listen on: ";
753 if (abstract_namespace) {
754 cout << '@';
755 }
756 cout << domain_socket;
757 if (port != 0) {
758 cout << port;
759 }
760 cout << endl;
761
762 // Multiplexed Processor if needed
763 if (boost::starts_with(protocol_type, "multi")) {
764 std::shared_ptr<SecondHandler> secondHandler(new SecondHandler());
765 std::shared_ptr<SecondServiceProcessor> secondProcessor(new SecondServiceProcessor(secondHandler));
766
767 std::shared_ptr<TMultiplexedProcessor> multiplexedProcessor(new TMultiplexedProcessor());
768 multiplexedProcessor->registerDefault(testProcessor); // non-multi clients go to the default processor (multi:binary, multic:compact, ...)
769 multiplexedProcessor->registerProcessor("ThriftTest", testProcessor);
770 multiplexedProcessor->registerProcessor("SecondService", secondProcessor);
771 testProcessor = std::dynamic_pointer_cast<TProcessor>(multiplexedProcessor);
772 }
773
774 // Server
775 std::shared_ptr<apache::thrift::server::TServer> server;
776
777 if (server_type == "simple") {
778 server.reset(new TSimpleServer(testProcessor, serverSocket, transportFactory, protocolFactory));
779 } else if (server_type == "thread-pool") {
780
781 std::shared_ptr<ThreadFactory> threadFactory
782 = std::shared_ptr<ThreadFactory>(new ThreadFactory());
783
784 std::shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workers);
785 threadManager->threadFactory(threadFactory);
786 threadManager->start();
787
788 server.reset(new TThreadPoolServer(testProcessor,
789 serverSocket,
790 transportFactory,
791 protocolFactory,
792 threadManager));
793 } else if (server_type == "threaded") {
794 server.reset(
795 new TThreadedServer(testProcessor, serverSocket, transportFactory, protocolFactory));
796 } else if (server_type == "nonblocking") {
797 if (transport_type == "http") {
798 std::shared_ptr<TestHandlerAsync> testHandlerAsync(new TestHandlerAsync(testHandler));
799 std::shared_ptr<TAsyncProcessor> testProcessorAsync(
800 new ThriftTestAsyncProcessor(testHandlerAsync));
801 std::shared_ptr<TAsyncBufferProcessor> testBufferProcessor(
802 new TAsyncProtocolProcessor(testProcessorAsync, protocolFactory));
803
804 // not loading nonblockingServer into "server" because
805 // TEvhttpServer doesn't inherit from TServer, and doesn't
806 // provide a stop method.
807 TEvhttpServer nonblockingServer(testBufferProcessor, port);
808 nonblockingServer.serve();
809 } else if (transport_type == "framed") {
810 std::shared_ptr<transport::TNonblockingServerTransport> nbSocket;
811 nbSocket.reset(
812 ssl ? new transport::TNonblockingSSLServerSocket(port, sslSocketFactory)
813 : new transport::TNonblockingServerSocket(port));
814 server.reset(new TNonblockingServer(testProcessor, protocolFactory, nbSocket));
815 } else {
816 cerr << "server-type nonblocking requires transport of http or framed" << endl;
817 exit(1);
818 }
819 }
820
821 if (server.get() != nullptr) {
822 if (protocol_type == "header") {
823 // Tell the server to use the same protocol for input / output
824 // if using header
825 server->setOutputProtocolFactory(std::shared_ptr<TProtocolFactory>());
826 }
827
828 apache::thrift::concurrency::ThreadFactory factory;
829 factory.setDetached(false);
830 std::shared_ptr<apache::thrift::concurrency::Runnable> serverThreadRunner(server);
831 std::shared_ptr<apache::thrift::concurrency::Thread> thread
832 = factory.newThread(serverThreadRunner);
833
834 #ifdef HAVE_SIGNAL_H
835 signal(SIGINT, signal_handler);
836 #endif
837
838 thread->start();
839 gMonitor.waitForever(); // wait for a shutdown signal
840
841 #ifdef HAVE_SIGNAL_H
842 signal(SIGINT, SIG_DFL);
843 #endif
844
845 server->stop();
846 thread->join();
847 server.reset();
848 }
849
850 cout << "done." << endl;
851 return 0;
852 }
853