1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 module thrift.transport.buffered;
20 
21 import std.algorithm : min;
22 import std.array : empty;
23 import std.exception : enforce;
24 import thrift.transport.base;
25 
26 /**
27  * Wraps another transport and buffers reads and writes until the internal
28  * buffers are exhausted, at which point new data is fetched resp. the
29  * accumulated data is written out at once.
30  */
31 final class TBufferedTransport : TBaseTransport {
32   /**
33    * Constructs a new instance, using the default buffer sizes.
34    *
35    * Params:
36    *   transport = The underlying transport to wrap.
37    */
this(TTransport transport)38   this(TTransport transport) {
39     this(transport, DEFAULT_BUFFER_SIZE);
40   }
41 
42   /**
43    * Constructs a new instance, using the specified buffer size.
44    *
45    * Params:
46    *   transport = The underlying transport to wrap.
47    *   bufferSize = The size of the read and write buffers to use, in bytes.
48    */
this(TTransport transport,size_t bufferSize)49   this(TTransport transport, size_t bufferSize) {
50     this(transport, bufferSize, bufferSize);
51   }
52 
53   /**
54    * Constructs a new instance, using the specified buffer size.
55    *
56    * Params:
57    *   transport = The underlying transport to wrap.
58    *   readBufferSize = The size of the read buffer to use, in bytes.
59    *   writeBufferSize = The size of the write buffer to use, in bytes.
60    */
this(TTransport transport,size_t readBufferSize,size_t writeBufferSize)61   this(TTransport transport, size_t readBufferSize, size_t writeBufferSize) {
62     transport_ = transport;
63     readBuffer_ = new ubyte[readBufferSize];
64     writeBuffer_ = new ubyte[writeBufferSize];
65     writeAvail_ = writeBuffer_;
66   }
67 
68   /// The default size of the read/write buffers, in bytes.
69   enum int DEFAULT_BUFFER_SIZE = 512;
70 
isOpen()71   override bool isOpen() @property {
72     return transport_.isOpen();
73   }
74 
peek()75   override bool peek() {
76     if (readAvail_.empty) {
77       // If there is nothing available to read, see if we can get something
78       // from the underlying transport.
79       auto bytesRead = transport_.read(readBuffer_);
80       readAvail_ = readBuffer_[0 .. bytesRead];
81     }
82 
83     return !readAvail_.empty;
84   }
85 
open()86   override void open() {
87     transport_.open();
88   }
89 
close()90   override void close() {
91     if (!isOpen) return;
92     flush();
93     transport_.close();
94   }
95 
read(ubyte[]buf)96   override size_t read(ubyte[] buf) {
97     if (readAvail_.empty) {
98       // No data left in our buffer, fetch some from the underlying transport.
99 
100       if (buf.length > readBuffer_.length) {
101         // If the amount of data requested is larger than our reading buffer,
102         // directly read to the passed buffer. This probably doesn't occur too
103         // often in practice (and even if it does, the underlying transport
104         // probably cannot fulfill the request at once anyway), but it can't
105         // harm to try…
106         return transport_.read(buf);
107       }
108 
109       auto bytesRead = transport_.read(readBuffer_);
110       readAvail_ = readBuffer_[0 .. bytesRead];
111     }
112 
113     // Hand over whatever we have.
114     auto give = min(readAvail_.length, buf.length);
115     buf[0 .. give] = readAvail_[0 .. give];
116     readAvail_ = readAvail_[give .. $];
117     return give;
118   }
119 
120   /**
121    * Shortcut version of readAll.
122    */
readAll(ubyte[]buf)123   override void readAll(ubyte[] buf) {
124     if (readAvail_.length >= buf.length) {
125       buf[] = readAvail_[0 .. buf.length];
126       readAvail_ = readAvail_[buf.length .. $];
127       return;
128     }
129 
130     super.readAll(buf);
131   }
132 
write(in ubyte[]buf)133   override void write(in ubyte[] buf) {
134     if (writeAvail_.length >= buf.length) {
135       // If the data fits in the buffer, just save it there.
136       writeAvail_[0 .. buf.length] = buf;
137       writeAvail_ = writeAvail_[buf.length .. $];
138       return;
139     }
140 
141     // We have to decide if we copy data from buf to our internal buffer, or
142     // just directly write them out. The same considerations about avoiding
143     // syscalls as for C++ apply here.
144     auto bytesAvail = writeAvail_.ptr - writeBuffer_.ptr;
145     if ((bytesAvail + buf.length >= 2 * writeBuffer_.length) || (bytesAvail == 0)) {
146       // We would immediately need two syscalls anyway (or we don't have
147       // anything) in our buffer to write, so just write out both buffers.
148       if (bytesAvail > 0) {
149         transport_.write(writeBuffer_[0 .. bytesAvail]);
150         writeAvail_ = writeBuffer_;
151       }
152 
153       transport_.write(buf);
154       return;
155     }
156 
157     // Fill up our internal buffer for a write.
158     writeAvail_[] = buf[0 .. writeAvail_.length];
159     auto left = buf[writeAvail_.length .. $];
160     transport_.write(writeBuffer_);
161 
162     // Copy the rest into our buffer.
163     writeBuffer_[0 .. left.length] = left[];
164     writeAvail_ = writeBuffer_[left.length .. $];
165   }
166 
flush()167   override void flush() {
168     // Write out any data waiting in the write buffer.
169     auto bytesAvail = writeAvail_.ptr - writeBuffer_.ptr;
170     if (bytesAvail > 0) {
171       // Note that we reset writeAvail_ prior to calling the underlying protocol
172       // to make sure the buffer is cleared even if the transport throws an
173       // exception.
174       writeAvail_ = writeBuffer_;
175       transport_.write(writeBuffer_[0 .. bytesAvail]);
176     }
177 
178     // Flush the underlying transport.
179     transport_.flush();
180   }
181 
borrow(ubyte * buf,size_t len)182   override const(ubyte)[] borrow(ubyte* buf, size_t len) {
183     if (len <= readAvail_.length) {
184       return readAvail_;
185     }
186     return null;
187   }
188 
consume(size_t len)189   override void consume(size_t len) {
190     enforce(len <= readBuffer_.length, new TTransportException(
191       "Invalid consume length.", TTransportException.Type.BAD_ARGS));
192     readAvail_ = readAvail_[len .. $];
193   }
194 
195   /**
196    * The wrapped transport.
197    */
underlyingTransport()198   TTransport underlyingTransport() @property {
199     return transport_;
200   }
201 
202 private:
203   TTransport transport_;
204 
205   ubyte[] readBuffer_;
206   ubyte[] writeBuffer_;
207 
208   ubyte[] readAvail_;
209   ubyte[] writeAvail_;
210 }
211 
212 /**
213  * Wraps given transports into TBufferedTransports.
214  */
215 alias TWrapperTransportFactory!TBufferedTransport TBufferedTransportFactory;
216