/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19 
20 #ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_
21 #define _THRIFT_TRANSPORT_TSERVERSOCKET_H_ 1
22 
#include <functional>
#include <memory>
#include <string>

#include <thrift/concurrency/Mutex.h>
#include <thrift/transport/PlatformSocket.h>
#include <thrift/transport/TServerTransport.h>

#include <sys/types.h>
#ifdef HAVE_SYS_SOCKET_H
#include <sys/socket.h>
#endif
#ifdef HAVE_NETDB_H
#include <netdb.h>
#endif
36 
37 namespace apache {
38 namespace thrift {
39 namespace transport {
40 
// Forward declaration: this header only names TSocket inside a
// std::shared_ptr return type, so the full definition is not needed here.
class TSocket;
42 
43 /**
44  * Server socket implementation of TServerTransport. Wrapper around a unix
45  * socket listen and accept calls.
46  *
47  */
48 class TServerSocket : public TServerTransport {
49 public:
50   typedef std::function<void(THRIFT_SOCKET fd)> socket_func_t;
51 
52   const static int DEFAULT_BACKLOG = 1024;
53 
54   /**
55    * Constructor.
56    *
57    * @param port    Port number to bind to
58    */
59   TServerSocket(int port);
60 
61   /**
62    * Constructor.
63    *
64    * @param port        Port number to bind to
65    * @param sendTimeout Socket send timeout
66    * @param recvTimeout Socket receive timeout
67    */
68   TServerSocket(int port, int sendTimeout, int recvTimeout);
69 
70   /**
71    * Constructor.
72    *
73    * @param address Address to bind to
74    * @param port    Port number to bind to
75    */
76   TServerSocket(const std::string& address, int port);
77 
78   /**
79    * Constructor used for unix sockets.
80    *
81    * @param path Pathname for unix socket.
82    */
83   TServerSocket(const std::string& path);
84 
85   ~TServerSocket() override;
86 
87 
88   bool isOpen() const override;
89 
90   void setSendTimeout(int sendTimeout);
91   void setRecvTimeout(int recvTimeout);
92 
93   void setAcceptTimeout(int accTimeout);
94   void setAcceptBacklog(int accBacklog);
95 
96   void setRetryLimit(int retryLimit);
97   void setRetryDelay(int retryDelay);
98 
setKeepAlive(bool keepAlive)99   void setKeepAlive(bool keepAlive) { keepAlive_ = keepAlive; }
100 
101   void setTcpSendBuffer(int tcpSendBuffer);
102   void setTcpRecvBuffer(int tcpRecvBuffer);
103 
104   // listenCallback gets called just before listen, and after all Thrift
105   // setsockopt calls have been made.  If you have custom setsockopt
106   // things that need to happen on the listening socket, this is the place to do it.
setListenCallback(const socket_func_t & listenCallback)107   void setListenCallback(const socket_func_t& listenCallback) { listenCallback_ = listenCallback; }
108 
109   // acceptCallback gets called after each accept call, on the newly created socket.
110   // It is called after all Thrift setsockopt calls have been made.  If you have
111   // custom setsockopt things that need to happen on the accepted
112   // socket, this is the place to do it.
setAcceptCallback(const socket_func_t & acceptCallback)113   void setAcceptCallback(const socket_func_t& acceptCallback) { acceptCallback_ = acceptCallback; }
114 
115   // When enabled (the default), new children TSockets will be constructed so
116   // they can be interrupted by TServerTransport::interruptChildren().
117   // This is more expensive in terms of system calls (poll + recv) however
118   // ensures a connected client cannot interfere with TServer::stop().
119   //
120   // When disabled, TSocket children do not incur an additional poll() call.
121   // Server-side reads are more efficient, however a client can interfere with
122   // the server's ability to shutdown properly by staying connected.
123   //
124   // Must be called before listen(); mode cannot be switched after that.
125   // \throws std::logic_error if listen() has been called
126   void setInterruptableChildren(bool enable);
127 
getSocketFD()128   THRIFT_SOCKET getSocketFD() override { return serverSocket_; }
129 
130   int getPort() const;
131 
132   std::string getPath() const;
133 
134   bool isUnixDomainSocket() const;
135 
136   void listen() override;
137   void interrupt() override;
138   void interruptChildren() override;
139   void close() override;
140 
141 protected:
142   std::shared_ptr<TTransport> acceptImpl() override;
143   virtual std::shared_ptr<TSocket> createSocket(THRIFT_SOCKET client);
144   bool interruptableChildren_;
145   std::shared_ptr<THRIFT_SOCKET> pChildInterruptSockReader_; // if interruptableChildren_ this is shared with child TSockets
146 
147 private:
148   void notify(THRIFT_SOCKET notifySock);
149   void _setup_sockopts();
150   void _setup_unixdomain_sockopts();
151   void _setup_tcp_sockopts();
152 
153   int port_;
154   std::string address_;
155   std::string path_;
156   THRIFT_SOCKET serverSocket_;
157   int acceptBacklog_;
158   int sendTimeout_;
159   int recvTimeout_;
160   int accTimeout_;
161   int retryLimit_;
162   int retryDelay_;
163   int tcpSendBuffer_;
164   int tcpRecvBuffer_;
165   bool keepAlive_;
166   bool listening_;
167 
168   concurrency::Mutex rwMutex_;                                 // thread-safe interrupt
169   THRIFT_SOCKET interruptSockWriter_;                          // is notified on interrupt()
170   THRIFT_SOCKET interruptSockReader_;                          // is used in select/poll with serverSocket_ for interruptability
171   THRIFT_SOCKET childInterruptSockWriter_;                     // is notified on interruptChildren()
172 
173   socket_func_t listenCallback_;
174   socket_func_t acceptCallback_;
175 };
176 }
177 }
178 } // apache::thrift::transport
179 
180 #endif // #ifndef _THRIFT_TRANSPORT_TSERVERSOCKET_H_
181