1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef _THRIFT_TRANSPORT_TFILETRANSPORT_H_
21 #define _THRIFT_TRANSPORT_TFILETRANSPORT_H_ 1
22 
23 #include <thrift/transport/TTransport.h>
24 #include <thrift/Thrift.h>
25 #include <thrift/TProcessor.h>
26 
#include <atomic>
#include <cstring>
#include <string>
#include <stdio.h>
30 
31 #include <thrift/concurrency/Mutex.h>
32 #include <thrift/concurrency/Monitor.h>
33 #include <thrift/concurrency/ThreadFactory.h>
34 #include <thrift/concurrency/Thread.h>
35 
36 namespace apache {
37 namespace thrift {
38 namespace transport {
39 
40 using apache::thrift::TProcessor;
41 using apache::thrift::protocol::TProtocolFactory;
42 using apache::thrift::concurrency::Mutex;
43 using apache::thrift::concurrency::Monitor;
44 
/**
 * Data pertaining to a single event.
 *
 * The struct owns the array pointed to by eventBuff_ and releases it with
 * delete[] on destruction, so any buffer stored here must come from new[].
 */
typedef struct eventInfo {
  uint8_t* eventBuff_;     // owned buffer; nullptr when no buffer is attached
  uint32_t eventSize_;     // presumably the payload size in bytes -- confirm against users
  uint32_t eventBuffPos_;  // presumably the current offset into eventBuff_ -- confirm against users

  // Starts with no buffer attached and both counters at zero.
  eventInfo() : eventBuff_(nullptr), eventSize_(0), eventBuffPos_(0) {}

  // A member-wise copy would leave two objects deleting the same buffer,
  // so copying is disabled outright.
  eventInfo(const eventInfo&) = delete;
  eventInfo& operator=(const eventInfo&) = delete;

  ~eventInfo() {
    // delete[] on a null pointer is a no-op, so no guard is needed.
    delete[] eventBuff_;
  }
} eventInfo;
58 
59 // information about current read state
60 typedef struct readState {
61   eventInfo* event_;
62 
63   // keep track of event size
64   uint8_t eventSizeBuff_[4];
65   uint8_t eventSizeBuffPos_;
66   bool readingSize_;
67 
68   // read buffer variables
69   int32_t bufferPtr_;
70   int32_t bufferLen_;
71 
72   // last successful dispatch point
73   int32_t lastDispatchPtr_;
74 
resetStatereadState75   void resetState(uint32_t lastDispatchPtr) {
76     readingSize_ = true;
77     eventSizeBuffPos_ = 0;
78     lastDispatchPtr_ = lastDispatchPtr;
79   }
80 
resetAllValuesreadState81   void resetAllValues() {
82     resetState(0);
83     bufferPtr_ = 0;
84     bufferLen_ = 0;
85     if (event_) {
86       delete (event_);
87     }
88     event_ = nullptr;
89   }
90 
getEventSizereadState91   inline uint32_t getEventSize() {
92     const void* buffer = reinterpret_cast<const void*>(eventSizeBuff_);
93     return *reinterpret_cast<const uint32_t*>(buffer);
94   }
95 
readStatereadState96   readState() {
97     event_ = nullptr;
98     resetAllValues();
99   }
100 
~readStatereadState101   ~readState() {
102     if (event_) {
103       delete (event_);
104     }
105   }
106 
107 } readState;
108 
109 /**
110  * TFileTransportBuffer - buffer class used by TFileTransport for queueing up events
111  * to be written to disk.  Should be used in the following way:
112  *  1) Buffer created
113  *  2) Buffer written to (addEvent)
114  *  3) Buffer read from (getNext)
115  *  4) Buffer reset (reset)
116  *  5) Go back to 2, or destroy buffer
117  *
118  * The buffer should never be written to after it is read from, unless it is reset first.
119  * Note: The above rules are enforced mainly for debugging its sole client TFileTransport
120  *       which uses the buffer in this way.
121  *
122  */
123 class TFileTransportBuffer {
124 public:
125   TFileTransportBuffer(uint32_t size);
126   ~TFileTransportBuffer();
127 
128   bool addEvent(eventInfo* event);
129   eventInfo* getNext();
130   void reset();
131   bool isFull();
132   bool isEmpty();
133 
134 private:
135   TFileTransportBuffer(); // should not be used
136 
137   enum mode { WRITE, READ };
138   mode bufferMode_;
139 
140   uint32_t writePoint_;
141   uint32_t readPoint_;
142   uint32_t size_;
143   eventInfo** buffer_;
144 };
145 
/**
 * Abstract interface for transports used to read files.
 *
 * Every member is pure virtual; the class inherits virtually from TTransport
 * so that it can be combined with TFileWriterTransport in one concrete class.
 */
class TFileReaderTransport : virtual public TTransport {
public:
  // Accessor pair for the read timeout.
  // NOTE(review): units are not stated here; TFileTransport's default is named
  // DEFAULT_READ_TIMEOUT_MS, so presumably milliseconds -- confirm.
  virtual int32_t getReadTimeout() = 0;
  virtual void setReadTimeout(int32_t readTimeout) = 0;

  // Chunk navigation. What constitutes a chunk is left to the implementation.
  virtual uint32_t getNumChunks() = 0;
  virtual uint32_t getCurChunk() = 0;
  virtual void seekToChunk(int32_t chunk) = 0;
  virtual void seekToEnd() = 0;
};
159 
/**
 * Abstract interface for transports used to write files.
 *
 * Every member is pure virtual; the class inherits virtually from TTransport
 * so that it can be combined with TFileReaderTransport in one concrete class.
 */
class TFileWriterTransport : virtual public TTransport {
public:
  // Accessor pair for the chunk size used by the implementation.
  virtual uint32_t getChunkSize() = 0;
  virtual void setChunkSize(uint32_t chunkSize) = 0;
};
168 
169 /**
170  * File implementation of a transport. Reads and writes are done to a
171  * file on disk.
172  *
173  */
174 class TFileTransport : public TFileReaderTransport, public TFileWriterTransport {
175 public:
176   TFileTransport(std::string path, bool readOnly = false, std::shared_ptr<TConfiguration> config = nullptr);
177   ~TFileTransport() override;
178 
179   // TODO: what is the correct behaviour for this?
180   // the log file is generally always open
isOpen()181   bool isOpen() const override { return true; }
182 
183   void write(const uint8_t* buf, uint32_t len);
184   void flush() override;
185 
186   uint32_t readAll(uint8_t* buf, uint32_t len);
187   uint32_t read(uint8_t* buf, uint32_t len);
188   bool peek() override;
189 
190   // log-file specific functions
191   void seekToChunk(int32_t chunk) override;
192   void seekToEnd() override;
193   uint32_t getNumChunks() override;
194   uint32_t getCurChunk() override;
195 
196   // for changing the output file
197   void resetOutputFile(int fd, std::string filename, off_t offset);
198 
199   // Setter/Getter functions for user-controllable options
setReadBuffSize(uint32_t readBuffSize)200   void setReadBuffSize(uint32_t readBuffSize) {
201     if (readBuffSize) {
202       readBuffSize_ = readBuffSize;
203     }
204   }
getReadBuffSize()205   uint32_t getReadBuffSize() { return readBuffSize_; }
206 
207   static const int32_t TAIL_READ_TIMEOUT = -1;
208   static const int32_t NO_TAIL_READ_TIMEOUT = 0;
setReadTimeout(int32_t readTimeout)209   void setReadTimeout(int32_t readTimeout) override { readTimeout_ = readTimeout; }
getReadTimeout()210   int32_t getReadTimeout() override { return readTimeout_; }
211 
setChunkSize(uint32_t chunkSize)212   void setChunkSize(uint32_t chunkSize) override {
213     if (chunkSize) {
214       chunkSize_ = chunkSize;
215     }
216   }
getChunkSize()217   uint32_t getChunkSize() override { return chunkSize_; }
218 
setEventBufferSize(uint32_t bufferSize)219   void setEventBufferSize(uint32_t bufferSize) {
220     if (bufferAndThreadInitialized_) {
221       GlobalOutput("Cannot change the buffer size after writer thread started");
222       return;
223     }
224     eventBufferSize_ = bufferSize;
225   }
226 
getEventBufferSize()227   uint32_t getEventBufferSize() { return eventBufferSize_; }
228 
setFlushMaxUs(uint32_t flushMaxUs)229   void setFlushMaxUs(uint32_t flushMaxUs) {
230     if (flushMaxUs) {
231       flushMaxUs_ = flushMaxUs;
232     }
233   }
getFlushMaxUs()234   uint32_t getFlushMaxUs() { return flushMaxUs_; }
235 
setFlushMaxBytes(uint32_t flushMaxBytes)236   void setFlushMaxBytes(uint32_t flushMaxBytes) {
237     if (flushMaxBytes) {
238       flushMaxBytes_ = flushMaxBytes;
239     }
240   }
getFlushMaxBytes()241   uint32_t getFlushMaxBytes() { return flushMaxBytes_; }
242 
setMaxEventSize(uint32_t maxEventSize)243   void setMaxEventSize(uint32_t maxEventSize) { maxEventSize_ = maxEventSize; }
getMaxEventSize()244   uint32_t getMaxEventSize() { return maxEventSize_; }
245 
setMaxCorruptedEvents(uint32_t maxCorruptedEvents)246   void setMaxCorruptedEvents(uint32_t maxCorruptedEvents) {
247     maxCorruptedEvents_ = maxCorruptedEvents;
248   }
getMaxCorruptedEvents()249   uint32_t getMaxCorruptedEvents() { return maxCorruptedEvents_; }
250 
setEofSleepTimeUs(uint32_t eofSleepTime)251   void setEofSleepTimeUs(uint32_t eofSleepTime) {
252     if (eofSleepTime) {
253       eofSleepTime_ = eofSleepTime;
254     }
255   }
getEofSleepTimeUs()256   uint32_t getEofSleepTimeUs() { return eofSleepTime_; }
257 
258   /*
259    * Override TTransport *_virt() functions to invoke our implementations.
260    * We cannot use TVirtualTransport to provide these, since we need to inherit
261    * virtually from TTransport.
262    */
read_virt(uint8_t * buf,uint32_t len)263   uint32_t read_virt(uint8_t* buf, uint32_t len) override { return this->read(buf, len); }
readAll_virt(uint8_t * buf,uint32_t len)264   uint32_t readAll_virt(uint8_t* buf, uint32_t len) override { return this->readAll(buf, len); }
write_virt(const uint8_t * buf,uint32_t len)265   void write_virt(const uint8_t* buf, uint32_t len) override { this->write(buf, len); }
266 
267 private:
268   // helper functions for writing to a file
269   void enqueueEvent(const uint8_t* buf, uint32_t eventLen);
270   bool swapEventBuffers(const std::chrono::time_point<std::chrono::steady_clock> *deadline);
271   bool initBufferAndWriteThread();
272 
273   // control for writer thread
startWriterThread(void * ptr)274   static void* startWriterThread(void* ptr) {
275     static_cast<TFileTransport*>(ptr)->writerThread();
276     return nullptr;
277   }
278   void writerThread();
279 
280   // helper functions for reading from a file
281   eventInfo* readEvent();
282 
283   // event corruption-related functions
284   bool isEventCorrupted();
285   void performRecovery();
286 
287   // Utility functions
288   void openLogFile();
289   std::chrono::time_point<std::chrono::steady_clock> getNextFlushTime();
290 
291   // Class variables
292   readState readState_;
293   uint8_t* readBuff_;
294   eventInfo* currentEvent_;
295 
296   uint32_t readBuffSize_;
297   static const uint32_t DEFAULT_READ_BUFF_SIZE = 1 * 1024 * 1024;
298 
299   int32_t readTimeout_;
300   static const int32_t DEFAULT_READ_TIMEOUT_MS = 200;
301 
302   // size of chunks that file will be split up into
303   uint32_t chunkSize_;
304   static const uint32_t DEFAULT_CHUNK_SIZE = 16 * 1024 * 1024;
305 
306   // size of event buffers
307   uint32_t eventBufferSize_;
308   static const uint32_t DEFAULT_EVENT_BUFFER_SIZE = 10000;
309 
310   // max number of microseconds that can pass without flushing
311   uint32_t flushMaxUs_;
312   static const uint32_t DEFAULT_FLUSH_MAX_US = 3000000;
313 
314   // max number of bytes that can be written without flushing
315   uint32_t flushMaxBytes_;
316   static const uint32_t DEFAULT_FLUSH_MAX_BYTES = 1000 * 1024;
317 
318   // max event size
319   uint32_t maxEventSize_;
320   static const uint32_t DEFAULT_MAX_EVENT_SIZE = 0;
321 
322   // max number of corrupted events per chunk
323   uint32_t maxCorruptedEvents_;
324   static const uint32_t DEFAULT_MAX_CORRUPTED_EVENTS = 0;
325 
326   // sleep duration when EOF is hit
327   uint32_t eofSleepTime_;
328   static const uint32_t DEFAULT_EOF_SLEEP_TIME_US = 500 * 1000;
329 
330   // sleep duration when a corrupted event is encountered
331   uint32_t corruptedEventSleepTime_;
332   static const uint32_t DEFAULT_CORRUPTED_SLEEP_TIME_US = 1 * 1000 * 1000;
333 
334   // sleep duration in seconds when an IO error is encountered in the writer thread
335   uint32_t writerThreadIOErrorSleepTime_;
336   static const uint32_t DEFAULT_WRITER_THREAD_SLEEP_TIME_US = 60 * 1000 * 1000;
337 
338   // writer thread
339   apache::thrift::concurrency::ThreadFactory threadFactory_;
340   std::shared_ptr<apache::thrift::concurrency::Thread> writerThread_;
341 
342   // buffers to hold data before it is flushed. Each element of the buffer stores a msg that
343   // needs to be written to the file.  The buffers are swapped by the writer thread.
344   TFileTransportBuffer* dequeueBuffer_;
345   TFileTransportBuffer* enqueueBuffer_;
346 
347   // conditions used to block when the buffer is full or empty
348   Monitor notFull_, notEmpty_;
349   std::atomic<bool> closing_;
350 
351   // To keep track of whether the buffer has been flushed
352   Monitor flushed_;
353   std::atomic<bool> forceFlush_;
354 
355   // Mutex that is grabbed when enqueueing and swapping the read/write buffers
356   Mutex mutex_;
357 
358   // File information
359   std::string filename_;
360   int fd_;
361 
362   // Whether the writer thread and buffers have been initialized
363   bool bufferAndThreadInitialized_;
364 
365   // Offset within the file
366   off_t offset_;
367 
368   // event corruption information
369   uint32_t lastBadChunk_;
370   uint32_t numCorruptedEventsInChunk_;
371 
372   bool readOnly_;
373 };
374 
375 // Exception thrown when EOF is hit
376 class TEOFException : public TTransportException {
377 public:
TEOFException()378   TEOFException() : TTransportException(TTransportException::END_OF_FILE){};
379 };
380 
// wrapper class to process events from a file containing thrift events
class TFileProcessor {
public:
  /**
   * Constructor that defaults output transport to null transport
   *
   * @param processor processes log-file events
   * @param protocolFactory protocol factory
   * @param inputTransport file transport
   */
  TFileProcessor(std::shared_ptr<TProcessor> processor,
                 std::shared_ptr<TProtocolFactory> protocolFactory,
                 std::shared_ptr<TFileReaderTransport> inputTransport);

  /**
   * Constructor taking separate protocol factories for input and output.
   * No output transport is passed; presumably it defaults as in the
   * constructor above -- confirm in the implementation.
   *
   * @param processor processes log-file events
   * @param inputProtocolFactory protocol factory for the input side
   * @param outputProtocolFactory protocol factory for the output side
   * @param inputTransport file transport
   */
  TFileProcessor(std::shared_ptr<TProcessor> processor,
                 std::shared_ptr<TProtocolFactory> inputProtocolFactory,
                 std::shared_ptr<TProtocolFactory> outputProtocolFactory,
                 std::shared_ptr<TFileReaderTransport> inputTransport);

  /**
   * Constructor
   *
   * @param processor processes log-file events
   * @param protocolFactory protocol factory
   * @param inputTransport input file transport
   * @param outputTransport output transport
   */
  TFileProcessor(std::shared_ptr<TProcessor> processor,
                 std::shared_ptr<TProtocolFactory> protocolFactory,
                 std::shared_ptr<TFileReaderTransport> inputTransport,
                 std::shared_ptr<TTransport> outputTransport);

  /**
   * processes events from the file
   *
   * @param numEvents number of events to process (0 for unlimited)
   * @param tail tails the file if true
   */
  void process(uint32_t numEvents, bool tail);

  /**
   * process events until the end of the chunk
   *
   */
  void processChunk();

private:
  // Collaborators handed in through the constructors; all held by shared_ptr.
  std::shared_ptr<TProcessor> processor_;
  std::shared_ptr<TProtocolFactory> inputProtocolFactory_;
  std::shared_ptr<TProtocolFactory> outputProtocolFactory_;
  std::shared_ptr<TFileReaderTransport> inputTransport_;
  std::shared_ptr<TTransport> outputTransport_;
};
434 }
435 }
436 } // apache::thrift::transport
437 
438 #endif // _THRIFT_TRANSPORT_TFILETRANSPORT_H_
439 
440