1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <algorithm>
21 #include <functional>
22 #include <stdexcept>
23 #include <stdint.h>
24 #include <thrift/server/TServerFramework.h>
25
26 namespace apache {
27 namespace thrift {
28 namespace server {
29
30 using apache::thrift::concurrency::Synchronized;
31 using apache::thrift::protocol::TProtocol;
32 using apache::thrift::protocol::TProtocolFactory;
33 using std::bind;
34 using std::shared_ptr;
35 using apache::thrift::transport::TServerTransport;
36 using apache::thrift::transport::TTransport;
37 using apache::thrift::transport::TTransportException;
38 using apache::thrift::transport::TTransportFactory;
39 using std::string;
40
TServerFramework(const shared_ptr<TProcessorFactory> & processorFactory,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & transportFactory,const shared_ptr<TProtocolFactory> & protocolFactory)41 TServerFramework::TServerFramework(const shared_ptr<TProcessorFactory>& processorFactory,
42 const shared_ptr<TServerTransport>& serverTransport,
43 const shared_ptr<TTransportFactory>& transportFactory,
44 const shared_ptr<TProtocolFactory>& protocolFactory)
45 : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
46 clients_(0),
47 hwm_(0),
48 limit_(INT64_MAX) {
49 }
50
TServerFramework(const shared_ptr<TProcessor> & processor,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & transportFactory,const shared_ptr<TProtocolFactory> & protocolFactory)51 TServerFramework::TServerFramework(const shared_ptr<TProcessor>& processor,
52 const shared_ptr<TServerTransport>& serverTransport,
53 const shared_ptr<TTransportFactory>& transportFactory,
54 const shared_ptr<TProtocolFactory>& protocolFactory)
55 : TServer(processor, serverTransport, transportFactory, protocolFactory),
56 clients_(0),
57 hwm_(0),
58 limit_(INT64_MAX) {
59 }
60
TServerFramework(const shared_ptr<TProcessorFactory> & processorFactory,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & inputTransportFactory,const shared_ptr<TTransportFactory> & outputTransportFactory,const shared_ptr<TProtocolFactory> & inputProtocolFactory,const shared_ptr<TProtocolFactory> & outputProtocolFactory)61 TServerFramework::TServerFramework(const shared_ptr<TProcessorFactory>& processorFactory,
62 const shared_ptr<TServerTransport>& serverTransport,
63 const shared_ptr<TTransportFactory>& inputTransportFactory,
64 const shared_ptr<TTransportFactory>& outputTransportFactory,
65 const shared_ptr<TProtocolFactory>& inputProtocolFactory,
66 const shared_ptr<TProtocolFactory>& outputProtocolFactory)
67 : TServer(processorFactory,
68 serverTransport,
69 inputTransportFactory,
70 outputTransportFactory,
71 inputProtocolFactory,
72 outputProtocolFactory),
73 clients_(0),
74 hwm_(0),
75 limit_(INT64_MAX) {
76 }
77
TServerFramework(const shared_ptr<TProcessor> & processor,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & inputTransportFactory,const shared_ptr<TTransportFactory> & outputTransportFactory,const shared_ptr<TProtocolFactory> & inputProtocolFactory,const shared_ptr<TProtocolFactory> & outputProtocolFactory)78 TServerFramework::TServerFramework(const shared_ptr<TProcessor>& processor,
79 const shared_ptr<TServerTransport>& serverTransport,
80 const shared_ptr<TTransportFactory>& inputTransportFactory,
81 const shared_ptr<TTransportFactory>& outputTransportFactory,
82 const shared_ptr<TProtocolFactory>& inputProtocolFactory,
83 const shared_ptr<TProtocolFactory>& outputProtocolFactory)
84 : TServer(processor,
85 serverTransport,
86 inputTransportFactory,
87 outputTransportFactory,
88 inputProtocolFactory,
89 outputProtocolFactory),
90 clients_(0),
91 hwm_(0),
92 limit_(INT64_MAX) {
93 }
94
// Destructor: explicitly defaulted, so the compiler-generated version is used.
TServerFramework::~TServerFramework() = default;
96
97 template <typename T>
releaseOneDescriptor(const string & name,T & pTransport)98 static void releaseOneDescriptor(const string& name, T& pTransport) {
99 if (pTransport) {
100 try {
101 pTransport->close();
102 } catch (const TTransportException& ttx) {
103 string errStr = string("TServerFramework " + name + " close failed: ") + ttx.what();
104 GlobalOutput(errStr.c_str());
105 }
106 }
107 }
108
serve()109 void TServerFramework::serve() {
110 shared_ptr<TTransport> client;
111 shared_ptr<TTransport> inputTransport;
112 shared_ptr<TTransport> outputTransport;
113 shared_ptr<TProtocol> inputProtocol;
114 shared_ptr<TProtocol> outputProtocol;
115
116 // Start the server listening
117 serverTransport_->listen();
118
119 // Run the preServe event to indicate server is now listening
120 // and that it is safe to connect.
121 if (eventHandler_) {
122 eventHandler_->preServe();
123 }
124
125 // Fetch client from server
126 for (;;) {
127 try {
128 // Dereference any resources from any previous client creation
129 // such that a blocking accept does not hold them indefinitely.
130 outputProtocol.reset();
131 inputProtocol.reset();
132 outputTransport.reset();
133 inputTransport.reset();
134 client.reset();
135
136 // If we have reached the limit on the number of concurrent
137 // clients allowed, wait for one or more clients to drain before
138 // accepting another.
139 {
140 Synchronized sync(mon_);
141 while (clients_ >= limit_) {
142 mon_.wait();
143 }
144 }
145
146 client = serverTransport_->accept();
147
148 inputTransport = inputTransportFactory_->getTransport(client);
149 outputTransport = outputTransportFactory_->getTransport(client);
150 if (!outputProtocolFactory_) {
151 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport, outputTransport);
152 outputProtocol = inputProtocol;
153 } else {
154 inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
155 outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
156 }
157
158 newlyConnectedClient(shared_ptr<TConnectedClient>(
159 new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
160 inputProtocol,
161 outputProtocol,
162 eventHandler_,
163 client),
164 bind(&TServerFramework::disposeConnectedClient, this, std::placeholders::_1)));
165
166 } catch (TTransportException& ttx) {
167 releaseOneDescriptor("inputTransport", inputTransport);
168 releaseOneDescriptor("outputTransport", outputTransport);
169 releaseOneDescriptor("client", client);
170 if (ttx.getType() == TTransportException::TIMED_OUT
171 || ttx.getType() == TTransportException::CLIENT_DISCONNECT) {
172 // Accept timeout and client disconnect - continue processing.
173 continue;
174 } else if (ttx.getType() == TTransportException::END_OF_FILE
175 || ttx.getType() == TTransportException::INTERRUPTED) {
176 // Server was interrupted. This only happens when stopping.
177 break;
178 } else {
179 // All other transport exceptions are logged.
180 // State of connection is unknown. Done.
181 string errStr = string("TServerTransport died: ") + ttx.what();
182 GlobalOutput(errStr.c_str());
183 break;
184 }
185 }
186 }
187
188 releaseOneDescriptor("serverTransport", serverTransport_);
189 }
190
getConcurrentClientLimit() const191 int64_t TServerFramework::getConcurrentClientLimit() const {
192 Synchronized sync(mon_);
193 return limit_;
194 }
195
getConcurrentClientCount() const196 int64_t TServerFramework::getConcurrentClientCount() const {
197 Synchronized sync(mon_);
198 return clients_;
199 }
200
getConcurrentClientCountHWM() const201 int64_t TServerFramework::getConcurrentClientCountHWM() const {
202 Synchronized sync(mon_);
203 return hwm_;
204 }
205
setConcurrentClientLimit(int64_t newLimit)206 void TServerFramework::setConcurrentClientLimit(int64_t newLimit) {
207 if (newLimit < 1) {
208 throw std::invalid_argument("newLimit must be greater than zero");
209 }
210 Synchronized sync(mon_);
211 limit_ = newLimit;
212 if (limit_ - clients_ > 0) {
213 mon_.notify();
214 }
215 }
216
stop()217 void TServerFramework::stop() {
218 // Order is important because serve() releases serverTransport_ when it is
219 // interrupted, which closes the socket that interruptChildren uses.
220 serverTransport_->interruptChildren();
221 serverTransport_->interrupt();
222 }
223
newlyConnectedClient(const shared_ptr<TConnectedClient> & pClient)224 void TServerFramework::newlyConnectedClient(const shared_ptr<TConnectedClient>& pClient) {
225 {
226 Synchronized sync(mon_);
227 ++clients_;
228 hwm_ = (std::max)(hwm_, clients_);
229 }
230
231 onClientConnected(pClient);
232 }
233
disposeConnectedClient(TConnectedClient * pClient)234 void TServerFramework::disposeConnectedClient(TConnectedClient* pClient) {
235 onClientDisconnected(pClient);
236 delete pClient;
237
238 Synchronized sync(mon_);
239 if (limit_ - --clients_ > 0) {
240 mon_.notify();
241 }
242 }
243
244 }
245 }
246 } // apache::thrift::server
247