1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20 #ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 21 #define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1 22 23 #include <thrift/transport/TTransport.h> 24 #include <thrift/Thrift.h> 25 #include <thrift/TProcessor.h> 26 27 #include <atomic> 28 #include <string> 29 #include <stdio.h> 30 31 #include <thrift/concurrency/Mutex.h> 32 #include <thrift/concurrency/Monitor.h> 33 #include <thrift/concurrency/ThreadFactory.h> 34 #include <thrift/concurrency/Thread.h> 35 36 namespace apache { 37 namespace thrift { 38 namespace transport { 39 40 using apache::thrift::TProcessor; 41 using apache::thrift::protocol::TProtocolFactory; 42 using apache::thrift::concurrency::Mutex; 43 using apache::thrift::concurrency::Monitor; 44 45 // Data pertaining to a single event 46 typedef struct eventInfo { 47 uint8_t* eventBuff_; 48 uint32_t eventSize_; 49 uint32_t eventBuffPos_; 50 eventInfoeventInfo51 eventInfo() : eventBuff_(nullptr), eventSize_(0), eventBuffPos_(0){}; ~eventInfoeventInfo52 ~eventInfo() { 53 if (eventBuff_) { 54 delete[] eventBuff_; 55 } 56 } 57 } eventInfo; 58 59 // information about current read state 60 typedef struct readState { 61 eventInfo* event_; 62 63 // keep track of event size 64 uint8_t eventSizeBuff_[4]; 65 uint8_t eventSizeBuffPos_; 66 bool readingSize_; 67 68 // read buffer variables 69 int32_t bufferPtr_; 70 int32_t bufferLen_; 71 72 // last successful dispatch point 73 int32_t lastDispatchPtr_; 74 resetStatereadState75 void resetState(uint32_t lastDispatchPtr) { 76 readingSize_ = true; 77 eventSizeBuffPos_ = 0; 78 lastDispatchPtr_ = lastDispatchPtr; 79 } 80 resetAllValuesreadState81 void resetAllValues() { 82 resetState(0); 83 bufferPtr_ = 0; 84 bufferLen_ = 0; 85 if (event_) { 86 delete (event_); 87 } 88 event_ = nullptr; 89 } 90 getEventSizereadState91 inline uint32_t getEventSize() { 92 const void* buffer = reinterpret_cast<const void*>(eventSizeBuff_); 93 return *reinterpret_cast<const uint32_t*>(buffer); 94 } 95 readStatereadState96 readState() { 97 event_ = nullptr; 98 resetAllValues(); 99 } 100 ~readStatereadState101 ~readState() { 102 if (event_) { 103 delete (event_); 104 } 105 } 106 107 } readState; 108 109 /** 110 * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events 111 * to be written to disk. Should be used in the following way: 112 * 1) Buffer created 113 * 2) Buffer written to (addEvent) 114 * 3) Buffer read from (getNext) 115 * 4) Buffer reset (reset) 116 * 5) Go back to 2, or destroy buffer 117 * 118 * The buffer should never be written to after it is read from, unless it is reset first. 119 * Note: The above rules are enforced mainly for debugging its sole client TFileTransport 120 * which uses the buffer in this way. 121 * 122 */ 123 class TFileTransportBuffer { 124 public: 125 TFileTransportBuffer(uint32_t size); 126 ~TFileTransportBuffer(); 127 128 bool addEvent(eventInfo* event); 129 eventInfo* getNext(); 130 void reset(); 131 bool isFull(); 132 bool isEmpty(); 133 134 private: 135 TFileTransportBuffer(); // should not be used 136 137 enum mode { WRITE, READ }; 138 mode bufferMode_; 139 140 uint32_t writePoint_; 141 uint32_t readPoint_; 142 uint32_t size_; 143 eventInfo** buffer_; 144 }; 145 146 /** 147 * Abstract interface for transports used to read files 148 */ 149 class TFileReaderTransport : virtual public TTransport { 150 public: 151 virtual int32_t getReadTimeout() = 0; 152 virtual void setReadTimeout(int32_t readTimeout) = 0; 153 154 virtual uint32_t getNumChunks() = 0; 155 virtual uint32_t getCurChunk() = 0; 156 virtual void seekToChunk(int32_t chunk) = 0; 157 virtual void seekToEnd() = 0; 158 }; 159 160 /** 161 * Abstract interface for transports used to write files 162 */ 163 class TFileWriterTransport : virtual public TTransport { 164 public: 165 virtual uint32_t getChunkSize() = 0; 166 virtual void setChunkSize(uint32_t chunkSize) = 0; 167 }; 168 169 /** 170 * File implementation of a transport. Reads and writes are done to a 171 * file on disk. 172 * 173 */ 174 class TFileTransport : public TFileReaderTransport, public TFileWriterTransport { 175 public: 176 TFileTransport(std::string path, bool readOnly = false, std::shared_ptr<TConfiguration> config = nullptr); 177 ~TFileTransport() override; 178 179 // TODO: what is the correct behaviour for this? 180 // the log file is generally always open isOpen()181 bool isOpen() const override { return true; } 182 183 void write(const uint8_t* buf, uint32_t len); 184 void flush() override; 185 186 uint32_t readAll(uint8_t* buf, uint32_t len); 187 uint32_t read(uint8_t* buf, uint32_t len); 188 bool peek() override; 189 190 // log-file specific functions 191 void seekToChunk(int32_t chunk) override; 192 void seekToEnd() override; 193 uint32_t getNumChunks() override; 194 uint32_t getCurChunk() override; 195 196 // for changing the output file 197 void resetOutputFile(int fd, std::string filename, off_t offset); 198 199 // Setter/Getter functions for user-controllable options setReadBuffSize(uint32_t readBuffSize)200 void setReadBuffSize(uint32_t readBuffSize) { 201 if (readBuffSize) { 202 readBuffSize_ = readBuffSize; 203 } 204 } getReadBuffSize()205 uint32_t getReadBuffSize() { return readBuffSize_; } 206 207 static const int32_t TAIL_READ_TIMEOUT = -1; 208 static const int32_t NO_TAIL_READ_TIMEOUT = 0; setReadTimeout(int32_t readTimeout)209 void setReadTimeout(int32_t readTimeout) override { readTimeout_ = readTimeout; } getReadTimeout()210 int32_t getReadTimeout() override { return readTimeout_; } 211 setChunkSize(uint32_t chunkSize)212 void setChunkSize(uint32_t chunkSize) override { 213 if (chunkSize) { 214 chunkSize_ = chunkSize; 215 } 216 } getChunkSize()217 uint32_t getChunkSize() override { return chunkSize_; } 218 setEventBufferSize(uint32_t bufferSize)219 void setEventBufferSize(uint32_t bufferSize) { 220 if (bufferAndThreadInitialized_) { 221 GlobalOutput("Cannot change the buffer size after writer thread started"); 222 return; 223 } 224 eventBufferSize_ = bufferSize; 225 } 226 getEventBufferSize()227 uint32_t getEventBufferSize() { return eventBufferSize_; } 228 setFlushMaxUs(uint32_t flushMaxUs)229 void setFlushMaxUs(uint32_t flushMaxUs) { 230 if (flushMaxUs) { 231 flushMaxUs_ = flushMaxUs; 232 } 233 } getFlushMaxUs()234 uint32_t getFlushMaxUs() { return flushMaxUs_; } 235 setFlushMaxBytes(uint32_t flushMaxBytes)236 void setFlushMaxBytes(uint32_t flushMaxBytes) { 237 if (flushMaxBytes) { 238 flushMaxBytes_ = flushMaxBytes; 239 } 240 } getFlushMaxBytes()241 uint32_t getFlushMaxBytes() { return flushMaxBytes_; } 242 setMaxEventSize(uint32_t maxEventSize)243 void setMaxEventSize(uint32_t maxEventSize) { maxEventSize_ = maxEventSize; } getMaxEventSize()244 uint32_t getMaxEventSize() { return maxEventSize_; } 245 setMaxCorruptedEvents(uint32_t maxCorruptedEvents)246 void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) { 247 maxCorruptedEvents_ = maxCorruptedEvents; 248 } getMaxCorruptedEvents()249 uint32_t getMaxCorruptedEvents() { return maxCorruptedEvents_; } 250 setEofSleepTimeUs(uint32_t eofSleepTime)251 void setEofSleepTimeUs(uint32_t eofSleepTime) { 252 if (eofSleepTime) { 253 eofSleepTime_ = eofSleepTime; 254 } 255 } getEofSleepTimeUs()256 uint32_t getEofSleepTimeUs() { return eofSleepTime_; } 257 258 /* 259 * Override TTransport *_virt() functions to invoke our implementations. 260 * We cannot use TVirtualTransport to provide these, since we need to inherit 261 * virtually from TTransport. 262 */ read_virt(uint8_t * buf,uint32_t len)263 uint32_t read_virt(uint8_t* buf, uint32_t len) override { return this->read(buf, len); } readAll_virt(uint8_t * buf,uint32_t len)264 uint32_t readAll_virt(uint8_t* buf, uint32_t len) override { return this->readAll(buf, len); } write_virt(const uint8_t * buf,uint32_t len)265 void write_virt(const uint8_t* buf, uint32_t len) override { this->write(buf, len); } 266 267 private: 268 // helper functions for writing to a file 269 void enqueueEvent(const uint8_t* buf, uint32_t eventLen); 270 bool swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline); 271 bool initBufferAndWriteThread(); 272 273 // control for writer thread startWriterThread(void * ptr)274 static void* startWriterThread(void* ptr) { 275 static_cast<TFileTransport*>(ptr)->writerThread(); 276 return nullptr; 277 } 278 void writerThread(); 279 280 // helper functions for reading from a file 281 eventInfo* readEvent(); 282 283 // event corruption-related functions 284 bool isEventCorrupted(); 285 void performRecovery(); 286 287 // Utility functions 288 void openLogFile(); 289 std::chrono::time_point<std::chrono::steady_clock> getNextFlushTime(); 290 291 // Class variables 292 readState readState_; 293 uint8_t* readBuff_; 294 eventInfo* currentEvent_; 295 296 uint32_t readBuffSize_; 297 static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024; 298 299 int32_t readTimeout_; 300 static const int32_t DEFAULT_READ_TIMEOUT_MS = 200; 301 302 // size of chunks that file will be split up into 303 uint32_t chunkSize_; 304 static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024; 305 306 // size of event buffers 307 uint32_t eventBufferSize_; 308 static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000; 309 310 // max number of microseconds that can pass without flushing 311 uint32_t flushMaxUs_; 312 static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000; 313 314 // max number of bytes that can be written without flushing 315 uint32_t flushMaxBytes_; 316 static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024; 317 318 // max event size 319 uint32_t maxEventSize_; 320 static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0; 321 322 // max number of corrupted events per chunk 323 uint32_t maxCorruptedEvents_; 324 static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0; 325 326 // sleep duration when EOF is hit 327 uint32_t eofSleepTime_; 328 static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000; 329 330 // sleep duration when a corrupted event is encountered 331 uint32_t corruptedEventSleepTime_; 332 static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000; 333 334 // sleep duration in seconds when an IO error is encountered in the writer thread 335 uint32_t writerThreadIOErrorSleepTime_; 336 static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000; 337 338 // writer thread 339 apache::thrift::concurrency::ThreadFactory threadFactory_; 340 std::shared_ptr<apache::thrift::concurrency::Thread> writerThread_; 341 342 // buffers to hold data before it is flushed. Each element of the buffer stores a msg that 343 // needs to be written to the file. The buffers are swapped by the writer thread. 344 TFileTransportBuffer* dequeueBuffer_; 345 TFileTransportBuffer* enqueueBuffer_; 346 347 // conditions used to block when the buffer is full or empty 348 Monitor notFull_, notEmpty_; 349 std::atomic<bool> closing_; 350 351 // To keep track of whether the buffer has been flushed 352 Monitor flushed_; 353 std::atomic<bool> forceFlush_; 354 355 // Mutex that is grabbed when enqueueing and swapping the read/write buffers 356 Mutex mutex_; 357 358 // File information 359 std::string filename_; 360 int fd_; 361 362 // Whether the writer thread and buffers have been initialized 363 bool bufferAndThreadInitialized_; 364 365 // Offset within the file 366 off_t offset_; 367 368 // event corruption information 369 uint32_t lastBadChunk_; 370 uint32_t numCorruptedEventsInChunk_; 371 372 bool readOnly_; 373 }; 374 375 // Exception thrown when EOF is hit 376 class TEOFException : public TTransportException { 377 public: TEOFException()378 TEOFException() : TTransportException(TTransportException::END_OF_FILE){}; 379 }; 380 381 // wrapper class to process events from a file containing thrift events 382 class TFileProcessor { 383 public: 384 /** 385 * Constructor that defaults output transport to null transport 386 * 387 * @param processor processes log-file events 388 * @param protocolFactory protocol factory 389 * @param inputTransport file transport 390 */ 391 TFileProcessor(std::shared_ptr<TProcessor> processor, 392 std::shared_ptr<TProtocolFactory> protocolFactory, 393 std::shared_ptr<TFileReaderTransport> inputTransport); 394 395 TFileProcessor(std::shared_ptr<TProcessor> processor, 396 std::shared_ptr<TProtocolFactory> inputProtocolFactory, 397 std::shared_ptr<TProtocolFactory> outputProtocolFactory, 398 std::shared_ptr<TFileReaderTransport> inputTransport); 399 400 /** 401 * Constructor 402 * 403 * @param processor processes log-file events 404 * @param protocolFactory protocol factory 405 * @param inputTransport input file transport 406 * @param output output transport 407 */ 408 TFileProcessor(std::shared_ptr<TProcessor> processor, 409 std::shared_ptr<TProtocolFactory> protocolFactory, 410 std::shared_ptr<TFileReaderTransport> inputTransport, 411 std::shared_ptr<TTransport> outputTransport); 412 413 /** 414 * processes events from the file 415 * 416 * @param numEvents number of events to process (0 for unlimited) 417 * @param tail tails the file if true 418 */ 419 void process(uint32_t numEvents, bool tail); 420 421 /** 422 * process events until the end of the chunk 423 * 424 */ 425 void processChunk(); 426 427 private: 428 std::shared_ptr<TProcessor> processor_; 429 std::shared_ptr<TProtocolFactory> inputProtocolFactory_; 430 std::shared_ptr<TProtocolFactory> outputProtocolFactory_; 431 std::shared_ptr<TFileReaderTransport> inputTransport_; 432 std::shared_ptr<TTransport> outputTransport_; 433 }; 434 } 435 } 436 } // apache::thrift::transport 437 438 #endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 439 440