1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 21 #define _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 1 22 23 #include <cstdlib> 24 #include <cstring> 25 #include <limits> 26 27 #include <thrift/transport/TTransport.h> 28 #include <thrift/transport/TVirtualTransport.h> 29 30 #ifdef __GNUC__ 31 #define TDB_LIKELY(val) (__builtin_expect((val), 1)) 32 #define TDB_UNLIKELY(val) (__builtin_expect((val), 0)) 33 #else 34 #define TDB_LIKELY(val) (val) 35 #define TDB_UNLIKELY(val) (val) 36 #endif 37 38 namespace apache { 39 namespace thrift { 40 namespace transport { 41 42 /** 43 * Base class for all transports that use read/write buffers for performance. 44 * 45 * TBufferBase is designed to implement the fast-path "memcpy" style 46 * operations that work in the common case. It does so with small and 47 * (eventually) nonvirtual, inlinable methods. TBufferBase is an abstract 48 * class. Subclasses are expected to define the "slow path" operations 49 * that have to be done when the buffers are full or empty. 50 * 51 */ 52 class TBufferBase : public TVirtualTransport<TBufferBase> { 53 54 public: 55 /** 56 * Fast-path read. 57 * 58 * When we have enough data buffered to fulfill the read, we can satisfy it 59 * with a single memcpy, then adjust our internal pointers. If the buffer 60 * is empty, we call out to our slow path, implemented by a subclass. 61 * This method is meant to eventually be nonvirtual and inlinable. 62 */ read(uint8_t * buf,uint32_t len)63 uint32_t read(uint8_t* buf, uint32_t len) { 64 checkReadBytesAvailable(len); 65 uint8_t* new_rBase = rBase_ + len; 66 if (TDB_LIKELY(new_rBase <= rBound_)) { 67 std::memcpy(buf, rBase_, len); 68 rBase_ = new_rBase; 69 return len; 70 } 71 return readSlow(buf, len); 72 } 73 74 /** 75 * Shortcutted version of readAll. 76 */ readAll(uint8_t * buf,uint32_t len)77 uint32_t readAll(uint8_t* buf, uint32_t len) { 78 uint8_t* new_rBase = rBase_ + len; 79 if (TDB_LIKELY(new_rBase <= rBound_)) { 80 std::memcpy(buf, rBase_, len); 81 rBase_ = new_rBase; 82 return len; 83 } 84 return apache::thrift::transport::readAll(*this, buf, len); 85 } 86 87 /** 88 * Fast-path write. 89 * 90 * When we have enough empty space in our buffer to accommodate the write, we 91 * can satisfy it with a single memcpy, then adjust our internal pointers. 92 * If the buffer is full, we call out to our slow path, implemented by a 93 * subclass. This method is meant to eventually be nonvirtual and 94 * inlinable. 95 */ write(const uint8_t * buf,uint32_t len)96 void write(const uint8_t* buf, uint32_t len) { 97 uint8_t* new_wBase = wBase_ + len; 98 if (TDB_LIKELY(new_wBase <= wBound_)) { 99 std::memcpy(wBase_, buf, len); 100 wBase_ = new_wBase; 101 return; 102 } 103 writeSlow(buf, len); 104 } 105 106 /** 107 * Fast-path borrow. A lot like the fast-path read. 108 */ borrow(uint8_t * buf,uint32_t * len)109 const uint8_t* borrow(uint8_t* buf, uint32_t* len) { 110 if (TDB_LIKELY(static_cast<ptrdiff_t>(*len) <= rBound_ - rBase_)) { 111 // With strict aliasing, writing to len shouldn't force us to 112 // refetch rBase_ from memory. TODO(dreiss): Verify this. 113 *len = static_cast<uint32_t>(rBound_ - rBase_); 114 return rBase_; 115 } 116 return borrowSlow(buf, len); 117 } 118 119 /** 120 * Consume doesn't require a slow path. 121 */ consume(uint32_t len)122 void consume(uint32_t len) { 123 countConsumedMessageBytes(len); 124 if (TDB_LIKELY(static_cast<ptrdiff_t>(len) <= rBound_ - rBase_)) { 125 rBase_ += len; 126 } else { 127 throw TTransportException(TTransportException::BAD_ARGS, "consume did not follow a borrow."); 128 } 129 } 130 131 protected: 132 /// Slow path read. 133 virtual uint32_t readSlow(uint8_t* buf, uint32_t len) = 0; 134 135 /// Slow path write. 136 virtual void writeSlow(const uint8_t* buf, uint32_t len) = 0; 137 138 /** 139 * Slow path borrow. 140 * 141 * POSTCONDITION: return == nullptr || rBound_ - rBase_ >= *len 142 */ 143 virtual const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) = 0; 144 145 /** 146 * Trivial constructor. 147 * 148 * Initialize pointers safely. Constructing is not a very 149 * performance-sensitive operation, so it is okay to just leave it to 150 * the concrete class to set up pointers correctly. 151 */ 152 TBufferBase(std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)153 : TVirtualTransport(config), rBase_(nullptr), rBound_(nullptr), wBase_(nullptr), wBound_(nullptr) {} 154 155 /// Convenience mutator for setting the read buffer. setReadBuffer(uint8_t * buf,uint32_t len)156 void setReadBuffer(uint8_t* buf, uint32_t len) { 157 rBase_ = buf; 158 rBound_ = buf + len; 159 } 160 161 /// Convenience mutator for setting the write buffer. setWriteBuffer(uint8_t * buf,uint32_t len)162 void setWriteBuffer(uint8_t* buf, uint32_t len) { 163 wBase_ = buf; 164 wBound_ = buf + len; 165 } 166 167 ~TBufferBase() override = default; 168 169 /// Reads begin here. 170 uint8_t* rBase_; 171 /// Reads may extend to just before here. 172 uint8_t* rBound_; 173 174 /// Writes begin here. 175 uint8_t* wBase_; 176 /// Writes may extend to just before here. 177 uint8_t* wBound_; 178 }; 179 180 /** 181 * Buffered transport. For reads it will read more data than is requested 182 * and will serve future data out of a local buffer. For writes, data is 183 * stored to an in memory buffer before being written out. 184 * 185 */ 186 class TBufferedTransport : public TVirtualTransport<TBufferedTransport, TBufferBase> { 187 public: 188 static const int DEFAULT_BUFFER_SIZE = 512; 189 190 /// Use default buffer sizes. 191 TBufferedTransport(std::shared_ptr<TTransport> transport, std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)192 : TVirtualTransport(config), 193 transport_(transport), 194 rBufSize_(DEFAULT_BUFFER_SIZE), 195 wBufSize_(DEFAULT_BUFFER_SIZE), 196 rBuf_(new uint8_t[rBufSize_]), 197 wBuf_(new uint8_t[wBufSize_]) { 198 initPointers(); 199 } 200 201 /// Use specified buffer sizes. 202 TBufferedTransport(std::shared_ptr<TTransport> transport, uint32_t sz, std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)203 : TVirtualTransport(config), 204 transport_(transport), 205 rBufSize_(sz), 206 wBufSize_(sz), 207 rBuf_(new uint8_t[rBufSize_]), 208 wBuf_(new uint8_t[wBufSize_]) { 209 initPointers(); 210 } 211 212 /// Use specified read and write buffer sizes. 213 TBufferedTransport(std::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz, 214 std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)215 : TVirtualTransport(config), 216 transport_(transport), 217 rBufSize_(rsz), 218 wBufSize_(wsz), 219 rBuf_(new uint8_t[rBufSize_]), 220 wBuf_(new uint8_t[wBufSize_]) { 221 initPointers(); 222 } 223 open()224 void open() override { transport_->open(); } 225 isOpen()226 bool isOpen() const override { return transport_->isOpen(); } 227 peek()228 bool peek() override { 229 if (rBase_ == rBound_) { 230 setReadBuffer(rBuf_.get(), transport_->read(rBuf_.get(), rBufSize_)); 231 } 232 return (rBound_ > rBase_); 233 } 234 close()235 void close() override { 236 flush(); 237 transport_->close(); 238 } 239 240 uint32_t readSlow(uint8_t* buf, uint32_t len) override; 241 242 void writeSlow(const uint8_t* buf, uint32_t len) override; 243 244 void flush() override; 245 246 /** 247 * Returns the origin of the underlying transport 248 */ getOrigin()249 const std::string getOrigin() const override { return transport_->getOrigin(); } 250 251 /** 252 * The following behavior is currently implemented by TBufferedTransport, 253 * but that may change in a future version: 254 * 1/ If len is at most rBufSize_, borrow will never return nullptr. 255 * Depending on the underlying transport, it could throw an exception 256 * or hang forever. 257 * 2/ Some borrow requests may copy bytes internally. However, 258 * if len is at most rBufSize_/2, none of the copied bytes 259 * will ever have to be copied again. For optimial performance, 260 * stay under this limit. 261 */ 262 const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override; 263 getUnderlyingTransport()264 std::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; } 265 266 /* 267 * TVirtualTransport provides a default implementation of readAll(). 268 * We want to use the TBufferBase version instead. 269 */ readAll(uint8_t * buf,uint32_t len)270 uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); } 271 272 protected: initPointers()273 void initPointers() { 274 setReadBuffer(rBuf_.get(), 0); 275 setWriteBuffer(wBuf_.get(), wBufSize_); 276 // Write size never changes. 277 } 278 279 std::shared_ptr<TTransport> transport_; 280 281 uint32_t rBufSize_; 282 uint32_t wBufSize_; 283 std::unique_ptr<uint8_t[]> rBuf_; 284 std::unique_ptr<uint8_t[]> wBuf_; 285 }; 286 287 /** 288 * Wraps a transport into a buffered one. 289 * 290 */ 291 class TBufferedTransportFactory : public TTransportFactory { 292 public: 293 TBufferedTransportFactory() = default; 294 295 ~TBufferedTransportFactory() override = default; 296 297 /** 298 * Wraps the transport into a buffered one. 299 */ getTransport(std::shared_ptr<TTransport> trans)300 std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override { 301 return std::shared_ptr<TTransport>(new TBufferedTransport(trans)); 302 } 303 }; 304 305 /** 306 * Framed transport. All writes go into an in-memory buffer until flush is 307 * called, at which point the transport writes the length of the entire 308 * binary chunk followed by the data payload. This allows the receiver on the 309 * other end to always do fixed-length reads. 310 * 311 */ 312 class TFramedTransport : public TVirtualTransport<TFramedTransport, TBufferBase> { 313 public: 314 static const int DEFAULT_BUFFER_SIZE = 512; 315 static const int DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024; 316 317 /// Use default buffer sizes. 318 TFramedTransport(std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)319 : TVirtualTransport(config), 320 transport_(), 321 rBufSize_(0), 322 wBufSize_(DEFAULT_BUFFER_SIZE), 323 rBuf_(), 324 wBuf_(new uint8_t[wBufSize_]), 325 bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()) { 326 initPointers(); 327 } 328 329 TFramedTransport(std::shared_ptr<TTransport> transport, std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)330 : TVirtualTransport(config), 331 transport_(transport), 332 rBufSize_(0), 333 wBufSize_(DEFAULT_BUFFER_SIZE), 334 rBuf_(), 335 wBuf_(new uint8_t[wBufSize_]), 336 bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()), 337 maxFrameSize_(configuration_->getMaxFrameSize()) { 338 initPointers(); 339 } 340 341 TFramedTransport(std::shared_ptr<TTransport> transport, 342 uint32_t sz, 343 uint32_t bufReclaimThresh = (std::numeric_limits<uint32_t>::max)(), 344 std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)345 : TVirtualTransport(config), 346 transport_(transport), 347 rBufSize_(0), 348 wBufSize_(sz), 349 rBuf_(), 350 wBuf_(new uint8_t[wBufSize_]), 351 bufReclaimThresh_(bufReclaimThresh), 352 maxFrameSize_(configuration_->getMaxFrameSize()) { 353 initPointers(); 354 } 355 open()356 void open() override { transport_->open(); } 357 isOpen()358 bool isOpen() const override { return transport_->isOpen(); } 359 peek()360 bool peek() override { return (rBase_ < rBound_) || transport_->peek(); } 361 close()362 void close() override { 363 flush(); 364 transport_->close(); 365 } 366 367 uint32_t readSlow(uint8_t* buf, uint32_t len) override; 368 369 void writeSlow(const uint8_t* buf, uint32_t len) override; 370 371 void flush() override; 372 373 uint32_t readEnd() override; 374 375 uint32_t writeEnd() override; 376 377 const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override; 378 getUnderlyingTransport()379 std::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; } 380 381 /* 382 * TVirtualTransport provides a default implementation of readAll(). 383 * We want to use the TBufferBase version instead. 384 */ 385 using TBufferBase::readAll; 386 387 /** 388 * Returns the origin of the underlying transport 389 */ getOrigin()390 const std::string getOrigin() const override { return transport_->getOrigin(); } 391 392 /** 393 * Set the maximum size of the frame at read 394 */ setMaxFrameSize(uint32_t maxFrameSize)395 void setMaxFrameSize(uint32_t maxFrameSize) { maxFrameSize_ = maxFrameSize; } 396 397 /** 398 * Get the maximum size of the frame at read 399 */ getMaxFrameSize()400 uint32_t getMaxFrameSize() { return maxFrameSize_; } 401 402 protected: 403 /** 404 * Reads a frame of input from the underlying stream. 405 * 406 * Returns true if a frame was read successfully, or false on EOF. 407 * (Raises a TTransportException if EOF occurs after a partial frame.) 408 */ 409 virtual bool readFrame(); 410 initPointers()411 void initPointers() { 412 setReadBuffer(nullptr, 0); 413 setWriteBuffer(wBuf_.get(), wBufSize_); 414 415 // Pad the buffer so we can insert the size later. 416 int32_t pad = 0; 417 this->write((uint8_t*)&pad, sizeof(pad)); 418 } 419 420 std::shared_ptr<TTransport> transport_; 421 422 uint32_t rBufSize_; 423 uint32_t wBufSize_; 424 std::unique_ptr<uint8_t[]> rBuf_; 425 std::unique_ptr<uint8_t[]> wBuf_; 426 uint32_t bufReclaimThresh_; 427 uint32_t maxFrameSize_; 428 }; 429 430 /** 431 * Wraps a transport into a framed one. 432 * 433 */ 434 class TFramedTransportFactory : public TTransportFactory { 435 public: 436 TFramedTransportFactory() = default; 437 438 ~TFramedTransportFactory() override = default; 439 440 /** 441 * Wraps the transport into a framed one. 442 */ getTransport(std::shared_ptr<TTransport> trans)443 std::shared_ptr<TTransport> getTransport(std::shared_ptr<TTransport> trans) override { 444 return std::shared_ptr<TTransport>(new TFramedTransport(trans)); 445 } 446 }; 447 448 /** 449 * A memory buffer is a tranpsort that simply reads from and writes to an 450 * in memory buffer. Anytime you call write on it, the data is simply placed 451 * into a buffer, and anytime you call read, data is read from that buffer. 452 * 453 * The buffers are allocated using C constructs malloc,realloc, and the size 454 * doubles as necessary. We've considered using scoped 455 * 456 */ 457 class TMemoryBuffer : public TVirtualTransport<TMemoryBuffer, TBufferBase> { 458 private: 459 // Common initialization done by all constructors. initCommon(uint8_t * buf,uint32_t size,bool owner,uint32_t wPos)460 void initCommon(uint8_t* buf, uint32_t size, bool owner, uint32_t wPos) { 461 462 maxBufferSize_ = (std::numeric_limits<uint32_t>::max)(); 463 464 if (buf == nullptr && size != 0) { 465 assert(owner); 466 buf = (uint8_t*)std::malloc(size); 467 if (buf == nullptr) { 468 throw std::bad_alloc(); 469 } 470 } 471 472 buffer_ = buf; 473 bufferSize_ = size; 474 475 rBase_ = buffer_; 476 rBound_ = buffer_ + wPos; 477 // TODO(dreiss): Investigate NULL-ing this if !owner. 478 wBase_ = buffer_ + wPos; 479 wBound_ = buffer_ + bufferSize_; 480 481 owner_ = owner; 482 483 // rBound_ is really an artifact. In principle, it should always be 484 // equal to wBase_. We update it in a few places (computeRead, etc.). 485 } 486 487 public: 488 static const uint32_t defaultSize = 1024; 489 490 /** 491 * This enum specifies how a TMemoryBuffer should treat 492 * memory passed to it via constructors or resetBuffer. 493 * 494 * OBSERVE: 495 * TMemoryBuffer will simply store a pointer to the memory. 496 * It is the callers responsibility to ensure that the pointer 497 * remains valid for the lifetime of the TMemoryBuffer, 498 * and that it is properly cleaned up. 499 * Note that no data can be written to observed buffers. 500 * 501 * COPY: 502 * TMemoryBuffer will make an internal copy of the buffer. 503 * The caller has no responsibilities. 504 * 505 * TAKE_OWNERSHIP: 506 * TMemoryBuffer will become the "owner" of the buffer, 507 * and will be responsible for freeing it. 508 * The memory must have been allocated with malloc. 509 */ 510 enum MemoryPolicy { OBSERVE = 1, COPY = 2, TAKE_OWNERSHIP = 3 }; 511 512 /** 513 * Construct a TMemoryBuffer with a default-sized buffer, 514 * owned by the TMemoryBuffer object. 515 */ 516 TMemoryBuffer(std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)517 : TVirtualTransport(config) { 518 initCommon(nullptr, defaultSize, true, 0); 519 } 520 521 /** 522 * Construct a TMemoryBuffer with a buffer of a specified size, 523 * owned by the TMemoryBuffer object. 524 * 525 * @param sz The initial size of the buffer. 526 */ 527 TMemoryBuffer(uint32_t sz, std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)528 : TVirtualTransport(config) { 529 initCommon(nullptr, sz, true, 0); 530 } 531 532 /** 533 * Construct a TMemoryBuffer with buf as its initial contents. 534 * 535 * @param buf The initial contents of the buffer. 536 * Note that, while buf is a non-const pointer, 537 * TMemoryBuffer will not write to it if policy == OBSERVE, 538 * so it is safe to const_cast<uint8_t*>(whatever). 539 * @param sz The size of @c buf. 540 * @param policy See @link MemoryPolicy @endlink . 541 */ 542 TMemoryBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE, std::shared_ptr<TConfiguration> config = nullptr) TVirtualTransport(config)543 : TVirtualTransport(config) { 544 if (buf == nullptr && sz != 0) { 545 throw TTransportException(TTransportException::BAD_ARGS, 546 "TMemoryBuffer given null buffer with non-zero size."); 547 } 548 549 switch (policy) { 550 case OBSERVE: 551 case TAKE_OWNERSHIP: 552 initCommon(buf, sz, policy == TAKE_OWNERSHIP, sz); 553 break; 554 case COPY: 555 initCommon(nullptr, sz, true, 0); 556 this->write(buf, sz); 557 break; 558 default: 559 throw TTransportException(TTransportException::BAD_ARGS, 560 "Invalid MemoryPolicy for TMemoryBuffer"); 561 } 562 } 563 ~TMemoryBuffer()564 ~TMemoryBuffer() override { 565 if (owner_) { 566 std::free(buffer_); 567 } 568 } 569 isOpen()570 bool isOpen() const override { return true; } 571 peek()572 bool peek() override { return (rBase_ < wBase_); } 573 open()574 void open() override {} 575 close()576 void close() override {} 577 578 // TODO(dreiss): Make bufPtr const. getBuffer(uint8_t ** bufPtr,uint32_t * sz)579 void getBuffer(uint8_t** bufPtr, uint32_t* sz) { 580 *bufPtr = rBase_; 581 *sz = static_cast<uint32_t>(wBase_ - rBase_); 582 } 583 getBufferAsString()584 std::string getBufferAsString() { 585 if (buffer_ == nullptr) { 586 return ""; 587 } 588 uint8_t* buf; 589 uint32_t sz; 590 getBuffer(&buf, &sz); 591 return std::string((char*)buf, (std::string::size_type)sz); 592 } 593 appendBufferToString(std::string & str)594 void appendBufferToString(std::string& str) { 595 if (buffer_ == nullptr) { 596 return; 597 } 598 uint8_t* buf; 599 uint32_t sz; 600 getBuffer(&buf, &sz); 601 str.append((char*)buf, sz); 602 } 603 resetBuffer()604 void resetBuffer() { 605 rBase_ = buffer_; 606 rBound_ = buffer_; 607 wBase_ = buffer_; 608 // It isn't safe to write into a buffer we don't own. 609 if (!owner_) { 610 wBound_ = wBase_; 611 bufferSize_ = 0; 612 } 613 } 614 615 /// See constructor documentation. 616 void resetBuffer(uint8_t* buf, uint32_t sz, MemoryPolicy policy = OBSERVE) { 617 // Use a variant of the copy-and-swap trick for assignment operators. 618 // This is sub-optimal in terms of performance for two reasons: 619 // 1/ The constructing and swapping of the (small) values 620 // in the temporary object takes some time, and is not necessary. 621 // 2/ If policy == COPY, we allocate the new buffer before 622 // freeing the old one, precluding the possibility of 623 // reusing that memory. 624 // I doubt that either of these problems could be optimized away, 625 // but the second is probably no a common case, and the first is minor. 626 // I don't expect resetBuffer to be a common operation, so I'm willing to 627 // bite the performance bullet to make the method this simple. 628 629 // Construct the new buffer. 630 TMemoryBuffer new_buffer(buf, sz, policy); 631 // Move it into ourself. 632 this->swap(new_buffer); 633 // Our old self gets destroyed. 634 } 635 636 /// See constructor documentation. resetBuffer(uint32_t sz)637 void resetBuffer(uint32_t sz) { 638 // Construct the new buffer. 639 TMemoryBuffer new_buffer(sz); 640 // Move it into ourself. 641 this->swap(new_buffer); 642 // Our old self gets destroyed. 643 } 644 readAsString(uint32_t len)645 std::string readAsString(uint32_t len) { 646 std::string str; 647 (void)readAppendToString(str, len); 648 return str; 649 } 650 651 uint32_t readAppendToString(std::string& str, uint32_t len); 652 653 // return number of bytes read readEnd()654 uint32_t readEnd() override { 655 // This cast should be safe, because buffer_'s size is a uint32_t 656 auto bytes = static_cast<uint32_t>(rBase_ - buffer_); 657 if (rBase_ == wBase_) { 658 resetBuffer(); 659 } 660 resetConsumedMessageSize(); 661 return bytes; 662 } 663 664 // Return number of bytes written writeEnd()665 uint32_t writeEnd() override { 666 // This cast should be safe, because buffer_'s size is a uint32_t 667 return static_cast<uint32_t>(wBase_ - buffer_); 668 } 669 available_read()670 uint32_t available_read() const { 671 // Remember, wBase_ is the real rBound_. 672 return static_cast<uint32_t>(wBase_ - rBase_); 673 } 674 available_write()675 uint32_t available_write() const { return static_cast<uint32_t>(wBound_ - wBase_); } 676 677 // Returns a pointer to where the client can write data to append to 678 // the TMemoryBuffer, and ensures the buffer is big enough to accommodate a 679 // write of the provided length. The returned pointer is very convenient for 680 // passing to read(), recv(), or similar. You must call wroteBytes() as soon 681 // as data is written or the buffer will not be aware that data has changed. getWritePtr(uint32_t len)682 uint8_t* getWritePtr(uint32_t len) { 683 ensureCanWrite(len); 684 return wBase_; 685 } 686 687 // Informs the buffer that the client has written 'len' bytes into storage 688 // that had been provided by getWritePtr(). 689 void wroteBytes(uint32_t len); 690 691 /* 692 * TVirtualTransport provides a default implementation of readAll(). 693 * We want to use the TBufferBase version instead. 694 */ readAll(uint8_t * buf,uint32_t len)695 uint32_t readAll(uint8_t* buf, uint32_t len) { return TBufferBase::readAll(buf, len); } 696 697 //! \brief Get the current buffer size 698 //! \returns the current buffer size getBufferSize()699 uint32_t getBufferSize() const { 700 return bufferSize_; 701 } 702 703 //! \brief Get the current maximum buffer size 704 //! \returns the current maximum buffer size getMaxBufferSize()705 uint32_t getMaxBufferSize() const { 706 return maxBufferSize_; 707 } 708 709 //! \brief Change the maximum buffer size 710 //! \param[in] maxSize the new maximum buffer size allowed to grow to 711 //! \throws TTransportException(BAD_ARGS) if maxSize is less than the current buffer size setMaxBufferSize(uint32_t maxSize)712 void setMaxBufferSize(uint32_t maxSize) { 713 if (maxSize < bufferSize_) { 714 throw TTransportException(TTransportException::BAD_ARGS, 715 "Maximum buffer size would be less than current buffer size"); 716 } 717 maxBufferSize_ = maxSize; 718 } 719 720 protected: swap(TMemoryBuffer & that)721 void swap(TMemoryBuffer& that) { 722 using std::swap; 723 swap(buffer_, that.buffer_); 724 swap(bufferSize_, that.bufferSize_); 725 726 swap(rBase_, that.rBase_); 727 swap(rBound_, that.rBound_); 728 swap(wBase_, that.wBase_); 729 swap(wBound_, that.wBound_); 730 731 swap(owner_, that.owner_); 732 } 733 734 // Make sure there's at least 'len' bytes available for writing. 735 void ensureCanWrite(uint32_t len); 736 737 // Compute the position and available data for reading. 738 void computeRead(uint32_t len, uint8_t** out_start, uint32_t* out_give); 739 740 uint32_t readSlow(uint8_t* buf, uint32_t len) override; 741 742 void writeSlow(const uint8_t* buf, uint32_t len) override; 743 744 const uint8_t* borrowSlow(uint8_t* buf, uint32_t* len) override; 745 746 // Data buffer 747 uint8_t* buffer_; 748 749 // Allocated buffer size 750 uint32_t bufferSize_; 751 752 // Maximum allowed size 753 uint32_t maxBufferSize_; 754 755 // Is this object the owner of the buffer? 756 bool owner_; 757 758 // Don't forget to update constrctors, initCommon, and swap if 759 // you add new members. 760 }; 761 } 762 } 763 } // apache::thrift::transport 764 765 #endif // #ifndef _THRIFT_TRANSPORT_TBUFFERTRANSPORTS_H_ 766