1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
21 #define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1
22
23 #include <thrift/Thrift.h>
24 #include <thrift/TConfiguration.h>
25 #include <thrift/transport/TTransportException.h>
26 #include <memory>
27 #include <string>
28
29 namespace apache {
30 namespace thrift {
31 namespace transport {
32
33 /**
34 * Helper template to hoist readAll implementation out of TTransport
35 */
36 template <class Transport_>
readAll(Transport_ & trans,uint8_t * buf,uint32_t len)37 uint32_t readAll(Transport_& trans, uint8_t* buf, uint32_t len) {
38 uint32_t have = 0;
39 uint32_t get = 0;
40
41 while (have < len) {
42 get = trans.read(buf + have, len - have);
43 if (get <= 0) {
44 throw TTransportException(TTransportException::END_OF_FILE, "No more data to read.");
45 }
46 have += get;
47 }
48
49 return have;
50 }
51
52 /**
53 * Generic interface for a method of transporting data. A TTransport may be
54 * capable of either reading or writing, but not necessarily both.
55 *
56 */
57 class TTransport {
58 public:
59 TTransport(std::shared_ptr<TConfiguration> config = nullptr) {
60 if(config == nullptr) {
61 configuration_ = std::shared_ptr<TConfiguration> (new TConfiguration());
62 } else {
63 configuration_ = config;
64 }
65 resetConsumedMessageSize();
66 }
67
68 /**
69 * Virtual deconstructor.
70 */
71 virtual ~TTransport() = default;
72
73 /**
74 * Whether this transport is open.
75 */
isOpen()76 virtual bool isOpen() const { return false; }
77
78 /**
79 * Tests whether there is more data to read or if the remote side is
80 * still open. By default this is true whenever the transport is open,
81 * but implementations should add logic to test for this condition where
82 * possible (i.e. on a socket).
83 * This is used by a server to check if it should listen for another
84 * request.
85 */
peek()86 virtual bool peek() { return isOpen(); }
87
88 /**
89 * Opens the transport for communications.
90 *
91 * @return bool Whether the transport was successfully opened
92 * @throws TTransportException if opening failed
93 */
open()94 virtual void open() {
95 throw TTransportException(TTransportException::NOT_OPEN, "Cannot open base TTransport.");
96 }
97
98 /**
99 * Closes the transport.
100 */
close()101 virtual void close() {
102 throw TTransportException(TTransportException::NOT_OPEN, "Cannot close base TTransport.");
103 }
104
105 /**
106 * Attempt to read up to the specified number of bytes into the string.
107 *
108 * @param buf Reference to the location to write the data
109 * @param len How many bytes to read
110 * @return How many bytes were actually read
111 * @throws TTransportException If an error occurs
112 */
read(uint8_t * buf,uint32_t len)113 uint32_t read(uint8_t* buf, uint32_t len) {
114 T_VIRTUAL_CALL();
115 return read_virt(buf, len);
116 }
read_virt(uint8_t *,uint32_t)117 virtual uint32_t read_virt(uint8_t* /* buf */, uint32_t /* len */) {
118 throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot read.");
119 }
120
121 /**
122 * Reads the given amount of data in its entirety no matter what.
123 *
124 * @param s Reference to location for read data
125 * @param len How many bytes to read
126 * @return How many bytes read, which must be equal to size
127 * @throws TTransportException If insufficient data was read
128 */
readAll(uint8_t * buf,uint32_t len)129 uint32_t readAll(uint8_t* buf, uint32_t len) {
130 T_VIRTUAL_CALL();
131 return readAll_virt(buf, len);
132 }
readAll_virt(uint8_t * buf,uint32_t len)133 virtual uint32_t readAll_virt(uint8_t* buf, uint32_t len) {
134 return apache::thrift::transport::readAll(*this, buf, len);
135 }
136
137 /**
138 * Called when read is completed.
139 * This can be over-ridden to perform a transport-specific action
140 * e.g. logging the request to a file
141 *
142 * @return number of bytes read if available, 0 otherwise.
143 */
readEnd()144 virtual uint32_t readEnd() {
145 // default behaviour is to do nothing
146 return 0;
147 }
148
149 /**
150 * Writes the string in its entirety to the buffer.
151 *
152 * Note: You must call flush() to ensure the data is actually written,
153 * and available to be read back in the future. Destroying a TTransport
154 * object does not automatically flush pending data--if you destroy a
155 * TTransport object with written but unflushed data, that data may be
156 * discarded.
157 *
158 * @param buf The data to write out
159 * @throws TTransportException if an error occurs
160 */
write(const uint8_t * buf,uint32_t len)161 void write(const uint8_t* buf, uint32_t len) {
162 T_VIRTUAL_CALL();
163 write_virt(buf, len);
164 }
write_virt(const uint8_t *,uint32_t)165 virtual void write_virt(const uint8_t* /* buf */, uint32_t /* len */) {
166 throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot write.");
167 }
168
169 /**
170 * Called when write is completed.
171 * This can be over-ridden to perform a transport-specific action
172 * at the end of a request.
173 *
174 * @return number of bytes written if available, 0 otherwise
175 */
writeEnd()176 virtual uint32_t writeEnd() {
177 // default behaviour is to do nothing
178 return 0;
179 }
180
181 /**
182 * Flushes any pending data to be written. Typically used with buffered
183 * transport mechanisms.
184 *
185 * @throws TTransportException if an error occurs
186 */
flush()187 virtual void flush() {
188 // default behaviour is to do nothing
189 }
190
191 /**
192 * Attempts to return a pointer to \c len bytes, possibly copied into \c buf.
193 * Does not consume the bytes read (i.e.: a later read will return the same
194 * data). This method is meant to support protocols that need to read
195 * variable-length fields. They can attempt to borrow the maximum amount of
196 * data that they will need, then consume (see next method) what they
197 * actually use. Some transports will not support this method and others
198 * will fail occasionally, so protocols must be prepared to use read if
199 * borrow fails.
200 *
201 * @oaram buf A buffer where the data can be stored if needed.
202 * If borrow doesn't return buf, then the contents of
203 * buf after the call are undefined. This parameter may be
204 * nullptr to indicate that the caller is not supplying storage,
205 * but would like a pointer into an internal buffer, if
206 * available.
207 * @param len *len should initially contain the number of bytes to borrow.
208 * If borrow succeeds, *len will contain the number of bytes
209 * available in the returned pointer. This will be at least
210 * what was requested, but may be more if borrow returns
211 * a pointer to an internal buffer, rather than buf.
212 * If borrow fails, the contents of *len are undefined.
213 * @return If the borrow succeeds, return a pointer to the borrowed data.
214 * This might be equal to \c buf, or it might be a pointer into
215 * the transport's internal buffers.
216 * @throws TTransportException if an error occurs
217 */
borrow(uint8_t * buf,uint32_t * len)218 const uint8_t* borrow(uint8_t* buf, uint32_t* len) {
219 T_VIRTUAL_CALL();
220 return borrow_virt(buf, len);
221 }
borrow_virt(uint8_t *,uint32_t *)222 virtual const uint8_t* borrow_virt(uint8_t* /* buf */, uint32_t* /* len */) { return nullptr; }
223
224 /**
225 * Remove len bytes from the transport. This should always follow a borrow
226 * of at least len bytes, and should always succeed.
227 * TODO(dreiss): Is there any transport that could borrow but fail to
228 * consume, or that would require a buffer to dump the consumed data?
229 *
230 * @param len How many bytes to consume
231 * @throws TTransportException If an error occurs
232 */
consume(uint32_t len)233 void consume(uint32_t len) {
234 T_VIRTUAL_CALL();
235 consume_virt(len);
236 }
consume_virt(uint32_t)237 virtual void consume_virt(uint32_t /* len */) {
238 throw TTransportException(TTransportException::NOT_OPEN, "Base TTransport cannot consume.");
239 }
240
241 /**
242 * Returns the origin of the transports call. The value depends on the
243 * transport used. An IP based transport for example will return the
244 * IP address of the client making the request.
245 * If the transport doesn't know the origin Unknown is returned.
246 *
247 * The returned value can be used in a log message for example
248 */
getOrigin()249 virtual const std::string getOrigin() const { return "Unknown"; }
250
getConfiguration()251 std::shared_ptr<TConfiguration> getConfiguration() { return configuration_; }
252
setConfiguration(std::shared_ptr<TConfiguration> config)253 void setConfiguration(std::shared_ptr<TConfiguration> config) {
254 if (config != nullptr) configuration_ = config;
255 }
256
257 /**
258 * Updates RemainingMessageSize to reflect then known real message size (e.g. framed transport).
259 * Will throw if we already consumed too many bytes or if the new size is larger than allowed.
260 *
261 * @param size real message size
262 */
updateKnownMessageSize(long int size)263 void updateKnownMessageSize(long int size)
264 {
265 long int consumed = knownMessageSize_ - remainingMessageSize_;
266 resetConsumedMessageSize(size);
267 countConsumedMessageBytes(consumed);
268 }
269
270 /**
271 * Throws if there are not enough bytes in the input stream to satisfy a read of numBytes bytes of data
272 *
273 * @param numBytes numBytes bytes of data
274 */
checkReadBytesAvailable(long int numBytes)275 void checkReadBytesAvailable(long int numBytes)
276 {
277 if (remainingMessageSize_ < numBytes)
278 throw TTransportException(TTransportException::END_OF_FILE, "MaxMessageSize reached");
279 }
280
281 protected:
282 std::shared_ptr<TConfiguration> configuration_;
283 long int remainingMessageSize_;
284 long int knownMessageSize_;
285
getRemainingMessageSize()286 inline long int getRemainingMessageSize() { return remainingMessageSize_; }
setRemainingMessageSize(long int remainingMessageSize)287 inline void setRemainingMessageSize(long int remainingMessageSize) { remainingMessageSize_ = remainingMessageSize; }
getMaxMessageSize()288 inline int getMaxMessageSize() { return configuration_->getMaxMessageSize(); }
getKnownMessageSize()289 inline long int getKnownMessageSize() { return knownMessageSize_; }
setKnownMessageSize(long int knownMessageSize)290 void setKnownMessageSize(long int knownMessageSize) { knownMessageSize_ = knownMessageSize; }
291
292 /**
293 * Resets RemainingMessageSize to the configured maximum
294 *
295 * @param newSize configured size
296 */
297 void resetConsumedMessageSize(long newSize = -1)
298 {
299 // full reset
300 if (newSize < 0)
301 {
302 knownMessageSize_ = getMaxMessageSize();
303 remainingMessageSize_ = getMaxMessageSize();
304 return;
305 }
306
307 // update only: message size can shrink, but not grow
308 if (newSize > knownMessageSize_)
309 throw TTransportException(TTransportException::END_OF_FILE, "MaxMessageSize reached");
310
311 knownMessageSize_ = newSize;
312 remainingMessageSize_ = newSize;
313 }
314
315 /**
316 * Consumes numBytes from the RemainingMessageSize.
317 *
318 * @param numBytes Consumes numBytes
319 */
countConsumedMessageBytes(long int numBytes)320 void countConsumedMessageBytes(long int numBytes)
321 {
322 if (remainingMessageSize_ >= numBytes)
323 {
324 remainingMessageSize_ -= numBytes;
325 }
326 else
327 {
328 remainingMessageSize_ = 0;
329 throw TTransportException(TTransportException::END_OF_FILE, "MaxMessageSize reached");
330 }
331 }
332 };
333
334 /**
335 * Generic factory class to make an input and output transport out of a
336 * source transport. Commonly used inside servers to make input and output
337 * streams out of raw clients.
338 *
339 */
340 class TTransportFactory {
341 public:
342 TTransportFactory() = default;
343
344 virtual ~TTransportFactory() = default;
345
346 /**
347 * Default implementation does nothing, just returns the transport given.
348 */
getTransport(std::shared_ptr<TTransport> trans)349 virtual std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) {
350 return trans;
351 }
352 };
353 }
354 }
355 } // apache::thrift::transport
356
357 #endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
358