1 /*
2  * Copyright (c) 2006- Facebook
3  *
4  * SPDX-License-Identifier: Apache-2.0
5  */
6 /*
7  * Licensed to the Apache Software Foundation (ASF) under one
8  * or more contributor license agreements. See the NOTICE file
9  * distributed with this work for additional information
10  * regarding copyright ownership. The ASF licenses this file
11  * to you under the Apache License, Version 2.0 (the
12  * "License"); you may not use this file except in compliance
13  * with the License. You may obtain a copy of the License at
14  *
15  *   http://www.apache.org/licenses/LICENSE-2.0
16  *
17  * Unless required by applicable law or agreed to in writing,
18  * software distributed under the License is distributed on an
19  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
20  * KIND, either express or implied. See the License for the
21  * specific language governing permissions and limitations
22  * under the License.
23  */
24 
25 #include <algorithm>
26 #include <functional>
27 #include <stdexcept>
28 #include <stdint.h>
29 #include <thrift/server/TServerFramework.h>
30 
31 namespace apache
32 {
33 namespace thrift
34 {
35 namespace server
36 {
37 
38 // using apache::thrift::concurrency::Synchronized;
39 using apache::thrift::protocol::TProtocol;
40 using apache::thrift::protocol::TProtocolFactory;
41 using apache::thrift::transport::TServerTransport;
42 using apache::thrift::transport::TTransport;
43 using apache::thrift::transport::TTransportException;
44 using apache::thrift::transport::TTransportFactory;
45 using std::bind;
46 using std::shared_ptr;
47 using std::string;
48 
TServerFramework(const shared_ptr<TProcessorFactory> & processorFactory,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & transportFactory,const shared_ptr<TProtocolFactory> & protocolFactory)49 TServerFramework::TServerFramework(const shared_ptr<TProcessorFactory> &processorFactory,
50 				   const shared_ptr<TServerTransport> &serverTransport,
51 				   const shared_ptr<TTransportFactory> &transportFactory,
52 				   const shared_ptr<TProtocolFactory> &protocolFactory)
53 	: TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
54 	  clients_(0), hwm_(0), limit_(INT64_MAX)
55 {
56 }
57 
TServerFramework(const shared_ptr<TProcessor> & processor,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & transportFactory,const shared_ptr<TProtocolFactory> & protocolFactory)58 TServerFramework::TServerFramework(const shared_ptr<TProcessor> &processor,
59 				   const shared_ptr<TServerTransport> &serverTransport,
60 				   const shared_ptr<TTransportFactory> &transportFactory,
61 				   const shared_ptr<TProtocolFactory> &protocolFactory)
62 	: TServer(processor, serverTransport, transportFactory, protocolFactory), clients_(0),
63 	  hwm_(0), limit_(INT64_MAX)
64 {
65 }
66 
TServerFramework(const shared_ptr<TProcessorFactory> & processorFactory,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & inputTransportFactory,const shared_ptr<TTransportFactory> & outputTransportFactory,const shared_ptr<TProtocolFactory> & inputProtocolFactory,const shared_ptr<TProtocolFactory> & outputProtocolFactory)67 TServerFramework::TServerFramework(const shared_ptr<TProcessorFactory> &processorFactory,
68 				   const shared_ptr<TServerTransport> &serverTransport,
69 				   const shared_ptr<TTransportFactory> &inputTransportFactory,
70 				   const shared_ptr<TTransportFactory> &outputTransportFactory,
71 				   const shared_ptr<TProtocolFactory> &inputProtocolFactory,
72 				   const shared_ptr<TProtocolFactory> &outputProtocolFactory)
73 	: TServer(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
74 		  inputProtocolFactory, outputProtocolFactory),
75 	  clients_(0), hwm_(0), limit_(INT64_MAX)
76 {
77 }
78 
TServerFramework(const shared_ptr<TProcessor> & processor,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & inputTransportFactory,const shared_ptr<TTransportFactory> & outputTransportFactory,const shared_ptr<TProtocolFactory> & inputProtocolFactory,const shared_ptr<TProtocolFactory> & outputProtocolFactory)79 TServerFramework::TServerFramework(const shared_ptr<TProcessor> &processor,
80 				   const shared_ptr<TServerTransport> &serverTransport,
81 				   const shared_ptr<TTransportFactory> &inputTransportFactory,
82 				   const shared_ptr<TTransportFactory> &outputTransportFactory,
83 				   const shared_ptr<TProtocolFactory> &inputProtocolFactory,
84 				   const shared_ptr<TProtocolFactory> &outputProtocolFactory)
85 	: TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
86 		  inputProtocolFactory, outputProtocolFactory),
87 	  clients_(0), hwm_(0), limit_(INT64_MAX)
88 {
89 }
90 
91 TServerFramework::~TServerFramework() = default;
92 
releaseOneDescriptor(const string & name,T & pTransport)93 template <typename T> static void releaseOneDescriptor(const string &name, T &pTransport)
94 {
95 	if (pTransport) {
96 		try {
97 			pTransport->close();
98 		} catch (const TTransportException &ttx) {
99 			string errStr =
100 				string("TServerFramework " + name + " close failed: ") + ttx.what();
101 			GlobalOutput(errStr.c_str());
102 		}
103 	}
104 }
105 
serve()106 void TServerFramework::serve()
107 {
108 	shared_ptr<TTransport> client;
109 	shared_ptr<TTransport> inputTransport;
110 	shared_ptr<TTransport> outputTransport;
111 	shared_ptr<TProtocol> inputProtocol;
112 	shared_ptr<TProtocol> outputProtocol;
113 
114 	// Start the server listening
115 	serverTransport_->listen();
116 
117 	// Run the preServe event to indicate server is now listening
118 	// and that it is safe to connect.
119 	if (eventHandler_) {
120 		eventHandler_->preServe();
121 	}
122 
123 	// Fetch client from server
124 	for (;;) {
125 		try {
126 			// Dereference any resources from any previous client creation
127 			// such that a blocking accept does not hold them indefinitely.
128 			outputProtocol.reset();
129 			inputProtocol.reset();
130 			outputTransport.reset();
131 			inputTransport.reset();
132 			client.reset();
133 
134 			// If we have reached the limit on the number of concurrent
135 			// clients allowed, wait for one or more clients to drain before
136 			// accepting another.
137 			{
138 				// Synchronized sync(mon_);
139 				while (clients_ >= limit_) {
140 					// mon_.wait();
141 				}
142 			}
143 
144 			client = serverTransport_->accept();
145 
146 			inputTransport = inputTransportFactory_->getTransport(client);
147 			outputTransport = outputTransportFactory_->getTransport(client);
148 			if (!outputProtocolFactory_) {
149 				inputProtocol = inputProtocolFactory_->getProtocol(inputTransport,
150 										   outputTransport);
151 				outputProtocol = inputProtocol;
152 			} else {
153 				inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
154 				outputProtocol =
155 					outputProtocolFactory_->getProtocol(outputTransport);
156 			}
157 
158 			newlyConnectedClient(shared_ptr<TConnectedClient>(
159 				new TConnectedClient(
160 					getProcessor(inputProtocol, outputProtocol, client),
161 					inputProtocol, outputProtocol, eventHandler_, client),
162 				bind(&TServerFramework::disposeConnectedClient, this,
163 				     std::placeholders::_1)));
164 
165 		} catch (TTransportException &ttx) {
166 			releaseOneDescriptor("inputTransport", inputTransport);
167 			releaseOneDescriptor("outputTransport", outputTransport);
168 			releaseOneDescriptor("client", client);
169 			if (ttx.getType() == TTransportException::TIMED_OUT ||
170 			    ttx.getType() == TTransportException::CLIENT_DISCONNECT) {
171 				// Accept timeout and client disconnect - continue processing.
172 				continue;
173 			} else if (ttx.getType() == TTransportException::END_OF_FILE ||
174 				   ttx.getType() == TTransportException::INTERRUPTED) {
175 				// Server was interrupted.  This only happens when stopping.
176 				break;
177 			} else {
178 				// All other transport exceptions are logged.
179 				// State of connection is unknown.  Done.
180 				string errStr = string("TServerTransport died: ") + ttx.what();
181 				GlobalOutput(errStr.c_str());
182 				break;
183 			}
184 		}
185 	}
186 
187 	releaseOneDescriptor("serverTransport", serverTransport_);
188 }
189 
getConcurrentClientLimit() const190 int64_t TServerFramework::getConcurrentClientLimit() const
191 {
192 	// Synchronized sync(mon_);
193 	return limit_;
194 }
195 
getConcurrentClientCount() const196 int64_t TServerFramework::getConcurrentClientCount() const
197 {
198 	// Synchronized sync(mon_);
199 	return clients_;
200 }
201 
getConcurrentClientCountHWM() const202 int64_t TServerFramework::getConcurrentClientCountHWM() const
203 {
204 	// Synchronized sync(mon_);
205 	return hwm_;
206 }
207 
setConcurrentClientLimit(int64_t newLimit)208 void TServerFramework::setConcurrentClientLimit(int64_t newLimit)
209 {
210 	if (newLimit < 1) {
211 		throw std::invalid_argument("newLimit must be greater than zero");
212 	}
213 	// Synchronized sync(mon_);
214 	limit_ = newLimit;
215 	if (limit_ - clients_ > 0) {
216 		// mon_.notify();
217 	}
218 }
219 
stop()220 void TServerFramework::stop()
221 {
222 	// Order is important because serve() releases serverTransport_ when it is
223 	// interrupted, which closes the socket that interruptChildren uses.
224 	serverTransport_->interruptChildren();
225 	serverTransport_->interrupt();
226 }
227 
newlyConnectedClient(const shared_ptr<TConnectedClient> & pClient)228 void TServerFramework::newlyConnectedClient(const shared_ptr<TConnectedClient> &pClient)
229 {
230 	{
231 		// Synchronized sync(mon_);
232 		++clients_;
233 		hwm_ = (std::max)(hwm_, clients_);
234 	}
235 
236 	onClientConnected(pClient);
237 }
238 
239 #pragma GCC diagnostic push
240 #pragma GCC diagnostic ignored "-Wdelete-non-virtual-dtor"
disposeConnectedClient(TConnectedClient * pClient)241 void TServerFramework::disposeConnectedClient(TConnectedClient *pClient)
242 {
243 	onClientDisconnected(pClient);
244 	delete pClient;
245 
246 	// Synchronized sync(mon_);
247 	if (limit_ - --clients_ > 0) {
248 		// mon_.notify();
249 	}
250 }
251 #pragma GCC diagnostic pop
252 
253 } // namespace server
254 } // namespace thrift
255 } // namespace apache
256