1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  *   http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #include <thrift/thrift-config.h>
21 
22 #include <stdlib.h>
23 #include <time.h>
24 #ifdef HAVE_SYS_SOCKET_H
25 #include <sys/socket.h>
26 #endif
27 #include <sstream>
28 #include <fstream>
29 
30 #include <boost/mpl/list.hpp>
31 #include <boost/shared_array.hpp>
32 #include <boost/random.hpp>
33 #include <boost/type_traits.hpp>
34 #include <boost/test/unit_test.hpp>
35 #include <boost/version.hpp>
36 
37 #include <thrift/transport/TBufferTransports.h>
38 #include <thrift/transport/TFDTransport.h>
39 #include <thrift/transport/TFileTransport.h>
40 #include <thrift/transport/TZlibTransport.h>
41 #include <thrift/transport/TSocket.h>
42 
43 #include <thrift/concurrency/FunctionRunner.h>
44 #if _WIN32
45 #include <thrift/transport/TPipe.h>
46 #include <thrift/windows/TWinsockSingleton.h>
47 #endif
48 
49 using namespace apache::thrift::transport;
50 using namespace apache::thrift;
51 
52 static boost::mt19937 rng;
53 
initrand(unsigned int seed)54 void initrand(unsigned int seed) {
55   rng.seed(seed);
56 }
57 
58 class SizeGenerator {
59 public:
60   virtual ~SizeGenerator() = default;
61   virtual uint32_t nextSize() = 0;
62   virtual std::string describe() const = 0;
63 };
64 
65 class ConstantSizeGenerator : public SizeGenerator {
66 public:
ConstantSizeGenerator(uint32_t value)67   ConstantSizeGenerator(uint32_t value) : value_(value) {}
nextSize()68   uint32_t nextSize() override { return value_; }
describe() const69   std::string describe() const override {
70     std::ostringstream desc;
71     desc << value_;
72     return desc.str();
73   }
74 
75 private:
76   uint32_t value_;
77 };
78 
79 class RandomSizeGenerator : public SizeGenerator {
80 public:
RandomSizeGenerator(uint32_t min,uint32_t max)81   RandomSizeGenerator(uint32_t min, uint32_t max)
82     : generator_(rng, boost::uniform_int<int>(min, max)) {}
83 
nextSize()84   uint32_t nextSize() override { return generator_(); }
85 
describe() const86   std::string describe() const override {
87     std::ostringstream desc;
88     desc << "rand(" << getMin() << ", " << getMax() << ")";
89     return desc.str();
90   }
91 
getMin() const92   uint32_t getMin() const { return (generator_.distribution().min)(); }
getMax() const93   uint32_t getMax() const { return (generator_.distribution().max)(); }
94 
95 private:
96   boost::variate_generator<boost::mt19937&, boost::uniform_int<int> > generator_;
97 };
98 
99 /**
100  * This class exists solely to make the TEST_RW() macro easier to use.
101  * - it can be constructed implicitly from an integer
102  * - it can contain either a ConstantSizeGenerator or a RandomSizeGenerator
103  *   (TEST_RW can't take a SizeGenerator pointer or reference, since it needs
104  *   to make a copy of the generator to bind it to the test function.)
105  */
106 class GenericSizeGenerator : public SizeGenerator {
107 public:
GenericSizeGenerator(uint32_t value)108   GenericSizeGenerator(uint32_t value) : generator_(new ConstantSizeGenerator(value)) {}
GenericSizeGenerator(uint32_t min,uint32_t max)109   GenericSizeGenerator(uint32_t min, uint32_t max)
110     : generator_(new RandomSizeGenerator(min, max)) {}
111 
nextSize()112   uint32_t nextSize() override { return generator_->nextSize(); }
describe() const113   std::string describe() const override { return generator_->describe(); }
114 
115 private:
116   std::shared_ptr<SizeGenerator> generator_;
117 };
118 
119 /**************************************************************************
120  * Classes to set up coupled transports
121  **************************************************************************/
122 
123 /**
124  * Helper class to represent a coupled pair of transports.
125  *
126  * Data written to the out transport can be read from the in transport.
127  *
128  * This is used as the base class for the various coupled transport
129  * implementations.  It shouldn't be instantiated directly.
130  */
131 template <class Transport_>
132 class CoupledTransports {
133 public:
134   virtual ~CoupledTransports() = default;
135   typedef Transport_ TransportType;
136 
CoupledTransports()137   CoupledTransports() : in(), out() {}
138 
139   std::shared_ptr<Transport_> in;
140   std::shared_ptr<Transport_> out;
141 
142 private:
143   CoupledTransports(const CoupledTransports&) = delete;
144   CoupledTransports& operator=(const CoupledTransports&) = delete;
145 };
146 
147 /**
148  * Coupled TMemoryBuffers
149  */
150 class CoupledMemoryBuffers : public CoupledTransports<TMemoryBuffer> {
151 public:
CoupledMemoryBuffers()152   CoupledMemoryBuffers() : buf(new TMemoryBuffer) {
153     in = buf;
154     out = buf;
155   }
156 
157   std::shared_ptr<TMemoryBuffer> buf;
158 };
159 
160 /**
161  * Helper template class for creating coupled transports that wrap
162  * another transport.
163  */
164 template <class WrapperTransport_, class InnerCoupledTransports_>
165 class CoupledWrapperTransportsT : public CoupledTransports<WrapperTransport_> {
166 public:
CoupledWrapperTransportsT()167   CoupledWrapperTransportsT() {
168     if (inner_.in) {
169       this->in.reset(new WrapperTransport_(inner_.in));
170     }
171     if (inner_.out) {
172       this->out.reset(new WrapperTransport_(inner_.out));
173     }
174   }
175 
176   InnerCoupledTransports_ inner_;
177 };
178 
179 /**
180  * Coupled TBufferedTransports.
181  */
182 template <class InnerTransport_>
183 class CoupledBufferedTransportsT
184     : public CoupledWrapperTransportsT<TBufferedTransport, InnerTransport_> {};
185 
186 typedef CoupledBufferedTransportsT<CoupledMemoryBuffers> CoupledBufferedTransports;
187 
188 /**
189  * Coupled TFramedTransports.
190  */
191 template <class InnerTransport_>
192 class CoupledFramedTransportsT
193     : public CoupledWrapperTransportsT<TFramedTransport, InnerTransport_> {};
194 
195 typedef CoupledFramedTransportsT<CoupledMemoryBuffers> CoupledFramedTransports;
196 
197 /**
198  * Coupled TZlibTransports.
199  */
200 template <class InnerTransport_>
201 class CoupledZlibTransportsT : public CoupledWrapperTransportsT<TZlibTransport, InnerTransport_> {};
202 
203 typedef CoupledZlibTransportsT<CoupledMemoryBuffers> CoupledZlibTransports;
204 
205 #ifndef _WIN32
206 // FD transport doesn't make much sense on Windows.
207 /**
208  * Coupled TFDTransports.
209  */
210 class CoupledFDTransports : public CoupledTransports<TFDTransport> {
211 public:
CoupledFDTransports()212   CoupledFDTransports() {
213     int pipes[2];
214 
215     if (pipe(pipes) != 0) {
216       return;
217     }
218 
219     in.reset(new TFDTransport(pipes[0], TFDTransport::CLOSE_ON_DESTROY));
220     out.reset(new TFDTransport(pipes[1], TFDTransport::CLOSE_ON_DESTROY));
221   }
222 };
223 #else
224 /**
225  * Coupled pipe transports
226  */
227 class CoupledPipeTransports : public CoupledTransports<TPipe> {
228 public:
229   HANDLE hRead;
230   HANDLE hWrite;
231 
CoupledPipeTransports()232   CoupledPipeTransports() {
233     BOOST_REQUIRE(CreatePipe(&hRead, &hWrite, nullptr, 1048576 * 2));
234     in.reset(new TPipe(hRead, hWrite));
235     in->open();
236     out = in;
237   }
238 };
239 #endif
240 
241 /**
242  * Coupled TSockets
243  */
244 class CoupledSocketTransports : public CoupledTransports<TSocket> {
245 public:
CoupledSocketTransports()246   CoupledSocketTransports() {
247     THRIFT_SOCKET sockets[2] = {0};
248     if (THRIFT_SOCKETPAIR(PF_UNIX, SOCK_STREAM, 0, sockets) != 0) {
249       return;
250     }
251 
252     in.reset(new TSocket(sockets[0]));
253     out.reset(new TSocket(sockets[1]));
254     out->setSendTimeout(100);
255   }
256 };
257 
258 // These could be made to work on Windows, but I don't care enough to make it happen
259 #ifndef _WIN32
260 /**
261  * Coupled TFileTransports
262  */
263 class CoupledFileTransports : public CoupledTransports<TFileTransport> {
264 public:
CoupledFileTransports()265   CoupledFileTransports() {
266 #ifndef _WIN32
267     const char* tmp_dir = "/tmp";
268 #define FILENAME_SUFFIX "/thrift.transport_test"
269 #else
270     const char* tmp_dir = getenv("TMP");
271 #define FILENAME_SUFFIX "\\thrift.transport_test"
272 #endif
273 
274     // Create a temporary file to use
275     filename.resize(strlen(tmp_dir) + strlen(FILENAME_SUFFIX));
276     THRIFT_SNPRINTF(&filename[0], filename.size(), "%s" FILENAME_SUFFIX, tmp_dir);
277 #undef FILENAME_SUFFIX
278 
279     { std::ofstream dummy_creation(filename.c_str(), std::ofstream::trunc); }
280 
281     in.reset(new TFileTransport(filename, true));
282     out.reset(new TFileTransport(filename));
283   }
284 
~CoupledFileTransports()285   ~CoupledFileTransports() override { remove(filename.c_str()); }
286 
287   std::string filename;
288 };
289 #endif
290 
291 /**
292  * Wrapper around another CoupledTransports implementation that exposes the
293  * transports as TTransport pointers.
294  *
295  * This is used since accessing a transport via a "TTransport*" exercises a
296  * different code path than using the base pointer class.  As part of the
297  * template code changes, most transport methods are no longer virtual.
298  */
299 template <class CoupledTransports_>
300 class CoupledTTransports : public CoupledTransports<TTransport> {
301 public:
CoupledTTransports()302   CoupledTTransports() : transports() {
303     in = transports.in;
304     out = transports.out;
305   }
306 
307   CoupledTransports_ transports;
308 };
309 
310 /**
311  * Wrapper around another CoupledTransports implementation that exposes the
312  * transports as TBufferBase pointers.
313  *
314  * This can only be instantiated with a transport type that is a subclass of
315  * TBufferBase.
316  */
317 template <class CoupledTransports_>
318 class CoupledBufferBases : public CoupledTransports<TBufferBase> {
319 public:
CoupledBufferBases()320   CoupledBufferBases() : transports() {
321     in = transports.in;
322     out = transports.out;
323   }
324 
325   CoupledTransports_ transports;
326 };
327 
328 /**************************************************************************
329  * Alarm handling code for use in tests that check the transport blocking
330  * semantics.
331  *
332  * If the transport ends up blocking, we don't want to hang forever.  We use
333  * SIGALRM to fire schedule signal to wake up and try to write data so the
334  * transport will unblock.
335  *
336  * It isn't really the safest thing in the world to be mucking around with
337  * complicated global data structures in a signal handler.  It should probably
338  * be okay though, since we know the main thread should always be blocked in a
339  * read() request when the signal handler is running.
340  **************************************************************************/
341 
342 struct TriggerInfo {
TriggerInfoTriggerInfo343   TriggerInfo(int seconds, const std::shared_ptr<TTransport>& transport, uint32_t writeLength)
344     : timeoutSeconds(seconds), transport(transport), writeLength(writeLength), next(nullptr) {}
345 
346   int timeoutSeconds;
347   std::shared_ptr<TTransport> transport;
348   uint32_t writeLength;
349   TriggerInfo* next;
350 };
351 
352 apache::thrift::concurrency::Monitor g_alarm_monitor;
353 TriggerInfo* g_triggerInfo;
354 unsigned int g_numTriggersFired;
355 bool g_teardown = false;
356 
alarm_handler()357 void alarm_handler() {
358   TriggerInfo* info = nullptr;
359   {
360     apache::thrift::concurrency::Synchronized s(g_alarm_monitor);
361     // The alarm timed out, which almost certainly means we're stuck
362     // on a transport that is incorrectly blocked.
363     ++g_numTriggersFired;
364 
365     // Note: we print messages to stdout instead of stderr, since
366     // tools/test/runner only records stdout messages in the failure messages for
367     // boost tests.  (boost prints its test info to stdout.)
368     printf("Timeout alarm expired; attempting to unblock transport\n");
369     if (g_triggerInfo == nullptr) {
370       printf("  trigger stack is empty!\n");
371     }
372 
373     // Pop off the first TriggerInfo.
374     // If there is another one, schedule an alarm for it.
375     info = g_triggerInfo;
376     g_triggerInfo = info->next;
377   }
378 
379   // Write some data to the transport to hopefully unblock it.
380   auto* buf = new uint8_t[info->writeLength];
381   memset(buf, 'b', info->writeLength);
382   std::unique_ptr<uint8_t[]> array(buf);
383   info->transport->write(buf, info->writeLength);
384   info->transport->flush();
385 
386   delete info;
387 }
388 
alarm_handler_wrapper()389 void alarm_handler_wrapper() {
390   int64_t timeout = 0; // timeout of 0 means wait forever
391   while (true) {
392     bool fireHandler = false;
393     {
394       apache::thrift::concurrency::Synchronized s(g_alarm_monitor);
395       if (g_teardown)
396         return;
397       // calculate timeout
398       if (g_triggerInfo == nullptr) {
399         timeout = 0;
400       } else {
401         timeout = g_triggerInfo->timeoutSeconds * 1000;
402       }
403 
404       int waitResult = g_alarm_monitor.waitForTimeRelative(timeout);
405       if (waitResult == THRIFT_ETIMEDOUT)
406         fireHandler = true;
407     }
408     if (fireHandler)
409       alarm_handler(); // calling outside the lock
410   }
411 }
412 
413 /**
414  * Add a trigger to be scheduled "seconds" seconds after the
415  * last currently scheduled trigger.
416  *
417  * (Note that this is not "seconds" from now.  That might be more logical, but
418  * would require slightly more complicated sorting, rather than just appending
419  * to the end.)
420  */
add_trigger(unsigned int seconds,const std::shared_ptr<TTransport> & transport,uint32_t write_len)421 void add_trigger(unsigned int seconds,
422                  const std::shared_ptr<TTransport>& transport,
423                  uint32_t write_len) {
424   auto* info = new TriggerInfo(seconds, transport, write_len);
425   {
426     apache::thrift::concurrency::Synchronized s(g_alarm_monitor);
427     if (g_triggerInfo == nullptr) {
428       // This is the first trigger.
429       // Set g_triggerInfo, and schedule the alarm
430       g_triggerInfo = info;
431       g_alarm_monitor.notify();
432     } else {
433       // Add this trigger to the end of the list
434       TriggerInfo* prev = g_triggerInfo;
435       while (prev->next) {
436         prev = prev->next;
437       }
438       prev->next = info;
439     }
440   }
441 }
442 
clear_triggers()443 void clear_triggers() {
444   TriggerInfo* info = nullptr;
445 
446   {
447     apache::thrift::concurrency::Synchronized s(g_alarm_monitor);
448     info = g_triggerInfo;
449     g_triggerInfo = nullptr;
450     g_numTriggersFired = 0;
451     g_alarm_monitor.notify();
452   }
453 
454   while (info != nullptr) {
455     TriggerInfo* next = info->next;
456     delete info;
457     info = next;
458   }
459 }
460 
set_trigger(unsigned int seconds,const std::shared_ptr<TTransport> & transport,uint32_t write_len)461 void set_trigger(unsigned int seconds,
462                  const std::shared_ptr<TTransport>& transport,
463                  uint32_t write_len) {
464   clear_triggers();
465   add_trigger(seconds, transport, write_len);
466 }
467 
468 /**************************************************************************
469  * Test functions
470  **************************************************************************/
471 
472 /**
473  * Test interleaved write and read calls.
474  *
475  * Generates a buffer totalSize bytes long, then writes it to the transport,
476  * and verifies the written data can be read back correctly.
477  *
478  * Mode of operation:
479  * - call wChunkGenerator to figure out how large of a chunk to write
480  *   - call wSizeGenerator to get the size for individual write() calls,
481  *     and do this repeatedly until the entire chunk is written.
482  * - call rChunkGenerator to figure out how large of a chunk to read
483  *   - call rSizeGenerator to get the size for individual read() calls,
484  *     and do this repeatedly until the entire chunk is read.
485  * - repeat until the full buffer is written and read back,
486  *   then compare the data read back against the original buffer
487  *
488  *
489  * - If any of the size generators return 0, this means to use the maximum
490  *   possible size.
491  *
492  * - If maxOutstanding is non-zero, write chunk sizes will be chosen such that
493  *   there are never more than maxOutstanding bytes waiting to be read back.
494  */
495 template <class CoupledTransports>
test_rw(uint32_t totalSize,SizeGenerator & wSizeGenerator,SizeGenerator & rSizeGenerator,SizeGenerator & wChunkGenerator,SizeGenerator & rChunkGenerator,uint32_t maxOutstanding)496 void test_rw(uint32_t totalSize,
497              SizeGenerator& wSizeGenerator,
498              SizeGenerator& rSizeGenerator,
499              SizeGenerator& wChunkGenerator,
500              SizeGenerator& rChunkGenerator,
501              uint32_t maxOutstanding) {
502   CoupledTransports transports;
503   BOOST_REQUIRE(transports.in != nullptr);
504   BOOST_REQUIRE(transports.out != nullptr);
505 
506   boost::shared_array<uint8_t> wbuf = boost::shared_array<uint8_t>(new uint8_t[totalSize]);
507   boost::shared_array<uint8_t> rbuf = boost::shared_array<uint8_t>(new uint8_t[totalSize]);
508 
509   // store some data in wbuf
510   for (uint32_t n = 0; n < totalSize; ++n) {
511     wbuf[n] = (n & 0xff);
512   }
513   // clear rbuf
514   memset(rbuf.get(), 0, totalSize);
515 
516   uint32_t total_written = 0;
517   uint32_t total_read = 0;
518   while (total_read < totalSize) {
519     // Determine how large a chunk of data to write
520     uint32_t wchunk_size = wChunkGenerator.nextSize();
521     if (wchunk_size == 0 || wchunk_size > totalSize - total_written) {
522       wchunk_size = totalSize - total_written;
523     }
524 
525     // Make sure (total_written - total_read) + wchunk_size
526     // is less than maxOutstanding
527     if (maxOutstanding > 0 && wchunk_size > maxOutstanding - (total_written - total_read)) {
528       wchunk_size = maxOutstanding - (total_written - total_read);
529     }
530 
531     // Write the chunk
532     uint32_t chunk_written = 0;
533     while (chunk_written < wchunk_size) {
534       uint32_t write_size = wSizeGenerator.nextSize();
535       if (write_size == 0 || write_size > wchunk_size - chunk_written) {
536         write_size = wchunk_size - chunk_written;
537       }
538 
539       try {
540         transports.out->write(wbuf.get() + total_written, write_size);
541       } catch (TTransportException& te) {
542         if (te.getType() == TTransportException::TIMED_OUT)
543           break;
544         throw te;
545       }
546       chunk_written += write_size;
547       total_written += write_size;
548     }
549 
550     // Flush the data, so it will be available in the read transport
551     // Don't flush if wchunk_size is 0.  (This should only happen if
552     // total_written == totalSize already, and we're only reading now.)
553     if (wchunk_size > 0) {
554       transports.out->flush();
555     }
556 
557     // Determine how large a chunk of data to read back
558     uint32_t rchunk_size = rChunkGenerator.nextSize();
559     if (rchunk_size == 0 || rchunk_size > total_written - total_read) {
560       rchunk_size = total_written - total_read;
561     }
562 
563     // Read the chunk
564     uint32_t chunk_read = 0;
565     while (chunk_read < rchunk_size) {
566       uint32_t read_size = rSizeGenerator.nextSize();
567       if (read_size == 0 || read_size > rchunk_size - chunk_read) {
568         read_size = rchunk_size - chunk_read;
569       }
570 
571       int bytes_read = -1;
572       try {
573         bytes_read = transports.in->read(rbuf.get() + total_read, read_size);
574       } catch (TTransportException& e) {
575         BOOST_FAIL("read(pos=" << total_read << ", size=" << read_size << ") threw exception \""
576                                << e.what() << "\"; written so far: " << total_written << " / "
577                                << totalSize << " bytes");
578       }
579 
580       BOOST_REQUIRE_MESSAGE(bytes_read > 0,
581                             "read(pos=" << total_read << ", size=" << read_size << ") returned "
582                                         << bytes_read << "; written so far: " << total_written
583                                         << " / " << totalSize << " bytes");
584       chunk_read += bytes_read;
585       total_read += bytes_read;
586     }
587   }
588 
589   // make sure the data read back is identical to the data written
590   BOOST_CHECK_EQUAL(memcmp(rbuf.get(), wbuf.get(), totalSize), 0);
591 }
592 
593 template <class CoupledTransports>
test_read_part_available()594 void test_read_part_available() {
595   CoupledTransports transports;
596   BOOST_REQUIRE(transports.in != nullptr);
597   BOOST_REQUIRE(transports.out != nullptr);
598 
599   uint8_t write_buf[16];
600   uint8_t read_buf[16];
601   memset(write_buf, 'a', sizeof(write_buf));
602 
603   // Attemping to read 10 bytes when only 9 are available should return 9
604   // immediately.
605   transports.out->write(write_buf, 9);
606   transports.out->flush();
607   set_trigger(3, transports.out, 1);
608   uint32_t bytes_read = transports.in->read(read_buf, 10);
609   BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)0);
610   BOOST_CHECK_EQUAL(bytes_read, (uint32_t)9);
611 
612   clear_triggers();
613 }
614 
615 template <class CoupledTransports>
test_read_part_available_in_chunks()616 void test_read_part_available_in_chunks() {
617   CoupledTransports transports;
618   BOOST_REQUIRE(transports.in != nullptr);
619   BOOST_REQUIRE(transports.out != nullptr);
620 
621   uint8_t write_buf[16];
622   uint8_t read_buf[16];
623   memset(write_buf, 'a', sizeof(write_buf));
624 
625   // Write 10 bytes (in a single frame, for transports that use framing)
626   transports.out->write(write_buf, 10);
627   transports.out->flush();
628 
629   // Read 1 byte, to force the transport to read the frame
630   uint32_t bytes_read = transports.in->read(read_buf, 1);
631   BOOST_CHECK_EQUAL(bytes_read, 1u);
632 
633   // Read more than what is remaining and verify the transport does not block
634   set_trigger(3, transports.out, 1);
635   bytes_read = transports.in->read(read_buf, 10);
636   BOOST_CHECK_EQUAL(g_numTriggersFired, 0u);
637   BOOST_CHECK_EQUAL(bytes_read, 9u);
638 
639   clear_triggers();
640 }
641 
642 template <class CoupledTransports>
test_read_partial_midframe()643 void test_read_partial_midframe() {
644   CoupledTransports transports;
645   BOOST_REQUIRE(transports.in != nullptr);
646   BOOST_REQUIRE(transports.out != nullptr);
647 
648   uint8_t write_buf[16];
649   uint8_t read_buf[16];
650   memset(write_buf, 'a', sizeof(write_buf));
651 
652   // Attempt to read 10 bytes, when only 9 are available, but after we have
653   // already read part of the data that is available.  This exercises a
654   // different code path for several of the transports.
655   //
656   // For transports that add their own framing (e.g., TFramedTransport and
657   // TFileTransport), the two flush calls break up the data in to a 10 byte
658   // frame and a 3 byte frame.  The first read then puts us partway through the
659   // first frame, and then we attempt to read past the end of that frame, and
660   // through the next frame, too.
661   //
662   // For buffered transports that perform read-ahead (e.g.,
663   // TBufferedTransport), the read-ahead will most likely see all 13 bytes
664   // written on the first read.  The next read will then attempt to read past
665   // the end of the read-ahead buffer.
666   //
667   // Flush 10 bytes, then 3 bytes.  This creates 2 separate frames for
668   // transports that track framing internally.
669   transports.out->write(write_buf, 10);
670   transports.out->flush();
671   transports.out->write(write_buf, 3);
672   transports.out->flush();
673 
674   // Now read 4 bytes, so that we are partway through the written data.
675   uint32_t bytes_read = transports.in->read(read_buf, 4);
676   BOOST_CHECK_EQUAL(bytes_read, (uint32_t)4);
677 
678   // Now attempt to read 10 bytes.  Only 9 more are available.
679   //
680   // We should be able to get all 9 bytes, but it might take multiple read
681   // calls, since it is valid for read() to return fewer bytes than requested.
682   // (Most transports do immediately return 9 bytes, but the framing transports
683   // tend to only return to the end of the current frame, which is 6 bytes in
684   // this case.)
685   uint32_t total_read = 0;
686   while (total_read < 9) {
687     set_trigger(3, transports.out, 1);
688     bytes_read = transports.in->read(read_buf, 10);
689     BOOST_REQUIRE_EQUAL(g_numTriggersFired, (unsigned int)0);
690     BOOST_REQUIRE_GT(bytes_read, (uint32_t)0);
691     total_read += bytes_read;
692     BOOST_REQUIRE_LE(total_read, (uint32_t)9);
693   }
694 
695   BOOST_CHECK_EQUAL(total_read, (uint32_t)9);
696 
697   clear_triggers();
698 }
699 
700 template <class CoupledTransports>
test_borrow_part_available()701 void test_borrow_part_available() {
702   CoupledTransports transports;
703   BOOST_REQUIRE(transports.in != nullptr);
704   BOOST_REQUIRE(transports.out != nullptr);
705 
706   uint8_t write_buf[16];
707   uint8_t read_buf[16];
708   memset(write_buf, 'a', sizeof(write_buf));
709 
710   // Attemping to borrow 10 bytes when only 9 are available should return nullptr
711   // immediately.
712   transports.out->write(write_buf, 9);
713   transports.out->flush();
714   set_trigger(3, transports.out, 1);
715   uint32_t borrow_len = 10;
716   const uint8_t* borrowed_buf = transports.in->borrow(read_buf, &borrow_len);
717   BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)0);
718   BOOST_CHECK(borrowed_buf == nullptr);
719 
720   clear_triggers();
721 }
722 
723 template <class CoupledTransports>
test_read_none_available()724 void test_read_none_available() {
725   CoupledTransports transports;
726   BOOST_REQUIRE(transports.in != nullptr);
727   BOOST_REQUIRE(transports.out != nullptr);
728 
729   uint8_t read_buf[16];
730 
731   // Attempting to read when no data is available should either block until
732   // some data is available, or fail immediately.  (e.g., TSocket blocks,
733   // TMemoryBuffer just fails.)
734   //
735   // If the transport blocks, it should succeed once some data is available,
736   // even if less than the amount requested becomes available.
737   set_trigger(1, transports.out, 2);
738   add_trigger(1, transports.out, 8);
739   uint32_t bytes_read = transports.in->read(read_buf, 10);
740   if (bytes_read == 0) {
741     BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)0);
742     clear_triggers();
743   } else {
744     BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)1);
745     BOOST_CHECK_EQUAL(bytes_read, (uint32_t)2);
746   }
747 
748   clear_triggers();
749 }
750 
751 template <class CoupledTransports>
test_borrow_none_available()752 void test_borrow_none_available() {
753   CoupledTransports transports;
754   BOOST_REQUIRE(transports.in != nullptr);
755   BOOST_REQUIRE(transports.out != nullptr);
756 
757   uint8_t write_buf[16];
758   memset(write_buf, 'a', sizeof(write_buf));
759 
760   // Attempting to borrow when no data is available should fail immediately
761   set_trigger(1, transports.out, 10);
762   uint32_t borrow_len = 10;
763   const uint8_t* borrowed_buf = transports.in->borrow(nullptr, &borrow_len);
764   BOOST_CHECK(borrowed_buf == nullptr);
765   BOOST_CHECK_EQUAL(g_numTriggersFired, (unsigned int)0);
766 
767   clear_triggers();
768 }
769 
770 /**************************************************************************
771  * Test case generation
772  *
773  * Pretty ugly and annoying.  This would be much easier if we the unit test
774  * framework didn't force each test to be a separate function.
775  * - Writing a completely separate function definition for each of these would
776  *   result in a lot of repetitive boilerplate code.
777  * - Combining many tests into a single function makes it more difficult to
778  *   tell precisely which tests failed.  It also means you can't get a progress
779  *   update after each test, and the tests are already fairly slow.
780  * - Similar registration could be achieved with BOOST_TEST_CASE_TEMPLATE,
781  *   but it requires a lot of awkward MPL code, and results in useless test
782  *   case names.  (The names are generated from std::type_info::name(), which
783  *   is compiler-dependent.  gcc returns mangled names.)
784  **************************************************************************/
785 
786 #define ADD_TEST_RW(CoupledTransports, totalSize, ...)                                             \
787   addTestRW<CoupledTransports>(BOOST_STRINGIZE(CoupledTransports), totalSize, ##__VA_ARGS__);
788 
789 #define TEST_RW(CoupledTransports, totalSize, ...)                                                 \
790   do {                                                                                             \
791     /* Add the test as specified, to test the non-virtual function calls */                        \
792     ADD_TEST_RW(CoupledTransports, totalSize, ##__VA_ARGS__);                                      \
793     /*                                                                                             \
794      * Also test using the transport as a TTransport*, to test                                     \
795      * the read_virt()/write_virt() calls                                                          \
796      */                                                                                            \
797     ADD_TEST_RW(CoupledTTransports<CoupledTransports>, totalSize, ##__VA_ARGS__);                  \
798     /* Test wrapping the transport with TBufferedTransport */                                      \
799     ADD_TEST_RW(CoupledBufferedTransportsT<CoupledTransports>, totalSize, ##__VA_ARGS__);          \
800     /* Test wrapping the transport with TFramedTransports */                                       \
801     ADD_TEST_RW(CoupledFramedTransportsT<CoupledTransports>, totalSize, ##__VA_ARGS__);            \
802     /* Test wrapping the transport with TZlibTransport */                                          \
803     ADD_TEST_RW(CoupledZlibTransportsT<CoupledTransports>, totalSize, ##__VA_ARGS__);              \
804   } while (0)
805 
806 #define ADD_TEST_BLOCKING(CoupledTransports)                                                       \
807   addTestBlocking<CoupledTransports>(BOOST_STRINGIZE(CoupledTransports));
808 
809 #define TEST_BLOCKING_BEHAVIOR(CoupledTransports)                                                  \
810   ADD_TEST_BLOCKING(CoupledTransports);                                                            \
811   ADD_TEST_BLOCKING(CoupledTTransports<CoupledTransports>);                                        \
812   ADD_TEST_BLOCKING(CoupledBufferedTransportsT<CoupledTransports>);                                \
813   ADD_TEST_BLOCKING(CoupledFramedTransportsT<CoupledTransports>);                                  \
814   ADD_TEST_BLOCKING(CoupledZlibTransportsT<CoupledTransports>);
815 
816 class TransportTestGen {
817 public:
TransportTestGen(boost::unit_test::test_suite * suite,float sizeMultiplier)818   TransportTestGen(boost::unit_test::test_suite* suite, float sizeMultiplier)
819     : suite_(suite), sizeMultiplier_(sizeMultiplier) {}
820 
generate()821   void generate() {
822     GenericSizeGenerator rand4k(1, 4096);
823 
824     /*
825      * We do the basically the same set of tests for each transport type,
826      * although we tweak the parameters in some places.
827      */
828 
829     // TMemoryBuffer tests
830     TEST_RW(CoupledMemoryBuffers, 1024 * 1024, 0, 0);
831     TEST_RW(CoupledMemoryBuffers, 1024 * 256, rand4k, rand4k);
832     TEST_RW(CoupledMemoryBuffers, 1024 * 256, 167, 163);
833     TEST_RW(CoupledMemoryBuffers, 1024 * 16, 1, 1);
834 
835     TEST_RW(CoupledMemoryBuffers, 1024 * 256, 0, 0, rand4k, rand4k);
836     TEST_RW(CoupledMemoryBuffers, 1024 * 256, rand4k, rand4k, rand4k, rand4k);
837     TEST_RW(CoupledMemoryBuffers, 1024 * 256, 167, 163, rand4k, rand4k);
838     TEST_RW(CoupledMemoryBuffers, 1024 * 16, 1, 1, rand4k, rand4k);
839 
840     TEST_BLOCKING_BEHAVIOR(CoupledMemoryBuffers);
841 
842 #ifndef _WIN32
843     // TFDTransport tests
844     // Since CoupledFDTransports tests with a pipe, writes will block
845     // if there is too much outstanding unread data in the pipe.
846     uint32_t fd_max_outstanding = 4096;
847     TEST_RW(CoupledFDTransports, 1024 * 1024, 0, 0, 0, 0, fd_max_outstanding);
848     TEST_RW(CoupledFDTransports, 1024 * 256, rand4k, rand4k, 0, 0, fd_max_outstanding);
849     TEST_RW(CoupledFDTransports, 1024 * 256, 167, 163, 0, 0, fd_max_outstanding);
850     TEST_RW(CoupledFDTransports, 1024 * 16, 1, 1, 0, 0, fd_max_outstanding);
851 
852     TEST_RW(CoupledFDTransports, 1024 * 256, 0, 0, rand4k, rand4k, fd_max_outstanding);
853     TEST_RW(CoupledFDTransports, 1024 * 256, rand4k, rand4k, rand4k, rand4k, fd_max_outstanding);
854     TEST_RW(CoupledFDTransports, 1024 * 256, 167, 163, rand4k, rand4k, fd_max_outstanding);
855     TEST_RW(CoupledFDTransports, 1024 * 16, 1, 1, rand4k, rand4k, fd_max_outstanding);
856 
857     TEST_BLOCKING_BEHAVIOR(CoupledFDTransports);
858 #else
859     // TPipe tests (WIN32 only)
860     TEST_RW(CoupledPipeTransports, 1024 * 1024, 0, 0);
861     TEST_RW(CoupledPipeTransports, 1024 * 256, rand4k, rand4k);
862     TEST_RW(CoupledPipeTransports, 1024 * 256, 167, 163);
863     TEST_RW(CoupledPipeTransports, 1024 * 16, 1, 1);
864 
865     TEST_RW(CoupledPipeTransports, 1024 * 256, 0, 0, rand4k, rand4k);
866     TEST_RW(CoupledPipeTransports, 1024 * 256, rand4k, rand4k, rand4k, rand4k);
867     TEST_RW(CoupledPipeTransports, 1024 * 256, 167, 163, rand4k, rand4k);
868     TEST_RW(CoupledPipeTransports, 1024 * 16, 1, 1, rand4k, rand4k);
869 
870     TEST_BLOCKING_BEHAVIOR(CoupledPipeTransports);
871 #endif //_WIN32
872 
873     // TSocket tests
874     uint32_t socket_max_outstanding = 4096;
875     TEST_RW(CoupledSocketTransports, 1024 * 1024, 0, 0, 0, 0, socket_max_outstanding);
876     TEST_RW(CoupledSocketTransports, 1024 * 256, rand4k, rand4k, 0, 0, socket_max_outstanding);
877     TEST_RW(CoupledSocketTransports, 1024 * 256, 167, 163, 0, 0, socket_max_outstanding);
878     // Doh.  Apparently writing to a socket has some additional overhead for
879     // each send() call.  If we have more than ~400 outstanding 1-byte write
880     // requests, additional send() calls start blocking.
881     TEST_RW(CoupledSocketTransports, 1024 * 16, 1, 1, 0, 0, socket_max_outstanding);
882     TEST_RW(CoupledSocketTransports, 1024 * 256, 0, 0, rand4k, rand4k, socket_max_outstanding);
883     TEST_RW(CoupledSocketTransports,
884             1024 * 256,
885             rand4k,
886             rand4k,
887             rand4k,
888             rand4k,
889             socket_max_outstanding);
890     TEST_RW(CoupledSocketTransports, 1024 * 256, 167, 163, rand4k, rand4k, socket_max_outstanding);
891     TEST_RW(CoupledSocketTransports, 1024 * 16, 1, 1, rand4k, rand4k, socket_max_outstanding);
892 
893     TEST_BLOCKING_BEHAVIOR(CoupledSocketTransports);
894 
895 // These could be made to work on Windows, but I don't care enough to make it happen
896 #ifndef _WIN32
897     // TFileTransport tests
898     // We use smaller buffer sizes here, since TFileTransport is fairly slow.
899     //
900     // TFileTransport can't write more than 16MB at once
901     uint32_t max_write_at_once = 1024 * 1024 * 16 - 4;
902     TEST_RW(CoupledFileTransports, 1024 * 1024, max_write_at_once, 0);
903     TEST_RW(CoupledFileTransports, 1024 * 128, rand4k, rand4k);
904     TEST_RW(CoupledFileTransports, 1024 * 128, 167, 163);
905     TEST_RW(CoupledFileTransports, 1024 * 2, 1, 1);
906 
907     TEST_RW(CoupledFileTransports, 1024 * 64, 0, 0, rand4k, rand4k);
908     TEST_RW(CoupledFileTransports, 1024 * 64, rand4k, rand4k, rand4k, rand4k);
909     TEST_RW(CoupledFileTransports, 1024 * 64, 167, 163, rand4k, rand4k);
910     TEST_RW(CoupledFileTransports, 1024 * 2, 1, 1, rand4k, rand4k);
911 
912     TEST_BLOCKING_BEHAVIOR(CoupledFileTransports);
913 #endif
914 
915     // Add some tests that access TBufferedTransport and TFramedTransport
916     // via TTransport pointers and TBufferBase pointers.
917     ADD_TEST_RW(CoupledTTransports<CoupledBufferedTransports>,
918                 1024 * 1024,
919                 rand4k,
920                 rand4k,
921                 rand4k,
922                 rand4k);
923     ADD_TEST_RW(CoupledBufferBases<CoupledBufferedTransports>,
924                 1024 * 1024,
925                 rand4k,
926                 rand4k,
927                 rand4k,
928                 rand4k);
929     ADD_TEST_RW(CoupledTTransports<CoupledFramedTransports>,
930                 1024 * 1024,
931                 rand4k,
932                 rand4k,
933                 rand4k,
934                 rand4k);
935     ADD_TEST_RW(CoupledBufferBases<CoupledFramedTransports>,
936                 1024 * 1024,
937                 rand4k,
938                 rand4k,
939                 rand4k,
940                 rand4k);
941 
942     // Test using TZlibTransport via a TTransport pointer
943     ADD_TEST_RW(CoupledTTransports<CoupledZlibTransports>,
944                 1024 * 1024,
945                 rand4k,
946                 rand4k,
947                 rand4k,
948                 rand4k);
949   }
950 
951 #if (BOOST_VERSION >= 105900)
952 #define MAKE_TEST_CASE(_FUNC, _NAME) boost::unit_test::make_test_case(_FUNC, _NAME, __FILE__, __LINE__)
953 #else
954 #define MAKE_TEST_CASE(_FUNC, _NAME) boost::unit_test::make_test_case(_FUNC, _NAME)
955 #endif
956 
957 private:
958   template <class CoupledTransports>
addTestRW(const char * transport_name,uint32_t totalSize,GenericSizeGenerator wSizeGen,GenericSizeGenerator rSizeGen,GenericSizeGenerator wChunkSizeGen=0,GenericSizeGenerator rChunkSizeGen=0,uint32_t maxOutstanding=0,uint32_t expectedFailures=0)959   void addTestRW(const char* transport_name,
960                  uint32_t totalSize,
961                  GenericSizeGenerator wSizeGen,
962                  GenericSizeGenerator rSizeGen,
963                  GenericSizeGenerator wChunkSizeGen = 0,
964                  GenericSizeGenerator rChunkSizeGen = 0,
965                  uint32_t maxOutstanding = 0,
966                  uint32_t expectedFailures = 0) {
967     // adjust totalSize by the specified sizeMultiplier_ first
968     totalSize = static_cast<uint32_t>(totalSize * sizeMultiplier_);
969 
970     std::ostringstream name;
971     name << transport_name << "::test_rw(" << totalSize << ", " << wSizeGen.describe() << ", "
972          << rSizeGen.describe() << ", " << wChunkSizeGen.describe() << ", "
973          << rChunkSizeGen.describe() << ", " << maxOutstanding << ")";
974 
975 #if (BOOST_VERSION >= 105900)
976     std::function<void ()> test_func
977 #else
978     boost::unit_test::callback0<> test_func
979 #endif
980         = std::bind(test_rw<CoupledTransports>,
981                                        totalSize,
982                                        wSizeGen,
983                                        rSizeGen,
984                                        wChunkSizeGen,
985                                        rChunkSizeGen,
986                                        maxOutstanding);
987     suite_->add(MAKE_TEST_CASE(test_func, name.str()), expectedFailures);
988   }
989 
990   template <class CoupledTransports>
addTestBlocking(const char * transportName,uint32_t expectedFailures=0)991   void addTestBlocking(const char* transportName, uint32_t expectedFailures = 0) {
992     char name[1024];
993 
994     THRIFT_SNPRINTF(name, sizeof(name), "%s::test_read_part_available()", transportName);
995     suite_->add(MAKE_TEST_CASE(test_read_part_available<CoupledTransports>, name), expectedFailures);
996 
997     THRIFT_SNPRINTF(name, sizeof(name), "%s::test_read_part_available_in_chunks()", transportName);
998     suite_->add(MAKE_TEST_CASE(test_read_part_available_in_chunks<CoupledTransports>, name), expectedFailures);
999 
1000     THRIFT_SNPRINTF(name, sizeof(name), "%s::test_read_partial_midframe()", transportName);
1001     suite_->add(MAKE_TEST_CASE(test_read_partial_midframe<CoupledTransports>, name), expectedFailures);
1002 
1003     THRIFT_SNPRINTF(name, sizeof(name), "%s::test_read_none_available()", transportName);
1004     suite_->add(MAKE_TEST_CASE(test_read_none_available<CoupledTransports>, name), expectedFailures);
1005 
1006     THRIFT_SNPRINTF(name, sizeof(name), "%s::test_borrow_part_available()", transportName);
1007     suite_->add(MAKE_TEST_CASE(test_borrow_part_available<CoupledTransports>, name), expectedFailures);
1008 
1009     THRIFT_SNPRINTF(name, sizeof(name), "%s::test_borrow_none_available()", transportName);
1010     suite_->add(MAKE_TEST_CASE(test_borrow_none_available<CoupledTransports>, name), expectedFailures);
1011   }
1012 
1013   boost::unit_test::test_suite* suite_;
1014   // sizeMultiplier_ is configurable via the command line, and allows the
1015   // user to adjust between smaller buffers that can be tested quickly,
1016   // or larger buffers that more thoroughly exercise the code, but take
1017   // longer.
1018   float sizeMultiplier_;
1019 };
1020 
1021 /**************************************************************************
1022  * General Initialization
1023  **************************************************************************/
1024 
1025 struct global_fixture {
1026   std::shared_ptr<apache::thrift::concurrency::Thread> alarmThread_;
global_fixtureglobal_fixture1027   global_fixture() {
1028 #if _WIN32
1029     apache::thrift::transport::TWinsockSingleton::create();
1030 #endif
1031 
1032     apache::thrift::concurrency::ThreadFactory factory;
1033     factory.setDetached(false);
1034 
1035     alarmThread_ = factory.newThread(
1036         apache::thrift::concurrency::FunctionRunner::create(alarm_handler_wrapper));
1037     alarmThread_->start();
1038   }
~global_fixtureglobal_fixture1039   ~global_fixture() {
1040     {
1041       apache::thrift::concurrency::Synchronized s(g_alarm_monitor);
1042       g_teardown = true;
1043       g_alarm_monitor.notify();
1044     }
1045     alarmThread_->join();
1046   }
1047 };
1048 
1049 #if (BOOST_VERSION >= 105900)
1050 BOOST_GLOBAL_FIXTURE(global_fixture);
1051 #else
BOOST_GLOBAL_FIXTURE(global_fixture)1052 BOOST_GLOBAL_FIXTURE(global_fixture)
1053 #endif
1054 
1055 #ifdef BOOST_TEST_DYN_LINK
1056 bool init_unit_test_suite() {
1057   struct timeval tv;
1058   THRIFT_GETTIMEOFDAY(&tv, nullptr);
1059   int seed = tv.tv_sec ^ tv.tv_usec;
1060 
1061   initrand(seed);
1062 
1063   boost::unit_test::test_suite* suite = &boost::unit_test::framework::master_test_suite();
1064   suite->p_name.value = "TransportTest";
1065   TransportTestGen transport_test_generator(suite, 1);
1066   transport_test_generator.generate();
1067   return true;
1068 }
1069 
main(int argc,char * argv[])1070 int main( int argc, char* argv[] ) {
1071   return ::boost::unit_test::unit_test_main(&init_unit_test_suite,argc,argv);
1072 }
1073 #else
1074 boost::unit_test::test_suite* init_unit_test_suite(int argc, char* argv[]) {
1075   THRIFT_UNUSED_VARIABLE(argc);
1076   THRIFT_UNUSED_VARIABLE(argv);
1077   struct timeval tv;
1078   THRIFT_GETTIMEOFDAY(&tv, nullptr);
1079   int seed = tv.tv_sec ^ tv.tv_usec;
1080 
1081   initrand(seed);
1082 
1083   boost::unit_test::test_suite* suite = &boost::unit_test::framework::master_test_suite();
1084   suite->p_name.value = "TransportTest";
1085   TransportTestGen transport_test_generator(suite, 1);
1086   transport_test_generator.generate();
1087   return nullptr;
1088 }
1089 #endif
1090