1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef THRIFT_TRANSPORT_THEADERTRANSPORT_H_
21 #define THRIFT_TRANSPORT_THEADERTRANSPORT_H_ 1
22 
#include <bitset>
#include <limits>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>
29 
30 #ifdef HAVE_STDINT_H
31 #include <stdint.h>
32 #elif HAVE_INTTYPES_H
33 #include <inttypes.h>
34 #endif
35 
36 #include <thrift/protocol/TProtocolTypes.h>
37 #include <thrift/transport/TBufferTransports.h>
38 #include <thrift/transport/TTransport.h>
39 #include <thrift/transport/TVirtualTransport.h>
40 
/**
 * Identifies the kind of client a message was received from.
 *
 * The numeric values are spelled out explicitly so that they stay fixed even
 * if enumerators are added or reordered later.
 */
enum CLIENT_TYPE {
  THRIFT_HEADER_CLIENT_TYPE = 0,  ///< header-format client
  THRIFT_FRAMED_BINARY = 1,       ///< framed transport, binary protocol
  THRIFT_UNFRAMED_BINARY = 2,     ///< unframed transport, binary protocol
  THRIFT_FRAMED_COMPACT = 3,      ///< framed transport, compact protocol
  THRIFT_UNFRAMED_COMPACT = 4,    ///< unframed transport, compact protocol
  THRIFT_UNKNOWN_CLIENT_TYPE = 5, ///< client type could not be determined
};
49 
50 namespace apache {
51 namespace thrift {
52 namespace transport {
53 
54 using apache::thrift::protocol::T_COMPACT_PROTOCOL;
55 
56 /**
57  * Header transport. All writes go into an in-memory buffer until flush is
58  * called, at which point the transport writes the length of the entire
59  * binary chunk followed by the data payload. This allows the receiver on the
60  * other end to always do fixed-length reads.
61  *
62  * Subclass TFramedTransport because most of the read/write methods are similar
63  * and need similar buffers.  Major changes are readFrame & flush.
64  *
65  * Header Transport *must* be the same transport for both input and
66  * output when used on the server side - client responses should be
67  * the same protocol as those in the request.
68  */
69 class THeaderTransport : public TVirtualTransport<THeaderTransport, TFramedTransport> {
70 public:
71   static const int DEFAULT_BUFFER_SIZE = 512u;
72   static const int THRIFT_MAX_VARINT32_BYTES = 5;
73 
74   /// Use default buffer sizes.
75   explicit THeaderTransport(const std::shared_ptr<TTransport>& transport,
76                             std::shared_ptr<TConfiguration> config = nullptr)
TVirtualTransport(transport,config)77     : TVirtualTransport(transport, config),
78       outTransport_(transport),
79       protoId(T_COMPACT_PROTOCOL),
80       clientType(THRIFT_HEADER_CLIENT_TYPE),
81       seqId(0),
82       flags(0),
83       tBufSize_(0),
84       tBuf_(nullptr) {
85     if (!transport_) throw std::invalid_argument("transport is empty");
86     initBuffers();
87   }
88 
89   THeaderTransport(const std::shared_ptr<TTransport> inTransport,
90                    const std::shared_ptr<TTransport> outTransport,
91                    std::shared_ptr<TConfiguration> config = nullptr)
TVirtualTransport(inTransport,config)92     : TVirtualTransport(inTransport, config),
93       outTransport_(outTransport),
94       protoId(T_COMPACT_PROTOCOL),
95       clientType(THRIFT_HEADER_CLIENT_TYPE),
96       seqId(0),
97       flags(0),
98       tBufSize_(0),
99       tBuf_(nullptr) {
100     if (!transport_) throw std::invalid_argument("inTransport is empty");
101     if (!outTransport_) throw std::invalid_argument("outTransport is empty");
102     initBuffers();
103   }
104 
105   uint32_t readSlow(uint8_t* buf, uint32_t len) override;
106   void flush() override;
107 
108   void resizeTransformBuffer(uint32_t additionalSize = 0);
109 
110   uint16_t getProtocolId() const;
setProtocolId(uint16_t protoId)111   void setProtocolId(uint16_t protoId) { this->protoId = protoId; }
112 
113   void resetProtocol();
114 
115   /**
116    * We know we got a packet in header format here, try to parse the header
117    *
118    * @param headerSize size of the header portion
119    * @param sz Size of the whole message, including header
120    */
121   void readHeaderFormat(uint16_t headerSize, uint32_t sz);
122 
123   /**
124    * Untransform the data based on the received header flags
125    * On conclusion of function, setReadBuffer is called with the
126    * untransformed data.
127    *
128    * @param ptr ptr to data
129    * @param size of data
130    */
131   void untransform(uint8_t* ptr, uint32_t sz);
132 
133   /**
134    * Transform the data based on our write transform flags
135    * At conclusion of function the write buffer is set to the
136    * transformed data.
137    *
138    * @param ptr Ptr to data to transform
139    * @param sz Size of data buffer
140    */
141   void transform(uint8_t* ptr, uint32_t sz);
142 
getNumTransforms()143   uint16_t getNumTransforms() const {
144     return safe_numeric_cast<uint16_t>(writeTrans_.size());
145   }
146 
setTransform(uint16_t transId)147   void setTransform(uint16_t transId) { writeTrans_.push_back(transId); }
148 
149   // Info headers
150 
151   typedef std::map<std::string, std::string> StringToStringMap;
152 
153   // these work with write headers
154   void setHeader(const std::string& key, const std::string& value);
155 
156   void clearHeaders();
157 
getWriteHeaders()158   StringToStringMap& getWriteHeaders() { return writeHeaders_; }
159 
160   // these work with read headers
getHeaders()161   const StringToStringMap& getHeaders() const { return readHeaders_; }
162 
163   // accessors for seqId
getSequenceNumber()164   int32_t getSequenceNumber() const { return seqId; }
setSequenceNumber(int32_t seqId)165   void setSequenceNumber(int32_t seqId) { this->seqId = seqId; }
166 
167   enum TRANSFORMS {
168     ZLIB_TRANSFORM = 0x01,
169   };
170 
171 protected:
172   /**
173    * Reads a frame of input from the underlying stream.
174    *
175    * Returns true if a frame was read successfully, or false on EOF.
176    * (Raises a TTransportException if EOF occurs after a partial frame.)
177    */
178   bool readFrame() override;
179 
180   void ensureReadBuffer(uint32_t sz);
181   uint32_t getWriteBytes();
182 
initBuffers()183   void initBuffers() {
184     setReadBuffer(nullptr, 0);
185     setWriteBuffer(wBuf_.get(), wBufSize_);
186   }
187 
188   std::shared_ptr<TTransport> outTransport_;
189 
190   // 0 and 16th bits must be 0 to differentiate from framed & unframed
191   static const uint32_t HEADER_MAGIC = 0x0FFF0000;
192   static const uint32_t HEADER_MASK = 0xFFFF0000;
193   static const uint32_t FLAGS_MASK = 0x0000FFFF;
194 
195   static const uint32_t MAX_FRAME_SIZE = 0x3FFFFFFF;
196 
197   int16_t protoId;
198   uint16_t clientType;
199   uint32_t seqId;
200   uint16_t flags;
201 
202   std::vector<uint16_t> readTrans_;
203   std::vector<uint16_t> writeTrans_;
204 
205   // Map to use for headers
206   StringToStringMap readHeaders_;
207   StringToStringMap writeHeaders_;
208 
209   /**
210    * Returns the maximum number of bytes that write k/v headers can take
211    */
212   uint32_t getMaxWriteHeadersSize() const;
213 
214   struct infoIdType {
215     enum idType {
216       // start at 1 to avoid confusing header padding for an infoId
217       KEYVALUE = 1,
218       END // signal the end of infoIds we can handle
219     };
220   };
221 
222   // Buffers to use for transform processing
223   uint32_t tBufSize_;
224   std::unique_ptr<uint8_t[]> tBuf_;
225 
226   void readString(uint8_t*& ptr, /* out */ std::string& str, uint8_t const* headerBoundary);
227 
228   void writeString(uint8_t*& ptr, const std::string& str);
229 
230   // Varint utils
231   /**
232    * Read an i16 from the wire as a varint. The MSB of each byte is set
233    * if there is another byte to follow. This can read up to 3 bytes.
234    */
235   uint32_t readVarint16(uint8_t const* ptr, int16_t* i16, uint8_t const* boundary);
236 
237   /**
238    * Read an i32 from the wire as a varint. The MSB of each byte is set
239    * if there is another byte to follow. This can read up to 5 bytes.
240    */
241   uint32_t readVarint32(uint8_t const* ptr, int32_t* i32, uint8_t const* boundary);
242 
243   /**
244    * Write an i32 as a varint. Results in 1-5 bytes on the wire.
245    */
246   uint32_t writeVarint32(int32_t n, uint8_t* pkt);
247 
248   /**
249    * Write an i16 as a varint. Results in 1-3 bytes on the wire.
250    */
251   uint32_t writeVarint16(int16_t n, uint8_t* pkt);
252 };
253 
254 /**
255  * Wraps a transport into a header one.
256  *
257  */
258 class THeaderTransportFactory : public TTransportFactory {
259 public:
260   THeaderTransportFactory() = default;
261 
262   ~THeaderTransportFactory() override = default;
263 
264   /**
265    * Wraps the transport into a header one.
266    */
getTransport(std::shared_ptr<TTransport> trans)267   std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override {
268     return std::shared_ptr<TTransport>(new THeaderTransport(trans));
269   }
270 };
271 }
272 }
273 } // apache::thrift::transport
274 
275 #endif // #ifndef THRIFT_TRANSPORT_THEADERTRANSPORT_H_
276