1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 module thrift.transport.buffered; 20 21 import std.algorithm : min; 22 import std.array : empty; 23 import std.exception : enforce; 24 import thrift.transport.base; 25 26 /** 27 * Wraps another transport and buffers reads and writes until the internal 28 * buffers are exhausted, at which point new data is fetched resp. the 29 * accumulated data is written out at once. 30 */ 31 final class TBufferedTransport : TBaseTransport { 32 /** 33 * Constructs a new instance, using the default buffer sizes. 34 * 35 * Params: 36 * transport = The underlying transport to wrap. 37 */ this(TTransport transport)38 this(TTransport transport) { 39 this(transport, DEFAULT_BUFFER_SIZE); 40 } 41 42 /** 43 * Constructs a new instance, using the specified buffer size. 44 * 45 * Params: 46 * transport = The underlying transport to wrap. 47 * bufferSize = The size of the read and write buffers to use, in bytes. 48 */ this(TTransport transport,size_t bufferSize)49 this(TTransport transport, size_t bufferSize) { 50 this(transport, bufferSize, bufferSize); 51 } 52 53 /** 54 * Constructs a new instance, using the specified buffer size. 55 * 56 * Params: 57 * transport = The underlying transport to wrap. 58 * readBufferSize = The size of the read buffer to use, in bytes. 59 * writeBufferSize = The size of the write buffer to use, in bytes. 60 */ this(TTransport transport,size_t readBufferSize,size_t writeBufferSize)61 this(TTransport transport, size_t readBufferSize, size_t writeBufferSize) { 62 transport_ = transport; 63 readBuffer_ = new ubyte[readBufferSize]; 64 writeBuffer_ = new ubyte[writeBufferSize]; 65 writeAvail_ = writeBuffer_; 66 } 67 68 /// The default size of the read/write buffers, in bytes. 69 enum int DEFAULT_BUFFER_SIZE = 512; 70 isOpen()71 override bool isOpen() @property { 72 return transport_.isOpen(); 73 } 74 peek()75 override bool peek() { 76 if (readAvail_.empty) { 77 // If there is nothing available to read, see if we can get something 78 // from the underlying transport. 79 auto bytesRead = transport_.read(readBuffer_); 80 readAvail_ = readBuffer_[0 .. bytesRead]; 81 } 82 83 return !readAvail_.empty; 84 } 85 open()86 override void open() { 87 transport_.open(); 88 } 89 close()90 override void close() { 91 if (!isOpen) return; 92 flush(); 93 transport_.close(); 94 } 95 read(ubyte[]buf)96 override size_t read(ubyte[] buf) { 97 if (readAvail_.empty) { 98 // No data left in our buffer, fetch some from the underlying transport. 99 100 if (buf.length > readBuffer_.length) { 101 // If the amount of data requested is larger than our reading buffer, 102 // directly read to the passed buffer. This probably doesn't occur too 103 // often in practice (and even if it does, the underlying transport 104 // probably cannot fulfill the request at once anyway), but it can't 105 // harm to try… 106 return transport_.read(buf); 107 } 108 109 auto bytesRead = transport_.read(readBuffer_); 110 readAvail_ = readBuffer_[0 .. bytesRead]; 111 } 112 113 // Hand over whatever we have. 114 auto give = min(readAvail_.length, buf.length); 115 buf[0 .. give] = readAvail_[0 .. give]; 116 readAvail_ = readAvail_[give .. $]; 117 return give; 118 } 119 120 /** 121 * Shortcut version of readAll. 122 */ readAll(ubyte[]buf)123 override void readAll(ubyte[] buf) { 124 if (readAvail_.length >= buf.length) { 125 buf[] = readAvail_[0 .. buf.length]; 126 readAvail_ = readAvail_[buf.length .. $]; 127 return; 128 } 129 130 super.readAll(buf); 131 } 132 write(in ubyte[]buf)133 override void write(in ubyte[] buf) { 134 if (writeAvail_.length >= buf.length) { 135 // If the data fits in the buffer, just save it there. 136 writeAvail_[0 .. buf.length] = buf; 137 writeAvail_ = writeAvail_[buf.length .. $]; 138 return; 139 } 140 141 // We have to decide if we copy data from buf to our internal buffer, or 142 // just directly write them out. The same considerations about avoiding 143 // syscalls as for C++ apply here. 144 auto bytesAvail = writeAvail_.ptr - writeBuffer_.ptr; 145 if ((bytesAvail + buf.length >= 2 * writeBuffer_.length) || (bytesAvail == 0)) { 146 // We would immediately need two syscalls anyway (or we don't have 147 // anything) in our buffer to write, so just write out both buffers. 148 if (bytesAvail > 0) { 149 transport_.write(writeBuffer_[0 .. bytesAvail]); 150 writeAvail_ = writeBuffer_; 151 } 152 153 transport_.write(buf); 154 return; 155 } 156 157 // Fill up our internal buffer for a write. 158 writeAvail_[] = buf[0 .. writeAvail_.length]; 159 auto left = buf[writeAvail_.length .. $]; 160 transport_.write(writeBuffer_); 161 162 // Copy the rest into our buffer. 163 writeBuffer_[0 .. left.length] = left[]; 164 writeAvail_ = writeBuffer_[left.length .. $]; 165 } 166 flush()167 override void flush() { 168 // Write out any data waiting in the write buffer. 169 auto bytesAvail = writeAvail_.ptr - writeBuffer_.ptr; 170 if (bytesAvail > 0) { 171 // Note that we reset writeAvail_ prior to calling the underlying protocol 172 // to make sure the buffer is cleared even if the transport throws an 173 // exception. 174 writeAvail_ = writeBuffer_; 175 transport_.write(writeBuffer_[0 .. bytesAvail]); 176 } 177 178 // Flush the underlying transport. 179 transport_.flush(); 180 } 181 borrow(ubyte * buf,size_t len)182 override const(ubyte)[] borrow(ubyte* buf, size_t len) { 183 if (len <= readAvail_.length) { 184 return readAvail_; 185 } 186 return null; 187 } 188 consume(size_t len)189 override void consume(size_t len) { 190 enforce(len <= readBuffer_.length, new TTransportException( 191 "Invalid consume length.", TTransportException.Type.BAD_ARGS)); 192 readAvail_ = readAvail_[len .. $]; 193 } 194 195 /** 196 * The wrapped transport. 197 */ underlyingTransport()198 TTransport underlyingTransport() @property { 199 return transport_; 200 } 201 202 private: 203 TTransport transport_; 204 205 ubyte[] readBuffer_; 206 ubyte[] writeBuffer_; 207 208 ubyte[] readAvail_; 209 ubyte[] writeAvail_; 210 } 211 212 /** 213 * Wraps given transports into TBufferedTransports. 214 */ 215 alias TWrapperTransportFactory!TBufferedTransport TBufferedTransportFactory; 216