1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <algorithm>
21 #include <functional>
22 #include <stdexcept>
23 #include <stdint.h>
24 #include <thrift/server/TServerFramework.h>
25 
26 namespace apache {
27 namespace thrift {
28 namespace server {
29 
30 using apache::thrift::concurrency::Synchronized;
31 using apache::thrift::protocol::TProtocol;
32 using apache::thrift::protocol::TProtocolFactory;
33 using std::bind;
34 using std::shared_ptr;
35 using apache::thrift::transport::TServerTransport;
36 using apache::thrift::transport::TTransport;
37 using apache::thrift::transport::TTransportException;
38 using apache::thrift::transport::TTransportFactory;
39 using std::string;
40 
TServerFramework(const shared_ptr<TProcessorFactory> & processorFactory,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & transportFactory,const shared_ptr<TProtocolFactory> & protocolFactory)41 TServerFramework::TServerFramework(const shared_ptr<TProcessorFactory>& processorFactory,
42                                    const shared_ptr<TServerTransport>& serverTransport,
43                                    const shared_ptr<TTransportFactory>& transportFactory,
44                                    const shared_ptr<TProtocolFactory>& protocolFactory)
45   : TServer(processorFactory, serverTransport, transportFactory, protocolFactory),
46     clients_(0),
47     hwm_(0),
48     limit_(INT64_MAX) {
49 }
50 
TServerFramework(const shared_ptr<TProcessor> & processor,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & transportFactory,const shared_ptr<TProtocolFactory> & protocolFactory)51 TServerFramework::TServerFramework(const shared_ptr<TProcessor>& processor,
52                                    const shared_ptr<TServerTransport>& serverTransport,
53                                    const shared_ptr<TTransportFactory>& transportFactory,
54                                    const shared_ptr<TProtocolFactory>& protocolFactory)
55   : TServer(processor, serverTransport, transportFactory, protocolFactory),
56     clients_(0),
57     hwm_(0),
58     limit_(INT64_MAX) {
59 }
60 
TServerFramework(const shared_ptr<TProcessorFactory> & processorFactory,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & inputTransportFactory,const shared_ptr<TTransportFactory> & outputTransportFactory,const shared_ptr<TProtocolFactory> & inputProtocolFactory,const shared_ptr<TProtocolFactory> & outputProtocolFactory)61 TServerFramework::TServerFramework(const shared_ptr<TProcessorFactory>& processorFactory,
62                                    const shared_ptr<TServerTransport>& serverTransport,
63                                    const shared_ptr<TTransportFactory>& inputTransportFactory,
64                                    const shared_ptr<TTransportFactory>& outputTransportFactory,
65                                    const shared_ptr<TProtocolFactory>& inputProtocolFactory,
66                                    const shared_ptr<TProtocolFactory>& outputProtocolFactory)
67   : TServer(processorFactory,
68             serverTransport,
69             inputTransportFactory,
70             outputTransportFactory,
71             inputProtocolFactory,
72             outputProtocolFactory),
73     clients_(0),
74     hwm_(0),
75     limit_(INT64_MAX) {
76 }
77 
TServerFramework(const shared_ptr<TProcessor> & processor,const shared_ptr<TServerTransport> & serverTransport,const shared_ptr<TTransportFactory> & inputTransportFactory,const shared_ptr<TTransportFactory> & outputTransportFactory,const shared_ptr<TProtocolFactory> & inputProtocolFactory,const shared_ptr<TProtocolFactory> & outputProtocolFactory)78 TServerFramework::TServerFramework(const shared_ptr<TProcessor>& processor,
79                                    const shared_ptr<TServerTransport>& serverTransport,
80                                    const shared_ptr<TTransportFactory>& inputTransportFactory,
81                                    const shared_ptr<TTransportFactory>& outputTransportFactory,
82                                    const shared_ptr<TProtocolFactory>& inputProtocolFactory,
83                                    const shared_ptr<TProtocolFactory>& outputProtocolFactory)
84   : TServer(processor,
85             serverTransport,
86             inputTransportFactory,
87             outputTransportFactory,
88             inputProtocolFactory,
89             outputProtocolFactory),
90     clients_(0),
91     hwm_(0),
92     limit_(INT64_MAX) {
93 }
94 
95 TServerFramework::~TServerFramework() = default;
96 
97 template <typename T>
releaseOneDescriptor(const string & name,T & pTransport)98 static void releaseOneDescriptor(const string& name, T& pTransport) {
99   if (pTransport) {
100     try {
101       pTransport->close();
102     } catch (const TTransportException& ttx) {
103       string errStr = string("TServerFramework " + name + " close failed: ") + ttx.what();
104       GlobalOutput(errStr.c_str());
105     }
106   }
107 }
108 
serve()109 void TServerFramework::serve() {
110   shared_ptr<TTransport> client;
111   shared_ptr<TTransport> inputTransport;
112   shared_ptr<TTransport> outputTransport;
113   shared_ptr<TProtocol> inputProtocol;
114   shared_ptr<TProtocol> outputProtocol;
115 
116   // Start the server listening
117   serverTransport_->listen();
118 
119   // Run the preServe event to indicate server is now listening
120   // and that it is safe to connect.
121   if (eventHandler_) {
122     eventHandler_->preServe();
123   }
124 
125   // Fetch client from server
126   for (;;) {
127     try {
128       // Dereference any resources from any previous client creation
129       // such that a blocking accept does not hold them indefinitely.
130       outputProtocol.reset();
131       inputProtocol.reset();
132       outputTransport.reset();
133       inputTransport.reset();
134       client.reset();
135 
136       // If we have reached the limit on the number of concurrent
137       // clients allowed, wait for one or more clients to drain before
138       // accepting another.
139       {
140         Synchronized sync(mon_);
141         while (clients_ >= limit_) {
142           mon_.wait();
143         }
144       }
145 
146       client = serverTransport_->accept();
147 
148       inputTransport = inputTransportFactory_->getTransport(client);
149       outputTransport = outputTransportFactory_->getTransport(client);
150       if (!outputProtocolFactory_) {
151         inputProtocol = inputProtocolFactory_->getProtocol(inputTransport, outputTransport);
152         outputProtocol = inputProtocol;
153       } else {
154         inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
155         outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
156       }
157 
158       newlyConnectedClient(shared_ptr<TConnectedClient>(
159           new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
160                                inputProtocol,
161                                outputProtocol,
162                                eventHandler_,
163                                client),
164           bind(&TServerFramework::disposeConnectedClient, this, std::placeholders::_1)));
165 
166     } catch (TTransportException& ttx) {
167       releaseOneDescriptor("inputTransport", inputTransport);
168       releaseOneDescriptor("outputTransport", outputTransport);
169       releaseOneDescriptor("client", client);
170       if (ttx.getType() == TTransportException::TIMED_OUT
171           || ttx.getType() == TTransportException::CLIENT_DISCONNECT) {
172         // Accept timeout and client disconnect - continue processing.
173         continue;
174       } else if (ttx.getType() == TTransportException::END_OF_FILE
175                  || ttx.getType() == TTransportException::INTERRUPTED) {
176         // Server was interrupted.  This only happens when stopping.
177         break;
178       } else {
179         // All other transport exceptions are logged.
180         // State of connection is unknown.  Done.
181         string errStr = string("TServerTransport died: ") + ttx.what();
182         GlobalOutput(errStr.c_str());
183         break;
184       }
185     }
186   }
187 
188   releaseOneDescriptor("serverTransport", serverTransport_);
189 }
190 
getConcurrentClientLimit() const191 int64_t TServerFramework::getConcurrentClientLimit() const {
192   Synchronized sync(mon_);
193   return limit_;
194 }
195 
getConcurrentClientCount() const196 int64_t TServerFramework::getConcurrentClientCount() const {
197   Synchronized sync(mon_);
198   return clients_;
199 }
200 
getConcurrentClientCountHWM() const201 int64_t TServerFramework::getConcurrentClientCountHWM() const {
202   Synchronized sync(mon_);
203   return hwm_;
204 }
205 
setConcurrentClientLimit(int64_t newLimit)206 void TServerFramework::setConcurrentClientLimit(int64_t newLimit) {
207   if (newLimit < 1) {
208     throw std::invalid_argument("newLimit must be greater than zero");
209   }
210   Synchronized sync(mon_);
211   limit_ = newLimit;
212   if (limit_ - clients_ > 0) {
213     mon_.notify();
214   }
215 }
216 
stop()217 void TServerFramework::stop() {
218   // Order is important because serve() releases serverTransport_ when it is
219   // interrupted, which closes the socket that interruptChildren uses.
220   serverTransport_->interruptChildren();
221   serverTransport_->interrupt();
222 }
223 
newlyConnectedClient(const shared_ptr<TConnectedClient> & pClient)224 void TServerFramework::newlyConnectedClient(const shared_ptr<TConnectedClient>& pClient) {
225   {
226     Synchronized sync(mon_);
227     ++clients_;
228     hwm_ = (std::max)(hwm_, clients_);
229   }
230 
231   onClientConnected(pClient);
232 }
233 
disposeConnectedClient(TConnectedClient * pClient)234 void TServerFramework::disposeConnectedClient(TConnectedClient* pClient) {
235   onClientDisconnected(pClient);
236   delete pClient;
237 
238   Synchronized sync(mon_);
239   if (limit_ - --clients_ > 0) {
240     mon_.notify();
241   }
242 }
243 
244 }
245 }
246 } // apache::thrift::server
247