1 /*
2 * Copyright (c) 2006- Facebook
3 *
4 * SPDX-License-Identifier: Apache-2.0
5 */
6 /*
7 * Licensed to the Apache Software Foundation (ASF) under one
8 * or more contributor license agreements. See the NOTICE file
9 * distributed with this work for additional information
10 * regarding copyright ownership. The ASF licenses this file
11 * to you under the Apache License, Version 2.0 (the
12 * "License"); you may not use this file except in compliance
13 * with the License. You may obtain a copy of the License at
14 *
15 * http://www.apache.org/licenses/LICENSE-2.0
16 *
17 * Unless required by applicable law or agreed to in writing,
18 * software distributed under the License is distributed on an
19 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
20 * KIND, either express or implied. See the License for the
21 * specific language governing permissions and limitations
22 * under the License.
23 */
24
25 #include <algorithm>
26 #include <functional>
27 #include <stdexcept>
28 #include <stdint.h>
29 #include <thrift/server/TServerFramework.h>
30
31 namespace apache
32 {
33 namespace thrift
34 {
35 namespace server
36 {
37
38 // using apache::thrift::concurrency::Synchronized;
39 using apache::thrift::protocol::TProtocol;
40 using apache::thrift::protocol::TProtocolFactory;
41 using apache::thrift::transport::TServerTransport;
42 using apache::thrift::transport::TTransport;
43 using apache::thrift::transport::TTransportException;
44 using apache::thrift::transport::TTransportFactory;
45 using std::bind;
46 using std::shared_ptr;
47 using std::string;
48
TServerFramework(const shared_ptr<TProcessorFactory> & processorFactory,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & transportFactory,const shared_ptr<TProtocolFactory> & protocolFactory)49 TServerFramework::TServerFramework(const shared_ptr<TProcessorFactory> &processorFactory,
50 const shared_ptr<TServerTransport> &serverTransport,
51 const shared_ptr<TTransportFactory> &transportFactory,
52 const shared_ptr<TProtocolFactory> &protocolFactory)
53 : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
54 clients_(0), hwm_(0), limit_(INT64_MAX)
55 {
56 }
57
TServerFramework(const shared_ptr<TProcessor> & processor,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & transportFactory,const shared_ptr<TProtocolFactory> & protocolFactory)58 TServerFramework::TServerFramework(const shared_ptr<TProcessor> &processor,
59 const shared_ptr<TServerTransport> &serverTransport,
60 const shared_ptr<TTransportFactory> &transportFactory,
61 const shared_ptr<TProtocolFactory> &protocolFactory)
62 : TServer(processor, serverTransport, transportFactory, protocolFactory), clients_(0),
63 hwm_(0), limit_(INT64_MAX)
64 {
65 }
66
TServerFramework(const shared_ptr<TProcessorFactory> & processorFactory,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & inputTransportFactory,const shared_ptr<TTransportFactory> & outputTransportFactory,const shared_ptr<TProtocolFactory> & inputProtocolFactory,const shared_ptr<TProtocolFactory> & outputProtocolFactory)67 TServerFramework::TServerFramework(const shared_ptr<TProcessorFactory> &processorFactory,
68 const shared_ptr<TServerTransport> &serverTransport,
69 const shared_ptr<TTransportFactory> &inputTransportFactory,
70 const shared_ptr<TTransportFactory> &outputTransportFactory,
71 const shared_ptr<TProtocolFactory> &inputProtocolFactory,
72 const shared_ptr<TProtocolFactory> &outputProtocolFactory)
73 : TServer(processorFactory, serverTransport, inputTransportFactory, outputTransportFactory,
74 inputProtocolFactory, outputProtocolFactory),
75 clients_(0), hwm_(0), limit_(INT64_MAX)
76 {
77 }
78
TServerFramework(const shared_ptr<TProcessor> & processor,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & inputTransportFactory,const shared_ptr<TTransportFactory> & outputTransportFactory,const shared_ptr<TProtocolFactory> & inputProtocolFactory,const shared_ptr<TProtocolFactory> & outputProtocolFactory)79 TServerFramework::TServerFramework(const shared_ptr<TProcessor> &processor,
80 const shared_ptr<TServerTransport> &serverTransport,
81 const shared_ptr<TTransportFactory> &inputTransportFactory,
82 const shared_ptr<TTransportFactory> &outputTransportFactory,
83 const shared_ptr<TProtocolFactory> &inputProtocolFactory,
84 const shared_ptr<TProtocolFactory> &outputProtocolFactory)
85 : TServer(processor, serverTransport, inputTransportFactory, outputTransportFactory,
86 inputProtocolFactory, outputProtocolFactory),
87 clients_(0), hwm_(0), limit_(INT64_MAX)
88 {
89 }
90
91 TServerFramework::~TServerFramework() = default;
92
releaseOneDescriptor(const string & name,T & pTransport)93 template <typename T> static void releaseOneDescriptor(const string &name, T &pTransport)
94 {
95 if (pTransport) {
96 try {
97 pTransport->close();
98 } catch (const TTransportException &ttx) {
99 string errStr =
100 string("TServerFramework " + name + " close failed: ") + ttx.what();
101 GlobalOutput(errStr.c_str());
102 }
103 }
104 }
105
serve()106 void TServerFramework::serve()
107 {
108 shared_ptr<TTransport> client;
109 shared_ptr<TTransport> inputTransport;
110 shared_ptr<TTransport> outputTransport;
111 shared_ptr<TProtocol> inputProtocol;
112 shared_ptr<TProtocol> outputProtocol;
113
114 // Start the server listening
115 serverTransport_->listen();
116
117 // Run the preServe event to indicate server is now listening
118 // and that it is safe to connect.
119 if (eventHandler_) {
120 eventHandler_->preServe();
121 }
122
123 // Fetch client from server
124 for (;;) {
125 try {
126 // Dereference any resources from any previous client creation
127 // such that a blocking accept does not hold them indefinitely.
128 outputProtocol.reset();
129 inputProtocol.reset();
130 outputTransport.reset();
131 inputTransport.reset();
132 client.reset();
133
134 // If we have reached the limit on the number of concurrent
135 // clients allowed, wait for one or more clients to drain before
136 // accepting another.
137 {
138 // Synchronized sync(mon_);
139 while (clients_ >= limit_) {
140 // mon_.wait();
141 }
142 }
143
144 client = serverTransport_->accept();
145
146 inputTransport = inputTransportFactory_->getTransport(client);
147 outputTransport = outputTransportFactory_->getTransport(client);
148 if (!outputProtocolFactory_) {
149 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport,
150 outputTransport);
151 outputProtocol = inputProtocol;
152 } else {
153 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
154 outputProtocol =
155 outputProtocolFactory_->getProtocol(outputTransport);
156 }
157
158 newlyConnectedClient(shared_ptr<TConnectedClient>(
159 new TConnectedClient(
160 getProcessor(inputProtocol, outputProtocol, client),
161 inputProtocol, outputProtocol, eventHandler_, client),
162 bind(&TServerFramework::disposeConnectedClient, this,
163 std::placeholders::_1)));
164
165 } catch (TTransportException &ttx) {
166 releaseOneDescriptor("inputTransport", inputTransport);
167 releaseOneDescriptor("outputTransport", outputTransport);
168 releaseOneDescriptor("client", client);
169 if (ttx.getType() == TTransportException::TIMED_OUT ||
170 ttx.getType() == TTransportException::CLIENT_DISCONNECT) {
171 // Accept timeout and client disconnect - continue processing.
172 continue;
173 } else if (ttx.getType() == TTransportException::END_OF_FILE ||
174 ttx.getType() == TTransportException::INTERRUPTED) {
175 // Server was interrupted. This only happens when stopping.
176 break;
177 } else {
178 // All other transport exceptions are logged.
179 // State of connection is unknown. Done.
180 string errStr = string("TServerTransport died: ") + ttx.what();
181 GlobalOutput(errStr.c_str());
182 break;
183 }
184 }
185 }
186
187 releaseOneDescriptor("serverTransport", serverTransport_);
188 }
189
getConcurrentClientLimit() const190 int64_t TServerFramework::getConcurrentClientLimit() const
191 {
192 // Synchronized sync(mon_);
193 return limit_;
194 }
195
getConcurrentClientCount() const196 int64_t TServerFramework::getConcurrentClientCount() const
197 {
198 // Synchronized sync(mon_);
199 return clients_;
200 }
201
getConcurrentClientCountHWM() const202 int64_t TServerFramework::getConcurrentClientCountHWM() const
203 {
204 // Synchronized sync(mon_);
205 return hwm_;
206 }
207
setConcurrentClientLimit(int64_t newLimit)208 void TServerFramework::setConcurrentClientLimit(int64_t newLimit)
209 {
210 if (newLimit < 1) {
211 throw std::invalid_argument("newLimit must be greater than zero");
212 }
213 // Synchronized sync(mon_);
214 limit_ = newLimit;
215 if (limit_ - clients_ > 0) {
216 // mon_.notify();
217 }
218 }
219
stop()220 void TServerFramework::stop()
221 {
222 // Order is important because serve() releases serverTransport_ when it is
223 // interrupted, which closes the socket that interruptChildren uses.
224 serverTransport_->interruptChildren();
225 serverTransport_->interrupt();
226 }
227
newlyConnectedClient(const shared_ptr<TConnectedClient> & pClient)228 void TServerFramework::newlyConnectedClient(const shared_ptr<TConnectedClient> &pClient)
229 {
230 {
231 // Synchronized sync(mon_);
232 ++clients_;
233 hwm_ = (std::max)(hwm_, clients_);
234 }
235
236 onClientConnected(pClient);
237 }
238
239 #pragma GCC diagnostic push
240 #pragma GCC diagnostic ignored "-Wdelete-non-virtual-dtor"
disposeConnectedClient(TConnectedClient * pClient)241 void TServerFramework::disposeConnectedClient(TConnectedClient *pClient)
242 {
243 onClientDisconnected(pClient);
244 delete pClient;
245
246 // Synchronized sync(mon_);
247 if (limit_ - --clients_ > 0) {
248 // mon_.notify();
249 }
250 }
251 #pragma GCC diagnostic pop
252
253 } // namespace server
254 } // namespace thrift
255 } // namespace apache
256