1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
21 #define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1
22 
23 #include <thrift/Thrift.h>
24 #include <thrift/TConfiguration.h>
25 #include <thrift/transport/TTransportException.h>
26 #include <memory>
27 #include <string>
28 
29 namespace apache {
30 namespace thrift {
31 namespace transport {
32 
33 /**
34  * Helper template to hoist readAll implementation out of TTransport
35  */
36 template <class Transport_>
readAll(Transport_ & trans,uint8_t * buf,uint32_t len)37 uint32_t readAll(Transport_& trans, uint8_t* buf, uint32_t len) {
38   uint32_t have = 0;
39   uint32_t get = 0;
40 
41   while (have < len) {
42     get = trans.read(buf + have, len - have);
43     if (get <= 0) {
44       throw TTransportException(TTransportException::END_OF_FILE, "No more data to read.");
45     }
46     have += get;
47   }
48 
49   return have;
50 }
51 
52 /**
53  * Generic interface for a method of transporting data. A TTransport may be
54  * capable of either reading or writing, but not necessarily both.
55  *
56  */
57 class TTransport {
58 public:
59   TTransport(std::shared_ptr<TConfiguration> config = nullptr) {
60     if(config == nullptr) {
61       configuration_ = std::shared_ptr<TConfiguration> (new TConfiguration());
62     } else {
63       configuration_ = config;
64     }
65     resetConsumedMessageSize();
66   }
67 
68   /**
69    * Virtual deconstructor.
70    */
71   virtual ~TTransport() = default;
72 
73   /**
74    * Whether this transport is open.
75    */
isOpen()76   virtual bool isOpen() const { return false; }
77 
78   /**
79    * Tests whether there is more data to read or if the remote side is
80    * still open. By default this is true whenever the transport is open,
81    * but implementations should add logic to test for this condition where
82    * possible (i.e. on a socket).
83    * This is used by a server to check if it should listen for another
84    * request.
85    */
peek()86   virtual bool peek() { return isOpen(); }
87 
88   /**
89    * Opens the transport for communications.
90    *
91    * @return bool Whether the transport was successfully opened
92    * @throws TTransportException if opening failed
93    */
open()94   virtual void open() {
95     throw TTransportException(TTransportException::NOT_OPEN, "Cannot open base TTransport.");
96   }
97 
98   /**
99    * Closes the transport.
100    */
close()101   virtual void close() {
102     throw TTransportException(TTransportException::NOT_OPEN, "Cannot close base TTransport.");
103   }
104 
105   /**
106    * Attempt to read up to the specified number of bytes into the string.
107    *
108    * @param buf  Reference to the location to write the data
109    * @param len  How many bytes to read
110    * @return How many bytes were actually read
111    * @throws TTransportException If an error occurs
112    */
read(uint8_t * buf,uint32_t len)113   uint32_t read(uint8_t* buf, uint32_t len) {
114     T_VIRTUAL_CALL();
115     return read_virt(buf, len);
116   }
read_virt(uint8_t *,uint32_t)117   virtual uint32_t read_virt(uint8_t* /* buf */, uint32_t /* len */) {
118     throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot read.");
119   }
120 
121   /**
122    * Reads the given amount of data in its entirety no matter what.
123    *
124    * @param s     Reference to location for read data
125    * @param len   How many bytes to read
126    * @return How many bytes read, which must be equal to size
127    * @throws TTransportException If insufficient data was read
128    */
readAll(uint8_t * buf,uint32_t len)129   uint32_t readAll(uint8_t* buf, uint32_t len) {
130     T_VIRTUAL_CALL();
131     return readAll_virt(buf, len);
132   }
readAll_virt(uint8_t * buf,uint32_t len)133   virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) {
134     return apache::thrift::transport::readAll(*this, buf, len);
135   }
136 
137   /**
138    * Called when read is completed.
139    * This can be over-ridden to perform a transport-specific action
140    * e.g. logging the request to a file
141    *
142    * @return number of bytes read if available, 0 otherwise.
143    */
readEnd()144   virtual uint32_t readEnd() {
145     // default behaviour is to do nothing
146     return 0;
147   }
148 
149   /**
150    * Writes the string in its entirety to the buffer.
151    *
152    * Note: You must call flush() to ensure the data is actually written,
153    * and available to be read back in the future.  Destroying a TTransport
154    * object does not automatically flush pending data--if you destroy a
155    * TTransport object with written but unflushed data, that data may be
156    * discarded.
157    *
158    * @param buf  The data to write out
159    * @throws TTransportException if an error occurs
160    */
write(const uint8_t * buf,uint32_t len)161   void write(const uint8_t* buf, uint32_t len) {
162     T_VIRTUAL_CALL();
163     write_virt(buf, len);
164   }
write_virt(const uint8_t *,uint32_t)165   virtual void write_virt(const uint8_t* /* buf */, uint32_t /* len */) {
166     throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot write.");
167   }
168 
169   /**
170    * Called when write is completed.
171    * This can be over-ridden to perform a transport-specific action
172    * at the end of a request.
173    *
174    * @return number of bytes written if available, 0 otherwise
175    */
writeEnd()176   virtual uint32_t writeEnd() {
177     // default behaviour is to do nothing
178     return 0;
179   }
180 
181   /**
182    * Flushes any pending data to be written. Typically used with buffered
183    * transport mechanisms.
184    *
185    * @throws TTransportException if an error occurs
186    */
flush()187   virtual void flush() {
188     // default behaviour is to do nothing
189   }
190 
191   /**
192    * Attempts to return a pointer to \c len bytes, possibly copied into \c buf.
193    * Does not consume the bytes read (i.e.: a later read will return the same
194    * data).  This method is meant to support protocols that need to read
195    * variable-length fields.  They can attempt to borrow the maximum amount of
196    * data that they will need, then consume (see next method) what they
197    * actually use.  Some transports will not support this method and others
198    * will fail occasionally, so protocols must be prepared to use read if
199    * borrow fails.
200    *
201    * @oaram buf  A buffer where the data can be stored if needed.
202    *             If borrow doesn't return buf, then the contents of
203    *             buf after the call are undefined.  This parameter may be
204    *             nullptr to indicate that the caller is not supplying storage,
205    *             but would like a pointer into an internal buffer, if
206    *             available.
207    * @param len  *len should initially contain the number of bytes to borrow.
208    *             If borrow succeeds, *len will contain the number of bytes
209    *             available in the returned pointer.  This will be at least
210    *             what was requested, but may be more if borrow returns
211    *             a pointer to an internal buffer, rather than buf.
212    *             If borrow fails, the contents of *len are undefined.
213    * @return If the borrow succeeds, return a pointer to the borrowed data.
214    *         This might be equal to \c buf, or it might be a pointer into
215    *         the transport's internal buffers.
216    * @throws TTransportException if an error occurs
217    */
borrow(uint8_t * buf,uint32_t * len)218   const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
219     T_VIRTUAL_CALL();
220     return borrow_virt(buf, len);
221   }
borrow_virt(uint8_t *,uint32_t *)222   virtual const uint8_t* borrow_virt(uint8_t* /* buf */, uint32_t* /* len */) { return nullptr; }
223 
224   /**
225    * Remove len bytes from the transport.  This should always follow a borrow
226    * of at least len bytes, and should always succeed.
227    * TODO(dreiss): Is there any transport that could borrow but fail to
228    * consume, or that would require a buffer to dump the consumed data?
229    *
230    * @param len  How many bytes to consume
231    * @throws TTransportException If an error occurs
232    */
consume(uint32_t len)233   void consume(uint32_t len) {
234     T_VIRTUAL_CALL();
235     consume_virt(len);
236   }
consume_virt(uint32_t)237   virtual void consume_virt(uint32_t /* len */) {
238     throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot consume.");
239   }
240 
241   /**
242    * Returns the origin of the transports call. The value depends on the
243    * transport used. An IP based transport for example will return the
244    * IP address of the client making the request.
245    * If the transport doesn't know the origin Unknown is returned.
246    *
247    * The returned value can be used in a log message for example
248    */
getOrigin()249   virtual const std::string getOrigin() const { return "Unknown"; }
250 
getConfiguration()251   std::shared_ptr<TConfiguration> getConfiguration() { return configuration_; }
252 
setConfiguration(std::shared_ptr<TConfiguration> config)253   void setConfiguration(std::shared_ptr<TConfiguration> config) {
254     if (config != nullptr) configuration_ = config;
255   }
256 
257   /**
258    * Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
259    * Will throw if we already consumed too many bytes or if the new size is larger than allowed.
260    *
261    * @param size  real message size
262    */
updateKnownMessageSize(long int size)263   void updateKnownMessageSize(long int size)
264   {
265     long int consumed = knownMessageSize_ - remainingMessageSize_;
266     resetConsumedMessageSize(size);
267     countConsumedMessageBytes(consumed);
268   }
269 
270   /**
271    * Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data
272    *
273    * @param numBytes  numBytes bytes of data
274    */
checkReadBytesAvailable(long int numBytes)275   void checkReadBytesAvailable(long int numBytes)
276   {
277     if (remainingMessageSize_ < numBytes)
278       throw TTransportException(TTransportException::END_OF_FILE, "MaxMessageSize reached");
279   }
280 
281 protected:
282   std::shared_ptr<TConfiguration> configuration_;
283   long int remainingMessageSize_;
284   long int knownMessageSize_;
285 
getRemainingMessageSize()286   inline long int getRemainingMessageSize() { return remainingMessageSize_; }
setRemainingMessageSize(long int remainingMessageSize)287   inline void setRemainingMessageSize(long int remainingMessageSize) { remainingMessageSize_ = remainingMessageSize; }
getMaxMessageSize()288   inline int getMaxMessageSize() { return configuration_->getMaxMessageSize(); }
getKnownMessageSize()289   inline long int getKnownMessageSize() { return knownMessageSize_; }
setKnownMessageSize(long int knownMessageSize)290   void setKnownMessageSize(long int knownMessageSize) { knownMessageSize_ = knownMessageSize; }
291 
292   /**
293    * Resets RemainingMessageSize to the configured maximum
294    *
295    *  @param newSize  configured size
296    */
297   void resetConsumedMessageSize(long newSize = -1)
298   {
299     // full reset
300     if (newSize < 0)
301     {
302         knownMessageSize_ = getMaxMessageSize();
303         remainingMessageSize_ = getMaxMessageSize();
304         return;
305     }
306 
307     // update only: message size can shrink, but not grow
308     if (newSize > knownMessageSize_)
309         throw TTransportException(TTransportException::END_OF_FILE, "MaxMessageSize reached");
310 
311     knownMessageSize_ = newSize;
312     remainingMessageSize_ = newSize;
313   }
314 
315   /**
316    * Consumes numBytes from the RemainingMessageSize.
317    *
318    *  @param numBytes  Consumes numBytes
319    */
countConsumedMessageBytes(long int numBytes)320   void countConsumedMessageBytes(long int numBytes)
321   {
322     if (remainingMessageSize_ >= numBytes)
323     {
324       remainingMessageSize_ -= numBytes;
325     }
326     else
327     {
328       remainingMessageSize_ = 0;
329       throw TTransportException(TTransportException::END_OF_FILE, "MaxMessageSize reached");
330     }
331   }
332 };
333 
334 /**
335  * Generic factory class to make an input and output transport out of a
336  * source transport. Commonly used inside servers to make input and output
337  * streams out of raw clients.
338  *
339  */
340 class TTransportFactory {
341 public:
342   TTransportFactory() = default;
343 
344   virtual ~TTransportFactory() = default;
345 
346   /**
347    * Default implementation does nothing, just returns the transport given.
348    */
getTransport(std::shared_ptr<TTransport> trans)349   virtual std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) {
350     return trans;
351   }
352 };
353 }
354 }
355 } // apache::thrift::transport
356 
357 #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
358