1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 #ifndef THRIFT_TRANSPORT_THEADERTRANSPORT_H_ 21 #define THRIFT_TRANSPORT_THEADERTRANSPORT_H_ 1 22 23 #include <bitset> 24 #include <limits> 25 #include <vector> 26 #include <stdexcept> 27 #include <string> 28 #include <map> 29 30 #ifdef HAVE_STDINT_H 31 #include <stdint.h> 32 #elif HAVE_INTTYPES_H 33 #include <inttypes.h> 34 #endif 35 36 #include <thrift/protocol/TProtocolTypes.h> 37 #include <thrift/transport/TBufferTransports.h> 38 #include <thrift/transport/TTransport.h> 39 #include <thrift/transport/TVirtualTransport.h> 40 41 enum CLIENT_TYPE { 42 THRIFT_HEADER_CLIENT_TYPE = 0, 43 THRIFT_FRAMED_BINARY = 1, 44 THRIFT_UNFRAMED_BINARY = 2, 45 THRIFT_FRAMED_COMPACT = 3, 46 THRIFT_UNFRAMED_COMPACT = 4, 47 THRIFT_UNKNOWN_CLIENT_TYPE = 5, 48 }; 49 50 namespace apache { 51 namespace thrift { 52 namespace transport { 53 54 using apache::thrift::protocol::T_COMPACT_PROTOCOL; 55 56 /** 57 * Header transport. All writes go into an in-memory buffer until flush is 58 * called, at which point the transport writes the length of the entire 59 * binary chunk followed by the data payload. This allows the receiver on the 60 * other end to always do fixed-length reads. 61 * 62 * Subclass TFramedTransport because most of the read/write methods are similar 63 * and need similar buffers. Major changes are readFrame & flush. 64 * 65 * Header Transport *must* be the same transport for both input and 66 * output when used on the server side - client responses should be 67 * the same protocol as those in the request. 68 */ 69 class THeaderTransport : public TVirtualTransport<THeaderTransport, TFramedTransport> { 70 public: 71 static const int DEFAULT_BUFFER_SIZE = 512u; 72 static const int THRIFT_MAX_VARINT32_BYTES = 5; 73 74 /// Use default buffer sizes. 75 explicit THeaderTransport(const std::shared_ptr<TTransport>& transport, 76 std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(transport,config)77 : TVirtualTransport(transport, config), 78 outTransport_(transport), 79 protoId(T_COMPACT_PROTOCOL), 80 clientType(THRIFT_HEADER_CLIENT_TYPE), 81 seqId(0), 82 flags(0), 83 tBufSize_(0), 84 tBuf_(nullptr) { 85 if (!transport_) throw std::invalid_argument("transport is empty"); 86 initBuffers(); 87 } 88 89 THeaderTransport(const std::shared_ptr<TTransport> inTransport, 90 const std::shared_ptr<TTransport> outTransport, 91 std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(inTransport,config)92 : TVirtualTransport(inTransport, config), 93 outTransport_(outTransport), 94 protoId(T_COMPACT_PROTOCOL), 95 clientType(THRIFT_HEADER_CLIENT_TYPE), 96 seqId(0), 97 flags(0), 98 tBufSize_(0), 99 tBuf_(nullptr) { 100 if (!transport_) throw std::invalid_argument("inTransport is empty"); 101 if (!outTransport_) throw std::invalid_argument("outTransport is empty"); 102 initBuffers(); 103 } 104 105 uint32_t readSlow(uint8_t* buf, uint32_t len) override; 106 void flush() override; 107 108 void resizeTransformBuffer(uint32_t additionalSize = 0); 109 110 uint16_t getProtocolId() const; setProtocolId(uint16_t protoId)111 void setProtocolId(uint16_t protoId) { this->protoId = protoId; } 112 113 void resetProtocol(); 114 115 /** 116 * We know we got a packet in header format here, try to parse the header 117 * 118 * @param headerSize size of the header portion 119 * @param sz Size of the whole message, including header 120 */ 121 void readHeaderFormat(uint16_t headerSize, uint32_t sz); 122 123 /** 124 * Untransform the data based on the received header flags 125 * On conclusion of function, setReadBuffer is called with the 126 * untransformed data. 127 * 128 * @param ptr ptr to data 129 * @param size of data 130 */ 131 void untransform(uint8_t* ptr, uint32_t sz); 132 133 /** 134 * Transform the data based on our write transform flags 135 * At conclusion of function the write buffer is set to the 136 * transformed data. 137 * 138 * @param ptr Ptr to data to transform 139 * @param sz Size of data buffer 140 */ 141 void transform(uint8_t* ptr, uint32_t sz); 142 getNumTransforms()143 uint16_t getNumTransforms() const { 144 return safe_numeric_cast<uint16_t>(writeTrans_.size()); 145 } 146 setTransform(uint16_t transId)147 void setTransform(uint16_t transId) { writeTrans_.push_back(transId); } 148 149 // Info headers 150 151 typedef std::map<std::string, std::string> StringToStringMap; 152 153 // these work with write headers 154 void setHeader(const std::string& key, const std::string& value); 155 156 void clearHeaders(); 157 getWriteHeaders()158 StringToStringMap& getWriteHeaders() { return writeHeaders_; } 159 160 // these work with read headers getHeaders()161 const StringToStringMap& getHeaders() const { return readHeaders_; } 162 163 // accessors for seqId getSequenceNumber()164 int32_t getSequenceNumber() const { return seqId; } setSequenceNumber(int32_t seqId)165 void setSequenceNumber(int32_t seqId) { this->seqId = seqId; } 166 167 enum TRANSFORMS { 168 ZLIB_TRANSFORM = 0x01, 169 }; 170 171 protected: 172 /** 173 * Reads a frame of input from the underlying stream. 174 * 175 * Returns true if a frame was read successfully, or false on EOF. 176 * (Raises a TTransportException if EOF occurs after a partial frame.) 177 */ 178 bool readFrame() override; 179 180 void ensureReadBuffer(uint32_t sz); 181 uint32_t getWriteBytes(); 182 initBuffers()183 void initBuffers() { 184 setReadBuffer(nullptr, 0); 185 setWriteBuffer(wBuf_.get(), wBufSize_); 186 } 187 188 std::shared_ptr<TTransport> outTransport_; 189 190 // 0 and 16th bits must be 0 to differentiate from framed & unframed 191 static const uint32_t HEADER_MAGIC = 0x0FFF0000; 192 static const uint32_t HEADER_MASK = 0xFFFF0000; 193 static const uint32_t FLAGS_MASK = 0x0000FFFF; 194 195 static const uint32_t MAX_FRAME_SIZE = 0x3FFFFFFF; 196 197 int16_t protoId; 198 uint16_t clientType; 199 uint32_t seqId; 200 uint16_t flags; 201 202 std::vector<uint16_t> readTrans_; 203 std::vector<uint16_t> writeTrans_; 204 205 // Map to use for headers 206 StringToStringMap readHeaders_; 207 StringToStringMap writeHeaders_; 208 209 /** 210 * Returns the maximum number of bytes that write k/v headers can take 211 */ 212 uint32_t getMaxWriteHeadersSize() const; 213 214 struct infoIdType { 215 enum idType { 216 // start at 1 to avoid confusing header padding for an infoId 217 KEYVALUE = 1, 218 END // signal the end of infoIds we can handle 219 }; 220 }; 221 222 // Buffers to use for transform processing 223 uint32_t tBufSize_; 224 std::unique_ptr<uint8_t[]> tBuf_; 225 226 void readString(uint8_t*& ptr, /* out */ std::string& str, uint8_t const* headerBoundary); 227 228 void writeString(uint8_t*& ptr, const std::string& str); 229 230 // Varint utils 231 /** 232 * Read an i16 from the wire as a varint. The MSB of each byte is set 233 * if there is another byte to follow. This can read up to 3 bytes. 234 */ 235 uint32_t readVarint16(uint8_t const* ptr, int16_t* i16, uint8_t const* boundary); 236 237 /** 238 * Read an i32 from the wire as a varint. The MSB of each byte is set 239 * if there is another byte to follow. This can read up to 5 bytes. 240 */ 241 uint32_t readVarint32(uint8_t const* ptr, int32_t* i32, uint8_t const* boundary); 242 243 /** 244 * Write an i32 as a varint. Results in 1-5 bytes on the wire. 245 */ 246 uint32_t writeVarint32(int32_t n, uint8_t* pkt); 247 248 /** 249 * Write an i16 as a varint. Results in 1-3 bytes on the wire. 250 */ 251 uint32_t writeVarint16(int16_t n, uint8_t* pkt); 252 }; 253 254 /** 255 * Wraps a transport into a header one. 256 * 257 */ 258 class THeaderTransportFactory : public TTransportFactory { 259 public: 260 THeaderTransportFactory() = default; 261 262 ~THeaderTransportFactory() override = default; 263 264 /** 265 * Wraps the transport into a header one. 266 */ getTransport(std::shared_ptr<TTransport> trans)267 std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override { 268 return std::shared_ptr<TTransport>(new THeaderTransport(trans)); 269 } 270 }; 271 } 272 } 273 } // apache::thrift::transport 274 275 #endif // #ifndef THRIFT_TRANSPORT_THEADERTRANSPORT_H_ 276