1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20 #include <thrift/transport/TTransportUtils.h>
21
22 using std::string;
23
24 namespace apache {
25 namespace thrift {
26 namespace transport {
27
read(uint8_t * buf,uint32_t len)28 uint32_t TPipedTransport::read(uint8_t* buf, uint32_t len) {
29 checkReadBytesAvailable(len);
30 uint32_t need = len;
31
32 // We don't have enough data yet
33 if (rLen_ - rPos_ < need) {
34 // Copy out whatever we have
35 if (rLen_ - rPos_ > 0) {
36 memcpy(buf, rBuf_ + rPos_, rLen_ - rPos_);
37 need -= rLen_ - rPos_;
38 buf += rLen_ - rPos_;
39 rPos_ = rLen_;
40 }
41
42 // Double the size of the underlying buffer if it is full
43 if (rLen_ == rBufSize_) {
44 rBufSize_ *= 2;
45 auto *tmpBuf = (uint8_t*)std::realloc(rBuf_, sizeof(uint8_t) * rBufSize_);
46 if (tmpBuf == nullptr) {
47 throw std::bad_alloc();
48 }
49 rBuf_ = tmpBuf;
50 }
51
52 // try to fill up the buffer
53 rLen_ += srcTrans_->read(rBuf_ + rPos_, rBufSize_ - rPos_);
54 }
55
56 // Hand over whatever we have
57 uint32_t give = need;
58 if (rLen_ - rPos_ < give) {
59 give = rLen_ - rPos_;
60 }
61 if (give > 0) {
62 memcpy(buf, rBuf_ + rPos_, give);
63 rPos_ += give;
64 need -= give;
65 }
66
67 return (len - need);
68 }
69
write(const uint8_t * buf,uint32_t len)70 void TPipedTransport::write(const uint8_t* buf, uint32_t len) {
71 if (len == 0) {
72 return;
73 }
74
75 // Make the buffer as big as it needs to be
76 if ((len + wLen_) >= wBufSize_) {
77 uint32_t newBufSize = wBufSize_ * 2;
78 while ((len + wLen_) >= newBufSize) {
79 newBufSize *= 2;
80 }
81 auto *tmpBuf= (uint8_t*)std::realloc(wBuf_, sizeof(uint8_t) * newBufSize);
82 if (tmpBuf == nullptr) {
83 throw std::bad_alloc();
84 }
85 wBuf_ = tmpBuf;
86
87 wBufSize_ = newBufSize;
88 }
89
90 // Copy into the buffer
91 memcpy(wBuf_ + wLen_, buf, len);
92 wLen_ += len;
93 }
94
flush()95 void TPipedTransport::flush() {
96 // Write out any data waiting in the write buffer
97 if (wLen_ > 0) {
98 srcTrans_->write(wBuf_, wLen_);
99 wLen_ = 0;
100 }
101
102 // Flush the underlying transport
103 srcTrans_->flush();
104 }
105
TPipedFileReaderTransport(std::shared_ptr<TFileReaderTransport> srcTrans,std::shared_ptr<TTransport> dstTrans,std::shared_ptr<TConfiguration> config)106 TPipedFileReaderTransport::TPipedFileReaderTransport(
107 std::shared_ptr<TFileReaderTransport> srcTrans,
108 std::shared_ptr<TTransport> dstTrans,
109 std::shared_ptr<TConfiguration> config)
110 : TPipedTransport(srcTrans, dstTrans, config), srcTrans_(srcTrans) {
111 }
112
113 TPipedFileReaderTransport::~TPipedFileReaderTransport() = default;
114
isOpen() const115 bool TPipedFileReaderTransport::isOpen() const {
116 return TPipedTransport::isOpen();
117 }
118
peek()119 bool TPipedFileReaderTransport::peek() {
120 return TPipedTransport::peek();
121 }
122
open()123 void TPipedFileReaderTransport::open() {
124 TPipedTransport::open();
125 }
126
close()127 void TPipedFileReaderTransport::close() {
128 TPipedTransport::close();
129 }
130
read(uint8_t * buf,uint32_t len)131 uint32_t TPipedFileReaderTransport::read(uint8_t* buf, uint32_t len) {
132 return TPipedTransport::read(buf, len);
133 }
134
readAll(uint8_t * buf,uint32_t len)135 uint32_t TPipedFileReaderTransport::readAll(uint8_t* buf, uint32_t len) {
136 checkReadBytesAvailable(len);
137 uint32_t have = 0;
138 uint32_t get = 0;
139
140 while (have < len) {
141 get = read(buf + have, len - have);
142 if (get <= 0) {
143 throw TEOFException();
144 }
145 have += get;
146 }
147
148 return have;
149 }
150
readEnd()151 uint32_t TPipedFileReaderTransport::readEnd() {
152 return TPipedTransport::readEnd();
153 }
154
write(const uint8_t * buf,uint32_t len)155 void TPipedFileReaderTransport::write(const uint8_t* buf, uint32_t len) {
156 TPipedTransport::write(buf, len);
157 }
158
writeEnd()159 uint32_t TPipedFileReaderTransport::writeEnd() {
160 return TPipedTransport::writeEnd();
161 }
162
flush()163 void TPipedFileReaderTransport::flush() {
164 TPipedTransport::flush();
165 }
166
getReadTimeout()167 int32_t TPipedFileReaderTransport::getReadTimeout() {
168 return srcTrans_->getReadTimeout();
169 }
170
setReadTimeout(int32_t readTimeout)171 void TPipedFileReaderTransport::setReadTimeout(int32_t readTimeout) {
172 srcTrans_->setReadTimeout(readTimeout);
173 }
174
getNumChunks()175 uint32_t TPipedFileReaderTransport::getNumChunks() {
176 return srcTrans_->getNumChunks();
177 }
178
getCurChunk()179 uint32_t TPipedFileReaderTransport::getCurChunk() {
180 return srcTrans_->getCurChunk();
181 }
182
seekToChunk(int32_t chunk)183 void TPipedFileReaderTransport::seekToChunk(int32_t chunk) {
184 srcTrans_->seekToChunk(chunk);
185 }
186
seekToEnd()187 void TPipedFileReaderTransport::seekToEnd() {
188 srcTrans_->seekToEnd();
189 }
190 }
191 }
192 } // apache::thrift::transport
193